You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ji...@apache.org on 2020/09/02 17:08:28 UTC
[incubator-pinot] 02/03: severity alerter & unit tests
This is an automated email from the ASF dual-hosted git repository.
jihao pushed a commit to branch severity-alerter
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 45d34f050ac628588634e17102c598b7588fd723
Author: Jihao Zhang <ji...@linkedin.com>
AuthorDate: Tue Sep 1 13:54:04 2020 -0700
severity alerter & unit tests
---
.../anomaly/monitor/MonitorTaskRunner.java | 10 +
.../pinot/thirdeye/constant/AnomalySeverity.java | 35 ++++
.../datalayer/dto/MergedAnomalyResultDTO.java | 10 +
.../datalayer/pojo/MergedAnomalyResultBean.java | 10 +
.../detection/DetectionPipelineTaskRunner.java | 30 +++
.../thirdeye/detection/DetectionResource.java | 22 +-
.../detection/alert/DetectionAlertJob.java | 14 +-
.../alert/filter/AnomalySeverityAlertFilter.java | 163 +++++++++++++++
.../translator/SubscriptionConfigTranslator.java | 3 +-
.../detection/alert/filter/AlertFilterUtils.java | 9 +-
.../filter/AnomalySeverityAlertFilterTest.java | 232 +++++++++++++++++++++
11 files changed, 526 insertions(+), 12 deletions(-)
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/monitor/MonitorTaskRunner.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/monitor/MonitorTaskRunner.java
index 763e75b..33fafee 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/monitor/MonitorTaskRunner.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/monitor/MonitorTaskRunner.java
@@ -281,6 +281,16 @@ public class MonitorTaskRunner implements TaskRunner {
} catch (Exception e) {
LOG.error("Exception when deleting old evaluations.", e);
}
+
+ // Delete old anomaly subscription notifications.
+ try {
+ int deletedRecords = DAO_REGISTRY.getAnomalySubscriptionGroupNotificationManager()
+ .deleteRecordsOlderThanDays(monitorTaskInfo.getDefaultRetentionDays());
+ LOG.info("Deleted {} anomaly subscription notifications that are older than {} days.", deletedRecords,
+ monitorTaskInfo.getDefaultRetentionDays());
+ } catch (Exception e) {
+ LOG.error("Exception when deleting old anomaly subscription notifications.", e);
+ }
}
private Map<Long, JobDTO> findScheduledJobsWithinDays(int days) {
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/constant/AnomalySeverity.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/constant/AnomalySeverity.java
new file mode 100644
index 0000000..48e542e
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/constant/AnomalySeverity.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.pinot.thirdeye.constant;
+
+public enum AnomalySeverity {
+ LOW("low"), MODERATE("moderate"), HIGH("high"), CRITICAL("critical"), DEFAULT("default");
+
+ String userReadableName;
+
+ AnomalySeverity(String userReadableName) {
+ this.userReadableName = userReadableName;
+ }
+
+ public String getUserReadableName() {
+ return this.userReadableName;
+ }
+}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/dto/MergedAnomalyResultDTO.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/dto/MergedAnomalyResultDTO.java
index 415a252..620b0de 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/dto/MergedAnomalyResultDTO.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/dto/MergedAnomalyResultDTO.java
@@ -44,6 +44,8 @@ public class MergedAnomalyResultDTO extends MergedAnomalyResultBean implements A
private Set<MergedAnomalyResultDTO> children = new HashSet<>();
+ private boolean renotify;
+
public MergedAnomalyResultDTO() {
setCreatedTime(System.currentTimeMillis());
}
@@ -94,6 +96,14 @@ public class MergedAnomalyResultDTO extends MergedAnomalyResultBean implements A
return function;
}
+ public boolean shouldRenotify() {
+ return renotify;
+ }
+
+ public void setRenotify(boolean renotify) {
+ this.renotify = renotify;
+ }
+
@Deprecated
public void setFunction(AnomalyFunctionDTO function) {
this.function = function;
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/pojo/MergedAnomalyResultBean.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/pojo/MergedAnomalyResultBean.java
index 834356d..e29de67 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/pojo/MergedAnomalyResultBean.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/pojo/MergedAnomalyResultBean.java
@@ -29,6 +29,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.commons.lang3.ObjectUtils;
+import org.apache.pinot.thirdeye.constant.AnomalySeverity;
public class MergedAnomalyResultBean extends AbstractBean implements Comparable<MergedAnomalyResultBean>, Serializable {
@@ -60,6 +61,7 @@ public class MergedAnomalyResultBean extends AbstractBean implements Comparable<
private Set<Long> childIds; // ids of the anomalies this anomaly merged from
private boolean isChild;
private AnomalyType type;
+ private AnomalySeverity severity = AnomalySeverity.DEFAULT;
public Set<Long> getChildIds() {
return childIds;
@@ -264,6 +266,14 @@ public class MergedAnomalyResultBean extends AbstractBean implements Comparable<
this.type = type;
}
+ public AnomalySeverity getSeverity() {
+ return severity;
+ }
+
+ public void setSeverity(AnomalySeverity severity) {
+ this.severity = severity;
+ }
+
@Override
public int hashCode() {
return Objects.hash(getId(), startTime, endTime, collection, metric, dimensions, score, impactToGlobal, avgBaselineVal, avgCurrentVal, anomalyResultSource, metricUrn, detectionConfigId, childIds, isChild);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineTaskRunner.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineTaskRunner.java
index 86126c7..183be6f 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineTaskRunner.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineTaskRunner.java
@@ -27,6 +27,7 @@ import org.apache.pinot.thirdeye.anomaly.task.TaskInfo;
import org.apache.pinot.thirdeye.anomaly.task.TaskResult;
import org.apache.pinot.thirdeye.anomaly.task.TaskRunner;
import org.apache.pinot.thirdeye.anomaly.utils.ThirdeyeMetricsUtil;
+import org.apache.pinot.thirdeye.datalayer.bao.AnomalySubscriptionGroupNotificationManager;
import org.apache.pinot.thirdeye.datalayer.bao.DatasetConfigManager;
import org.apache.pinot.thirdeye.datalayer.bao.DetectionConfigManager;
import org.apache.pinot.thirdeye.datalayer.bao.EvaluationManager;
@@ -34,9 +35,11 @@ import org.apache.pinot.thirdeye.datalayer.bao.EventManager;
import org.apache.pinot.thirdeye.datalayer.bao.MergedAnomalyResultManager;
import org.apache.pinot.thirdeye.datalayer.bao.MetricConfigManager;
import org.apache.pinot.thirdeye.datalayer.bao.TaskManager;
+import org.apache.pinot.thirdeye.datalayer.dto.AnomalySubscriptionGroupNotificationDTO;
import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
import org.apache.pinot.thirdeye.datalayer.dto.EvaluationDTO;
import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.datalayer.util.Predicate;
import org.apache.pinot.thirdeye.datasource.DAORegistry;
import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry;
import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader;
@@ -57,6 +60,7 @@ public class DetectionPipelineTaskRunner implements TaskRunner {
private final MergedAnomalyResultManager anomalyDAO;
private final EvaluationManager evaluationDAO;
private final TaskManager taskDAO;
+ private final AnomalySubscriptionGroupNotificationManager anomalySubscriptionGroupNotificationDAO;
private final DetectionPipelineLoader loader;
private final DataProvider provider;
private final ModelMaintenanceFlow maintenanceFlow;
@@ -76,6 +80,8 @@ public class DetectionPipelineTaskRunner implements TaskRunner {
this.anomalyDAO = DAORegistry.getInstance().getMergedAnomalyResultDAO();
this.evaluationDAO = DAORegistry.getInstance().getEvaluationManager();
this.taskDAO = DAORegistry.getInstance().getTaskDAO();
+ this.anomalySubscriptionGroupNotificationDAO =
+ DAORegistry.getInstance().getAnomalySubscriptionGroupNotificationManager();
MetricConfigManager metricDAO = DAORegistry.getInstance().getMetricConfigDAO();
DatasetConfigManager datasetDAO = DAORegistry.getInstance().getDatasetConfigDAO();
EventManager eventDAO = DAORegistry.getInstance().getEventDAO();
@@ -112,6 +118,8 @@ public class DetectionPipelineTaskRunner implements TaskRunner {
this.loader = loader;
this.provider = provider;
this.taskDAO = DAORegistry.getInstance().getTaskDAO();
+ this.anomalySubscriptionGroupNotificationDAO =
+ DAORegistry.getInstance().getAnomalySubscriptionGroupNotificationManager();
this.maintenanceFlow = new ModelRetuneFlow(this.provider, DetectionRegistry.getInstance());
}
@@ -167,6 +175,28 @@ public class DetectionPipelineTaskRunner implements TaskRunner {
}
this.detectionDAO.update(config);
+ // re-notify the anomalies if any
+ for (MergedAnomalyResultDTO anomaly : result.getAnomalies()) {
+ // if an anomaly should be re-notified, update the notification lookup table in the database
+ if (anomaly.shouldRenotify()) {
+ List<AnomalySubscriptionGroupNotificationDTO> subscriptionGroupNotificationDTOs =
+ this.anomalySubscriptionGroupNotificationDAO.findByPredicate(Predicate.EQ("anomalyId", anomaly.getId()));
+ AnomalySubscriptionGroupNotificationDTO anomalyNotificationDTO;
+ if (subscriptionGroupNotificationDTOs.isEmpty()) {
+ // create a new record if it is not existed yet.
+ anomalyNotificationDTO = new AnomalySubscriptionGroupNotificationDTO();
+ new AnomalySubscriptionGroupNotificationDTO();
+ anomalyNotificationDTO.setAnomalyId(anomaly.getId());
+ anomalyNotificationDTO.setDetectionConfigId(anomaly.getDetectionConfigId());
+ } else {
+ // update the existing record if the anomaly needs to be re-notified
+ anomalyNotificationDTO = subscriptionGroupNotificationDTOs.get(0);
+ anomalyNotificationDTO.setNotifiedSubscriptionGroupIds(Collections.emptyList());
+ }
+ this.anomalySubscriptionGroupNotificationDAO.save(anomalyNotificationDTO);
+ }
+ }
+
ThirdeyeMetricsUtil.detectionTaskSuccessCounter.inc();
LOG.info("End detection for config {} between {} and {}. Detected {} anomalies.", config.getId(), info.start,
info.end, result.getAnomalies());
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionResource.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionResource.java
index c0fdb51..f94f700 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionResource.java
@@ -123,6 +123,7 @@ public class DetectionResource {
private final DetectionAlertConfigFormatter subscriptionConfigFormatter;
private final AggregationLoader aggregationLoader;
private final DetectionConfigurationResource detectionConfigurationResource;
+ private final AnomalySubscriptionGroupNotificationManager anomalySubscriptionGroupNotificationManager;
@Inject
public DetectionResource(
@@ -137,6 +138,7 @@ public class DetectionResource {
this.detectionAlertConfigDAO = DAORegistry.getInstance().getDetectionAlertConfigManager();
this.evaluationDAO = DAORegistry.getInstance().getEvaluationManager();
this.taskDAO = DAORegistry.getInstance().getTaskDAO();
+ this.anomalySubscriptionGroupNotificationManager = DAORegistry.getInstance().getAnomalySubscriptionGroupNotificationManager();
TimeSeriesLoader timeseriesLoader =
new DefaultTimeSeriesLoader(metricDAO, datasetDAO, ThirdEyeCacheRegistry.getInstance().getQueryCache(), ThirdEyeCacheRegistry.getInstance().getTimeSeriesCache());
@@ -573,14 +575,18 @@ public class DetectionResource {
return Response.ok(health).build();
}
- @GET
- @Path(value = "/alert")
- public Response alert() {
- AnomalySubscriptionGroupNotificationManager anomalySubscriptionGroupNotificationManager = DAORegistry.getInstance().getAnomalySubscriptionGroupNotificationManager();
- AnomalySubscriptionGroupNotificationDTO anomalySubscriptionGroupNotificationDTO = new AnomalySubscriptionGroupNotificationDTO();
- anomalySubscriptionGroupNotificationDTO.setAnomalyId(1L);
- anomalySubscriptionGroupNotificationDTO.setDetectionConfigId(2L);
- anomalySubscriptionGroupNotificationManager.save(anomalySubscriptionGroupNotificationDTO);
+ @POST
+ @Path(value = "/re-notify")
+ @ApiOperation("Resend the notification for the anomalies to the subscribed notification groups, if the subscription group supports re-notify")
+ public Response alert(@QueryParam("id") List<Long> anomalyIds) {
+ List<MergedAnomalyResultDTO> anomalies = this.anomalyDAO.findByIds(anomalyIds);
+ for (MergedAnomalyResultDTO anomaly : anomalies) {
+ AnomalySubscriptionGroupNotificationDTO anomalySubscriptionGroupNotificationDTO =
+ new AnomalySubscriptionGroupNotificationDTO();
+ anomalySubscriptionGroupNotificationDTO.setAnomalyId(anomaly.getId());
+ anomalySubscriptionGroupNotificationDTO.setDetectionConfigId(anomaly.getDetectionConfigId());
+ anomalySubscriptionGroupNotificationManager.save(anomalySubscriptionGroupNotificationDTO);
+ }
return Response.ok().build();
}
}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java
index 70b318c..4026869 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java
@@ -24,9 +24,11 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import java.util.Map;
import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
+import org.apache.pinot.thirdeye.datalayer.bao.AnomalySubscriptionGroupNotificationManager;
import org.apache.pinot.thirdeye.datalayer.bao.DetectionAlertConfigManager;
import org.apache.pinot.thirdeye.datalayer.bao.MergedAnomalyResultManager;
import org.apache.pinot.thirdeye.datalayer.bao.TaskManager;
+import org.apache.pinot.thirdeye.datalayer.dto.AnomalySubscriptionGroupNotificationDTO;
import org.apache.pinot.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
import org.apache.pinot.thirdeye.datalayer.dto.TaskDTO;
import org.apache.pinot.thirdeye.datalayer.util.Predicate;
@@ -48,12 +50,14 @@ public class DetectionAlertJob implements Job {
private DetectionAlertConfigManager alertConfigDAO;
private TaskManager taskDAO;
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
- private MergedAnomalyResultManager anomalyDAO;
+ private final MergedAnomalyResultManager anomalyDAO;
+ private final AnomalySubscriptionGroupNotificationManager anomalySubscriptionGroupNotificationDAO;
public DetectionAlertJob() {
this.alertConfigDAO = DAORegistry.getInstance().getDetectionAlertConfigManager();
this.taskDAO = DAORegistry.getInstance().getTaskDAO();
this.anomalyDAO = DAORegistry.getInstance().getMergedAnomalyResultDAO();
+ this.anomalySubscriptionGroupNotificationDAO = DAORegistry.getInstance().getAnomalySubscriptionGroupNotificationManager();
}
@Override
@@ -110,6 +114,8 @@ public class DetectionAlertJob implements Job {
* For example, if previous anomaly is from t1 to t2 generated at t3, then the timestamp in vectorLock is t3.
* If there is a new anomaly from t2 to t4 generated at t5, then we can still get this anomaly as t5 > t3.
*
+ * Also, check if there is any anomaly that needs re-notifying
+ *
* @param configDTO The Subscription Configuration.
* @return true if it needs notification task. false otherwise.
*/
@@ -123,6 +129,10 @@ public class DetectionAlertJob implements Job {
return true;
}
}
- return false;
+ // in addition to checking the watermarks, check if any anomalies need to be re-notified by querying the anomaly subscription group notification table
+ List<AnomalySubscriptionGroupNotificationDTO> anomalySubscriptionGroupNotifications =
+ this.anomalySubscriptionGroupNotificationDAO.findByPredicate(
+ Predicate.IN("detectionConfigId", vectorLocks.keySet().toArray()));
+ return !anomalySubscriptionGroupNotifications.isEmpty();
}
}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/filter/AnomalySeverityAlertFilter.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/filter/AnomalySeverityAlertFilter.java
new file mode 100644
index 0000000..1042ac7
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/filter/AnomalySeverityAlertFilter.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.detection.alert.filter;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.pinot.thirdeye.constant.AnomalySeverity;
+import org.apache.pinot.thirdeye.datalayer.bao.AnomalySubscriptionGroupNotificationManager;
+import org.apache.pinot.thirdeye.datalayer.dto.AnomalySubscriptionGroupNotificationDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.datalayer.util.Predicate;
+import org.apache.pinot.thirdeye.datasource.DAORegistry;
+import org.apache.pinot.thirdeye.detection.ConfigUtils;
+import org.apache.pinot.thirdeye.detection.DataProvider;
+import org.apache.pinot.thirdeye.detection.alert.DetectionAlertFilterNotification;
+import org.apache.pinot.thirdeye.detection.alert.DetectionAlertFilterResult;
+import org.apache.pinot.thirdeye.detection.alert.StatefulDetectionAlertFilter;
+import org.apache.pinot.thirdeye.detection.annotation.AlertFilter;
+
+
+/**
+ * The detection alert filter that can send notifications through multiple channels
+ * to a set of unconditional and another set of conditional recipients, based on the
+ * value of a specified anomaly severities.
+ *
+ * You can configure anomaly severities along with a variety of alerting
+ * channels and reference links.
+ *
+ * This alert pipeline have the capability of re-notify anomalies if the anomaly's severity is
+ * changed after it's created.
+ *
+ * <pre>
+ * severityRecipients:
+ * - severity:
+ * - LOW
+ * notify:
+ * jiraScheme:
+ * project: PROJECT
+ * assignee: oncall
+ * emailScheme:
+ * recipients:
+ * to:
+ * - "oncall@comany.com"
+ * - severity:
+ * - HIGH
+ * - CRITICAL
+ * notify:
+ * jiraScheme:
+ * project: PROJECT
+ * assignee: manager
+ * </pre>
+ */
+@AlertFilter(type = "SEVERITY_ALERTER_PIPELINE")
+public class AnomalySeverityAlertFilter extends StatefulDetectionAlertFilter {
+ public static final String PROP_DETECTION_CONFIG_IDS = "detectionConfigIds";
+ public static final String PROP_SEVERITY = "severity";
+ public static final String PROP_NOTIFY = "notify";
+ public static final String PROP_REF_LINKS = "referenceLinks";
+ public static final String PROP_SEVERITY_RECIPIENTS = "severityRecipients";
+
+ private final List<Map<String, Object>> severityRecipients;
+ private final List<Long> detectionConfigIds;
+
+ private final AnomalySubscriptionGroupNotificationManager anomalySubscriptionGroupNotificationDAO;
+
+
+ public AnomalySeverityAlertFilter(DataProvider provider, DetectionAlertConfigDTO config, long endTime) {
+ super(provider, config, endTime);
+ this.severityRecipients = ConfigUtils.getList(this.config.getProperties().get(PROP_SEVERITY_RECIPIENTS));
+ this.detectionConfigIds = ConfigUtils.getLongs(this.config.getProperties().get(PROP_DETECTION_CONFIG_IDS));
+ this.anomalySubscriptionGroupNotificationDAO =
+ DAORegistry.getInstance().getAnomalySubscriptionGroupNotificationManager();
+ }
+
+ @Override
+ public DetectionAlertFilterResult run() {
+ DetectionAlertFilterResult result = new DetectionAlertFilterResult();
+
+ // retrieve the anomalies based on vector clocks
+ Set<MergedAnomalyResultDTO> anomalies = this.filter(this.makeVectorClocks(this.detectionConfigIds));
+ // find the anomalies that needs re-notifying.
+ anomalies.addAll(this.retrieveRenotifyAnomalies(this.detectionConfigIds));
+ // Prepare mapping from severity-recipients to anomalies
+ for (Map<String, Object> severityRecipient : this.severityRecipients) {
+ List<AnomalySeverity> severities = ConfigUtils.getList(severityRecipient.get(PROP_SEVERITY))
+ .stream()
+ .map(s -> AnomalySeverity.valueOf((String) s))
+ .collect(Collectors.toList());
+ Set<MergedAnomalyResultDTO> notifyAnomalies = new HashSet<>();
+ for (MergedAnomalyResultDTO anomaly : anomalies) {
+ if (severities.contains(anomaly.getSeverity())) {
+ notifyAnomalies.add(anomaly);
+ }
+ }
+
+ if (!notifyAnomalies.isEmpty()) {
+ DetectionAlertConfigDTO subsConfig = SubscriptionUtils.makeChildSubscriptionConfig(config,
+ ConfigUtils.getMap(severityRecipient.get(PROP_NOTIFY)),
+ ConfigUtils.getMap(severityRecipient.get(PROP_REF_LINKS)));
+ result.addMapping(new DetectionAlertFilterNotification(subsConfig), notifyAnomalies);
+ }
+ }
+
+ // Notify the remaining anomalies to default recipients
+ Set<MergedAnomalyResultDTO> allNotifiedAnomalies = new HashSet<>(result.getAllAnomalies());
+ Set<MergedAnomalyResultDTO> defaultAnomalies = new HashSet<>();
+ for (MergedAnomalyResultDTO anomaly : anomalies) {
+ if (!allNotifiedAnomalies.contains(anomaly)) {
+ defaultAnomalies.add(anomaly);
+ }
+ }
+ if (!defaultAnomalies.isEmpty()) {
+ result.addMapping(new DetectionAlertFilterNotification(config), defaultAnomalies);
+ }
+
+ return result;
+ }
+
+
+ protected Collection<MergedAnomalyResultDTO> retrieveRenotifyAnomalies(Collection<Long> detectionConfigIds) {
+ // find if any notification is needed
+ List<AnomalySubscriptionGroupNotificationDTO> anomalySubscriptionGroupNotificationDTOs =
+ this.anomalySubscriptionGroupNotificationDAO.findByPredicate(
+ Predicate.IN("detectionConfigId", detectionConfigIds.toArray()));
+
+ List<Long> anomalyIds = new ArrayList<>();
+ for (AnomalySubscriptionGroupNotificationDTO anomalySubscriptionGroupNotification : anomalySubscriptionGroupNotificationDTOs) {
+ // notify the anomalies if this subscription group have not sent out this anomaly yet
+ if (!anomalySubscriptionGroupNotification.getNotifiedSubscriptionGroupIds().contains(this.config.getId())) {
+ anomalyIds.add(anomalySubscriptionGroupNotification.getAnomalyId());
+ // add this subscription group to the notification record and update
+ anomalySubscriptionGroupNotification.getNotifiedSubscriptionGroupIds().add(this.config.getId());
+ this.anomalySubscriptionGroupNotificationDAO.save(anomalySubscriptionGroupNotification);
+ }
+ }
+ return anomalyIds.isEmpty() ? Collections.emptyList()
+ : DAORegistry.getInstance().getMergedAnomalyResultDAO().findByIds(anomalyIds);
+ }
+}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/SubscriptionConfigTranslator.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/SubscriptionConfigTranslator.java
index 9d4044f..a3fec01 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/SubscriptionConfigTranslator.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/SubscriptionConfigTranslator.java
@@ -68,10 +68,11 @@ public class SubscriptionConfigTranslator extends ConfigTranslator<DetectionAler
private static final String PROP_DIMENSION = "dimension";
private static final String PROP_DIMENSION_RECIPIENTS = "dimensionRecipients";
+ private static final String PROP_SEVERITY_RECIPIENTS = "severityRecipients";
private static final DetectionAlertRegistry DETECTION_ALERT_REGISTRY = DetectionAlertRegistry.getInstance();
private static final Set<String> PROPERTY_KEYS = new HashSet<>(
- Arrays.asList(PROP_RECIPIENTS, PROP_DIMENSION, PROP_DIMENSION_RECIPIENTS));
+ Arrays.asList(PROP_RECIPIENTS, PROP_DIMENSION, PROP_DIMENSION_RECIPIENTS, PROP_SEVERITY_RECIPIENTS));
private final DetectionConfigManager detectionConfigDAO;
public SubscriptionConfigTranslator(DetectionConfigManager detectionConfigDAO, String yamlConfig) {
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/AlertFilterUtils.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/AlertFilterUtils.java
index a1091cf..72f64db 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/AlertFilterUtils.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/AlertFilterUtils.java
@@ -26,6 +26,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.pinot.thirdeye.anomaly.AnomalyType;
import org.apache.pinot.thirdeye.common.dimension.DimensionMap;
+import org.apache.pinot.thirdeye.constant.AnomalySeverity;
import org.apache.pinot.thirdeye.datalayer.dto.AnomalyFeedbackDTO;
import org.apache.pinot.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
@@ -89,9 +90,14 @@ public class AlertFilterUtils {
return new DetectionAlertFilterNotification(subsConfig);
}
-
static MergedAnomalyResultDTO makeAnomaly(Long configId, long baseTime, long start, long end,
Map<String, String> dimensions, AnomalyFeedbackDTO feedback) {
+ return makeAnomaly(configId, baseTime, start, end, dimensions, feedback, AnomalySeverity.DEFAULT);
+ }
+
+
+ static MergedAnomalyResultDTO makeAnomaly(Long configId, long baseTime, long start, long end,
+ Map<String, String> dimensions, AnomalyFeedbackDTO feedback, AnomalySeverity severity) {
MergedAnomalyResultDTO anomaly = DetectionTestUtils.makeAnomaly(configId, baseTime + start, baseTime + end);
anomaly.setType(AnomalyType.DEVIATION);
anomaly.setChildIds(Collections.emptySet());
@@ -108,6 +114,7 @@ public class AlertFilterUtils {
anomaly.setCreatedBy("no-auth-user");
anomaly.setUpdatedBy("no-auth-user");
+ anomaly.setSeverity(severity);
anomaly.setId(DAORegistry.getInstance().getMergedAnomalyResultDAO().save(anomaly));
if (feedback != null) {
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/AnomalySeverityAlertFilterTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/AnomalySeverityAlertFilterTest.java
new file mode 100644
index 0000000..38f9189
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/AnomalySeverityAlertFilterTest.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.pinot.thirdeye.detection.alert.filter;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.thirdeye.constant.AnomalySeverity;
+import org.apache.pinot.thirdeye.datalayer.bao.AnomalySubscriptionGroupNotificationManager;
+import org.apache.pinot.thirdeye.datalayer.bao.DAOTestBase;
+import org.apache.pinot.thirdeye.datalayer.dto.AnomalySubscriptionGroupNotificationDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.datasource.DAORegistry;
+import org.apache.pinot.thirdeye.detection.MockDataProvider;
+import org.apache.pinot.thirdeye.detection.alert.DetectionAlertFilter;
+import org.apache.pinot.thirdeye.detection.alert.DetectionAlertFilterNotification;
+import org.apache.pinot.thirdeye.detection.alert.DetectionAlertFilterResult;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.apache.pinot.thirdeye.detection.alert.filter.AlertFilterUtils.*;
+
+
+public class AnomalySeverityAlertFilterTest {
+ private static final String PROP_RECIPIENTS = "recipients";
+ private static final String PROP_EMAIL_SCHEME = "emailScheme";
+ private static final String PROP_TO = "to";
+ private static final String PROP_CC = "cc";
+ private static final String PROP_BCC = "bcc";
+ private static final Set<String> PROP_TO_FOR_VALUE =
+ new HashSet<>(Arrays.asList("myTest@example.com", "myTest@example.org"));
+ private static final Set<String> PROP_TO_FOR_ANOTHER_VALUE =
+ new HashSet<>(Arrays.asList("myTest@example.net", "myTest@example.com"));
+ private static final String PROP_DETECTION_CONFIG_IDS = "detectionConfigIds";
+ private static final String PROP_SEVERITY_TO = "severityRecipients";
+ private static final List<Object> severityProperty = new ArrayList<>();
+
+ private DetectionAlertFilter alertFilter;
+
+ private DAOTestBase testDAOProvider;
+ private DetectionAlertConfigDTO alertConfig;
+ private List<MergedAnomalyResultDTO> detectionAnomalies;
+ private long baseTime;
+ private List<Long> detectionConfigIds;
+ private MergedAnomalyResultDTO renotifyAnomaly;
+ private final MockDataProvider provider = new MockDataProvider();
+ private final Map<String, Object> notify1 = new HashMap<>();
+ private final Map<String, Object> notify2 = new HashMap<>();
+ private final Map<String, Object> defaultScheme = new HashMap<>();
+
+ @BeforeMethod
+ public void beforeMethod() throws InterruptedException {
+ testDAOProvider = DAOTestBase.getInstance();
+
+ DetectionConfigDTO detectionConfig1 = new DetectionConfigDTO();
+ detectionConfig1.setName("test detection 1");
+ detectionConfig1.setActive(true);
+ long detectionConfigId1 = DAORegistry.getInstance().getDetectionConfigManager().save(detectionConfig1);
+
+ DetectionConfigDTO detectionConfig2 = new DetectionConfigDTO();
+ detectionConfig2.setName("test detection 2");
+ detectionConfig2.setActive(true);
+ long detectionConfigId2 = DAORegistry.getInstance().getDetectionConfigManager().save(detectionConfig2);
+
+ detectionConfigIds = Arrays.asList(detectionConfigId1, detectionConfigId2);
+
+ // Anomaly notification is tracked through create time. Start and end time doesn't matter here.
+ this.detectionAnomalies = new ArrayList<>();
+ renotifyAnomaly =
+ makeAnomaly(detectionConfigId1, System.currentTimeMillis(), 0, 50, Collections.singletonMap("key", "value"),
+ null, AnomalySeverity.LOW);
+ Thread.sleep(100);
+ this.baseTime = System.currentTimeMillis();
+ Thread.sleep(100);
+ this.detectionAnomalies.add(
+ makeAnomaly(detectionConfigId1, this.baseTime, 0, 100, Collections.singletonMap("key", "value"), null,
+ AnomalySeverity.LOW));
+ Thread.sleep(10);
+ this.detectionAnomalies.add(
+ makeAnomaly(detectionConfigId1, this.baseTime, 0, 110, Collections.singletonMap("key", "anotherValue"), null,
+ AnomalySeverity.MODERATE));
+ Thread.sleep(20);
+ this.detectionAnomalies.add(
+ makeAnomaly(detectionConfigId1, this.baseTime, 0, 120, Collections.singletonMap("key", "unknownValue"), null,
+ AnomalySeverity.HIGH));
+ Thread.sleep(30);
+ this.detectionAnomalies.add(
+ makeAnomaly(detectionConfigId2, this.baseTime, 110, 150, Collections.singletonMap("unknownKey", "value"),
+ null));
+ Thread.sleep(10);
+ this.detectionAnomalies.add(
+ makeAnomaly(detectionConfigId2, this.baseTime, 120, 160, Collections.singletonMap("key", "value"), null));
+ Thread.sleep(40);
+ this.detectionAnomalies.add(
+ makeAnomaly(detectionConfigId1, this.baseTime, 150, 200, Collections.<String, String>emptyMap(), null));
+ Thread.sleep(200);
+ this.detectionAnomalies.add(
+ makeAnomaly(detectionConfigId2, this.baseTime, 300, 400, Collections.singletonMap("key", "value"), null));
+ Thread.sleep(100);
+
+ this.alertConfig = createDetectionAlertConfig();
+ }
+
+ private DetectionAlertConfigDTO createDetectionAlertConfig() {
+ DetectionAlertConfigDTO alertConfig = new DetectionAlertConfigDTO();
+
+ notify1.put("severity", Arrays.asList("LOW", "MODERATE"));
+ notify1.put("notify", ImmutableMap.of("emailScheme", ImmutableMap.of("recipients", PROP_TO_FOR_VALUE)));
+ notify2.put("severity", Collections.singleton("HIGH"));
+ notify2.put("notify", ImmutableMap.of("emailScheme", ImmutableMap.of("recipients", PROP_TO_FOR_ANOTHER_VALUE)));
+ severityProperty.add(notify1);
+ severityProperty.add(notify2);
+
+ Map<String, Object> properties = new HashMap<>();
+ properties.put(PROP_DETECTION_CONFIG_IDS, detectionConfigIds);
+ properties.put(PROP_SEVERITY_TO, severityProperty);
+ alertConfig.setProperties(properties);
+
+ Map<String, Object> emailScheme = new HashMap<>();
+ Map<String, Set<String>> recipients = new HashMap<>();
+ recipients.put(PROP_TO, AlertFilterUtils.PROP_TO_VALUE);
+ recipients.put(PROP_CC, AlertFilterUtils.PROP_CC_VALUE);
+ recipients.put(PROP_BCC, AlertFilterUtils.PROP_BCC_VALUE);
+ emailScheme.put(PROP_RECIPIENTS, recipients);
+ defaultScheme.put(PROP_EMAIL_SCHEME, emailScheme);
+ alertConfig.setAlertSchemes(defaultScheme);
+
+ Map<Long, Long> vectorClocks = new HashMap<>();
+ vectorClocks.put(detectionConfigIds.get(0), this.baseTime);
+ vectorClocks.put(detectionConfigIds.get(1), this.baseTime);
+ alertConfig.setVectorClocks(vectorClocks);
+
+ return alertConfig;
+ }
+
+ @Test
+ public void testAlertFilterRecipients() throws Exception {
+ this.alertFilter = new AnomalySeverityAlertFilter(provider, alertConfig, this.baseTime + 350L);
+
+ DetectionAlertFilterResult result = this.alertFilter.run();
+ Assert.assertEquals(result.getResult().size(), 3);
+
+ int verifiedResult = 0;
+ for (Map.Entry<DetectionAlertFilterNotification, Set<MergedAnomalyResultDTO>> entry : result.getResult()
+ .entrySet()) {
+ if (entry.getValue().equals(makeSet(0, 1))) {
+ Assert.assertEquals(entry.getKey().getSubscriptionConfig().getAlertSchemes(), notify1.get("notify"));
+ verifiedResult++;
+ } else if (entry.getValue().equals(makeSet(2))) {
+ Assert.assertEquals(entry.getKey().getSubscriptionConfig().getAlertSchemes(), notify2.get("notify"));
+ verifiedResult++;
+ } else if (entry.getValue().equals(makeSet(3, 4, 5))) {
+ Assert.assertEquals(entry.getKey().getSubscriptionConfig().getAlertSchemes(), defaultScheme);
+ verifiedResult++;
+ }
+ }
+ Assert.assertEquals(verifiedResult, 3);
+ }
+
+ @Test
+ public void testRenotifyAnomaly() throws Exception {
+ AnomalySubscriptionGroupNotificationManager renotificationManager =
+ DAORegistry.getInstance().getAnomalySubscriptionGroupNotificationManager();
+ AnomalySubscriptionGroupNotificationDTO anomalySubscriptionGroupNotification =
+ new AnomalySubscriptionGroupNotificationDTO();
+ anomalySubscriptionGroupNotification.setAnomalyId(renotifyAnomaly.getId());
+ anomalySubscriptionGroupNotification.setDetectionConfigId(renotifyAnomaly.getDetectionConfigId());
+ renotificationManager.save(anomalySubscriptionGroupNotification);
+
+ this.alertFilter = new AnomalySeverityAlertFilter(provider, alertConfig, this.baseTime + 350L);
+
+ DetectionAlertFilterResult result = this.alertFilter.run();
+ Assert.assertEquals(result.getResult().size(), 3);
+
+ int verifiedResult = 0;
+ for (Map.Entry<DetectionAlertFilterNotification, Set<MergedAnomalyResultDTO>> entry : result.getResult()
+ .entrySet()) {
+ if (entry.getValue().equals(makeSet(renotifyAnomaly, 0, 1))) {
+ Assert.assertEquals(entry.getKey().getSubscriptionConfig().getAlertSchemes(), notify1.get("notify"));
+ verifiedResult++;
+ } else if (entry.getValue().equals(makeSet(2))) {
+ Assert.assertEquals(entry.getKey().getSubscriptionConfig().getAlertSchemes(), notify2.get("notify"));
+ verifiedResult++;
+ } else if (entry.getValue().equals(makeSet(3, 4, 5))) {
+ Assert.assertEquals(entry.getKey().getSubscriptionConfig().getAlertSchemes(), defaultScheme);
+ verifiedResult++;
+ }
+ }
+ Assert.assertEquals(verifiedResult, 3);
+ }
+
+ private Set<MergedAnomalyResultDTO> makeSet(MergedAnomalyResultDTO anomaly, int... anomalyIndices) {
+ Set<MergedAnomalyResultDTO> set = makeSet(anomalyIndices);
+ set.add(anomaly);
+ return set;
+ }
+
+ private Set<MergedAnomalyResultDTO> makeSet(int... anomalyIndices) {
+ Set<MergedAnomalyResultDTO> output = new HashSet<>();
+ for (int anomalyIndex : anomalyIndices) {
+ output.add(this.detectionAnomalies.get(anomalyIndex));
+ }
+ return output;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org