You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pinot.apache.org by GitBox <gi...@apache.org> on 2018/11/13 23:12:02 UTC

[GitHub] apucher closed pull request #3463: [TE] Implement Threshold based Time Window Suppressor

apucher closed pull request #3463: [TE] Implement Threshold based Time Window Suppressor
URL: https://github.com/apache/incubator-pinot/pull/3463
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskFactory.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskFactory.java
index 3a51c950d5..61d118f2ca 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskFactory.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskFactory.java
@@ -85,8 +85,7 @@ public DetectionAlertFilter loadAlertFilter(DetectionAlertConfigDTO alertConfig,
     return detectionAlertSchemeSet;
   }
 
-  public Set<DetectionAlertSuppressor> loadAlertSuppressors(DetectionAlertConfigDTO alertConfig,
-      ThirdEyeAnomalyConfiguration thirdeyeConfig) throws Exception {
+  public Set<DetectionAlertSuppressor> loadAlertSuppressors(DetectionAlertConfigDTO alertConfig) throws Exception {
     Preconditions.checkNotNull(alertConfig);
     Set<DetectionAlertSuppressor> detectionAlertSuppressors = new HashSet<>();
     Map<String, Map<String, Object>> alertSuppressors = alertConfig.getAlertSuppressors();
@@ -99,8 +98,8 @@ public DetectionAlertFilter loadAlertFilter(DetectionAlertConfigDTO alertConfig,
       Preconditions.checkNotNull(alertSuppressors.get(alertSuppressor));
       Preconditions.checkNotNull(alertSuppressors.get(alertSuppressor).get("className"));
       Constructor<?> constructor = Class.forName(alertSuppressors.get(alertSuppressor).get("className").toString().trim())
-          .getConstructor(DetectionAlertConfigDTO.class, ThirdEyeAnomalyConfiguration.class);
-      detectionAlertSuppressors.add((DetectionAlertSuppressor) constructor.newInstance(alertConfig, thirdeyeConfig));
+          .getConstructor(DetectionAlertConfigDTO.class);
+      detectionAlertSuppressors.add((DetectionAlertSuppressor) constructor.newInstance(alertConfig));
     }
 
     return detectionAlertSuppressors;
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskRunner.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskRunner.java
index 4bc9b3a0bf..669b442334 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskRunner.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskRunner.java
@@ -101,8 +101,7 @@ private void updateAlertConfigWatermarks(DetectionAlertFilterResult result, Dete
       DetectionAlertFilterResult result = alertFilter.run();
 
       // Suppress alerts if any and get the filtered anomalies to be notified
-      Set<DetectionAlertSuppressor> alertSuppressors =
-          detAlertTaskFactory.loadAlertSuppressors(alertConfig, taskContext.getThirdEyeAnomalyConfiguration());
+      Set<DetectionAlertSuppressor> alertSuppressors = detAlertTaskFactory.loadAlertSuppressors(alertConfig);
       for (DetectionAlertSuppressor alertSuppressor : alertSuppressors) {
         result = alertSuppressor.run(result);
       }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/suppress/DetectionAlertSuppressor.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/suppress/DetectionAlertSuppressor.java
index 72a67b040a..6ff87b229a 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/suppress/DetectionAlertSuppressor.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/suppress/DetectionAlertSuppressor.java
@@ -1,5 +1,6 @@
 package com.linkedin.thirdeye.detection.alert.suppress;
 
+import com.linkedin.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
 import com.linkedin.thirdeye.detection.alert.DetectionAlertFilterResult;
 
 
@@ -11,5 +12,11 @@
  */
 public abstract class DetectionAlertSuppressor {
 
+  protected final DetectionAlertConfigDTO config; // alert config given to the constructor; visible to subclasses
+
+  public DetectionAlertSuppressor(DetectionAlertConfigDTO config) { // stores the reference as-is, no null check
+    this.config = config;
+  }
+
   public abstract DetectionAlertFilterResult run(DetectionAlertFilterResult result) throws Exception;
 }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/suppress/DetectionAlertTimeWindowSuppressor.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/suppress/DetectionAlertTimeWindowSuppressor.java
new file mode 100644
index 0000000000..9fd5ded544
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/suppress/DetectionAlertTimeWindowSuppressor.java
@@ -0,0 +1,161 @@
+package com.linkedin.thirdeye.detection.alert.suppress;
+
+import com.google.common.base.Preconditions;
+import com.linkedin.thirdeye.anomalydetection.context.AnomalyFeedback;
+import com.linkedin.thirdeye.constant.AnomalyFeedbackType;
+import com.linkedin.thirdeye.datalayer.bao.MergedAnomalyResultManager;
+import com.linkedin.thirdeye.datalayer.dto.AnomalyFeedbackDTO;
+import com.linkedin.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
+import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import com.linkedin.thirdeye.datasource.DAORegistry;
+import com.linkedin.thirdeye.detection.ConfigUtils;
+import com.linkedin.thirdeye.detection.alert.DetectionAlertFilterResult;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Suppress alerts from anomalies generated during a specific time period.
+ *
+ * This class enables 2 ways of suppressing alerts
+ * 1. Suppress all the alerts generated during the time window. No alerts will be sent.
+ *    ({@link #WINDOW_START_TIME_KEY} and {@link #WINDOW_END_TIME_KEY})
+ * 2. Suppress alerts in the time window based on some thresholds.
+ *    ({@link #IS_THRESHOLD_KEY}, {@link #EXPECTED_CHANGE_KEY} and {@link #ACCEPTABLE_DEVIATION_KEY})
+ *
+ * Every suppressed anomaly is removed from the result and auto-labeled as a true anomaly.
+ */
+public class DetectionAlertTimeWindowSuppressor extends DetectionAlertSuppressor {
+  private static final Logger LOG = LoggerFactory.getLogger(DetectionAlertTimeWindowSuppressor.class);
+
+  static final String TIME_WINDOW_SUPPRESSOR_KEY = "timeWindowSuppressor";
+  static final String TIME_WINDOWS_KEY = "timeWindows";
+
+  static final String WINDOW_START_TIME_KEY = "windowStartTime";
+  static final String WINDOW_END_TIME_KEY = "windowEndTime";
+  static final String IS_THRESHOLD_KEY = "isThresholdApplied";
+
+  // The expected rise or fall of a metric during the holiday or suppression period (ex: -0.5 for 50% drop)
+  static final String EXPECTED_CHANGE_KEY = "expectedChange";
+
+  // The acceptable deviation from the dropped/risen value during the suppression period (ex: 0.1 for +/- 10%)
+  static final String ACCEPTABLE_DEVIATION_KEY = "acceptableDeviation";
+
+  /**
+   * Creates a suppressor backed by the given alert configuration.
+   *
+   * @param config alert config whose alert suppressors map is read for the time window settings
+   */
+  public DetectionAlertTimeWindowSuppressor(DetectionAlertConfigDTO config) {
+    super(config);
+  }
+
+  /**
+   * Check if the anomaly weight lies within expectedChange +/- acceptableDeviation (both bounds inclusive).
+   *
+   * @param anomaly the anomaly whose weight is compared against the thresholds
+   * @param suppressWindowProps properties of the suppression window being evaluated
+   * @return true if the weight is within the expected range
+   */
+  private boolean isAnomalySuppressedByThreshold(MergedAnomalyResultDTO anomaly,
+      Map<String, Object> suppressWindowProps) {
+    // Go through Number so that integral config values (e.g. 0 or 1) do not fail a cast to Double
+    double expectedDropOrSpike = ((Number) suppressWindowProps.get(EXPECTED_CHANGE_KEY)).doubleValue();
+    double acceptableDeviation = ((Number) suppressWindowProps.get(ACCEPTABLE_DEVIATION_KEY)).doubleValue();
+    double anomalyWeight = anomaly.getWeight();
+    if (anomalyWeight <= (expectedDropOrSpike + acceptableDeviation)
+        && anomalyWeight >= (expectedDropOrSpike - acceptableDeviation)) {
+      LOG.info("Anomaly id {} falls within the specified thresholds (anomalyWeight = {}, expectedDropOrSpike = {},"
+              + " acceptableDeviation = {})", anomaly.getId(), anomalyWeight, expectedDropOrSpike, acceptableDeviation);
+      return true;
+    }
+
+    return false;
+  }
+
+  /**
+   * Check if the anomaly needs to be suppressed. An anomaly is suppressed if the startTime
+   * of the anomaly falls in the suppression time window [windowStartTime, windowEndTime) and,
+   * when thresholds are enabled for the window, its weight is within the user's expected range.
+   * Any exception while evaluating the window is logged and the anomaly is not suppressed.
+   */
+  private boolean isAnomalySuppressed(MergedAnomalyResultDTO anomaly, Map<String, Object> suppressWindowProps) {
+    boolean shouldSuppress = false;
+    try {
+      // Go through Number so that timestamps which arrive as Integer do not fail a cast to Long
+      long windowStartTime = ((Number) suppressWindowProps.get(WINDOW_START_TIME_KEY)).longValue();
+      long windowEndTime = ((Number) suppressWindowProps.get(WINDOW_END_TIME_KEY)).longValue();
+      if (anomaly.getStartTime() >= windowStartTime && anomaly.getStartTime() < windowEndTime) {
+        LOG.info("Anomaly id {} falls in the suppression time window ({}, {})", anomaly.getId(), windowStartTime, windowEndTime);
+        if (suppressWindowProps.get(IS_THRESHOLD_KEY) != null && (Boolean) suppressWindowProps.get(IS_THRESHOLD_KEY)) {
+          shouldSuppress = isAnomalySuppressedByThreshold(anomaly, suppressWindowProps);
+        } else {
+          shouldSuppress = true;
+        }
+      }
+    } catch (Exception e) {
+      LOG.warn("Exception while suppressing anomaly id {} with suppress window properties {}", anomaly.getId(),
+          suppressWindowProps, e);
+    }
+
+    return shouldSuppress;
+  }
+
+  /**
+   * Remove, in place, every anomaly matched by at least one suppression window and
+   * persist the auto-generated feedback for it.
+   */
+  private void filterOutSuppressedAnomalies(final Set<MergedAnomalyResultDTO> anomalies) {
+    Iterator<MergedAnomalyResultDTO> anomaliesIt = anomalies.iterator();
+    MergedAnomalyResultManager anomalyMergedResultDAO = DAORegistry.getInstance().getMergedAnomalyResultDAO();
+
+    List<Map<String, Object>> suppressWindowPropsList
+        = ConfigUtils.getList(config.getAlertSuppressors().get(TIME_WINDOW_SUPPRESSOR_KEY).get(TIME_WINDOWS_KEY));
+
+    while (anomaliesIt.hasNext()) {
+      MergedAnomalyResultDTO anomaly = anomaliesIt.next();
+      for (Map<String, Object> suppressWindowProps : suppressWindowPropsList) {
+        if (isAnomalySuppressed(anomaly, suppressWindowProps)) {
+          LOG.info("Suppressing anomaly id {} with suppress properties {}. Anomaly Details = {}", anomaly.getId(), suppressWindowProps, anomaly);
+          anomaliesIt.remove();
+          AnomalyFeedback feedback = anomaly.getFeedback();
+          if (feedback == null) {
+            feedback = new AnomalyFeedbackDTO();
+          }
+
+          // Suppressing is a way by which users admit that anomalies during this period
+          // are expected. We also do not want the algorithm to readjust the baseline.
+          feedback.setFeedbackType(AnomalyFeedbackType.ANOMALY);
+          feedback.setComment("Suppressed anomaly. Auto-labeling as true anomaly.");
+
+          anomaly.setFeedback(feedback);
+          anomalyMergedResultDAO.updateAnomalyFeedback(anomaly);
+
+          // The anomaly is already removed; a second matching (overlapping) window would call
+          // Iterator.remove() again without an intervening next() and throw IllegalStateException.
+          break;
+        }
+      }
+    }
+  }
+
+  /**
+   * Run the time window suppression over every anomaly set held by the given result.
+   *
+   * @param results the alert filter result; the anomaly sets it returns are pruned through their iterators
+   * @return the same result instance that was passed in
+   */
+  @Override
+  public DetectionAlertFilterResult run(DetectionAlertFilterResult results) throws Exception {
+    Preconditions.checkNotNull(results);
+    for (Set<MergedAnomalyResultDTO> anomalies : results.getResult().values()) {
+      filterOutSuppressedAnomalies(anomalies);
+    }
+
+    return results;
+  }
+}
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/suppress/DetectionTimeWindowSuppressorTest.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/suppress/DetectionTimeWindowSuppressorTest.java
new file mode 100644
index 0000000000..2dddd4a869
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/suppress/DetectionTimeWindowSuppressorTest.java
@@ -0,0 +1,167 @@
+package com.linkedin.thirdeye.detection.alert.suppress;
+
+import com.linkedin.thirdeye.datalayer.bao.DAOTestBase;
+import com.linkedin.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
+import com.linkedin.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import com.linkedin.thirdeye.detection.alert.DetectionAlertFilterRecipients;
+import com.linkedin.thirdeye.detection.alert.DetectionAlertFilterResult;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static com.linkedin.thirdeye.detection.alert.suppress.DetectionAlertTimeWindowSuppressor.*;
+
+
+/**
+ * Tests for {@link DetectionAlertTimeWindowSuppressor}, with and without thresholds.
+ */
+public class DetectionTimeWindowSuppressorTest {
+
+  private DAOTestBase testDAOProvider;
+  private Set<MergedAnomalyResultDTO> anomalies;
+  private DetectionAlertConfigDTO config;
+
+  // Builds the property map describing a single suppression window
+  private Map<String, Object> createSuppressWindow(long startTime, long endTime, boolean isThreshold, double expectedChange,
+      double acceptableDeviation) {
+    Map<String, Object> suppressWindowProps = new HashMap<>();
+    suppressWindowProps.put(WINDOW_START_TIME_KEY, startTime);
+    suppressWindowProps.put(WINDOW_END_TIME_KEY, endTime);
+    suppressWindowProps.put(IS_THRESHOLD_KEY, isThreshold);
+    suppressWindowProps.put(EXPECTED_CHANGE_KEY, expectedChange);
+    suppressWindowProps.put(ACCEPTABLE_DEVIATION_KEY, acceptableDeviation);
+    return suppressWindowProps;
+  }
+
+  // Builds an anomaly with the given id, time range and weight
+  private MergedAnomalyResultDTO createAnomaly(long id, long startTime, long endTime, double weight) {
+    MergedAnomalyResultDTO anomaly = new MergedAnomalyResultDTO();
+    anomaly.setId(id);
+    anomaly.setStartTime(startTime);
+    anomaly.setEndTime(endTime);
+    anomaly.setWeight(weight);
+    return anomaly;
+  }
+
+  // Default config: two threshold based windows, 1000-3000 at 0.5 +/- 0.1 and 4500-6000 at 0.6 +/- 0.2
+  private void initDetectionAlertConfig() {
+    config = new DetectionAlertConfigDTO();
+
+    List<Map<String, Object>> suppressWindowList = new ArrayList<>();
+    suppressWindowList.add(createSuppressWindow(1000, 3000, true, 0.5, 0.1));
+    suppressWindowList.add(createSuppressWindow(4500, 6000, true, 0.6, 0.2));
+
+    Map<String, Object> params = new HashMap<>();
+    params.put(TIME_WINDOWS_KEY, suppressWindowList);
+
+    Map<String, Map<String, Object>> alertSuppressors = new HashMap<>();
+    alertSuppressors.put(TIME_WINDOW_SUPPRESSOR_KEY, params);
+    config.setAlertSuppressors(alertSuppressors);
+  }
+
+  // Ten anomalies given as (id, startTime, endTime, weight)
+  private void initAnomalies() {
+    anomalies = new HashSet<>();
+
+    anomalies.add(createAnomaly(1L, 500, 900, 0.5));
+    anomalies.add(createAnomaly(2L, 700, 1000, 0.8));
+    anomalies.add(createAnomaly(3L, 500, 1500, 0.2));
+    anomalies.add(createAnomaly(4L, 1000, 1500, 0.4));
+    anomalies.add(createAnomaly(5L, 1500, 2500, 0.6));
+    anomalies.add(createAnomaly(6L, 2500, 3000, 0.7));
+    anomalies.add(createAnomaly(7L, 2000, 3500, 0.5));
+    anomalies.add(createAnomaly(8L, 3000, 3500, 0.6));
+    anomalies.add(createAnomaly(9L, 3500, 4000, 0.1));
+    anomalies.add(createAnomaly(10L, 5000, 5500, 0.5));
+  }
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    this.testDAOProvider = DAOTestBase.getInstance();
+  }
+
+  @BeforeMethod
+  public void beforeMethod() throws Exception {
+    initAnomalies();
+    initDetectionAlertConfig();
+  }
+
+  @AfterClass(alwaysRun = true)
+  void afterClass() {
+    testDAOProvider.cleanup();
+  }
+
+  /**
+   * Anomaly distribution along with suppression windows.
+   *
+   * Anomalies 4, 5, 7, and 10 should be suppressed (not notified).
+   * Anomaly 6 starts inside window1 but is not suppressed because its weight (0.7)
+   * falls outside the expected range of that window (0.5 +/- 0.1).
+   * Anomalies 1, 2, 3, 8 and 9 start outside both windows and are not suppressed.
+   *
+   *      *-----3----*    *------7-------*
+   *      |
+   *      | *-2-*    *----5----*    *--8-*
+   *      |     |                   |
+   *      *-1-* *--4-*         *--6-*    *--9-*        *---10---*
+   *      |     |                   |
+   * _____|_____|___________________|______________|________________|
+   *      |     |                   |              |                |
+   *    500     |                   |              |                |
+   *          1000----<window1>----3000          4500--<window2>--6000
+   */
+  @Test
+  public void testTimeWindowSuppressorWithThreshold() throws Exception {
+
+    DetectionAlertFilterResult result = new DetectionAlertFilterResult();
+    result.addMapping(new DetectionAlertFilterRecipients(Collections.singleton("test@test")), anomalies);
+
+    DetectionAlertTimeWindowSuppressor suppressor = new DetectionAlertTimeWindowSuppressor(config);
+    DetectionAlertFilterResult resultsAfterSuppress = suppressor.run(result);
+
+    Set<Long> filteredAnomalyIds = new HashSet<>(Arrays.asList(1L, 2L, 3L, 6L, 8L, 9L));
+
+    Assert.assertEquals(resultsAfterSuppress.getAllAnomalies().size(), 6);
+    for (MergedAnomalyResultDTO anomaly : resultsAfterSuppress.getAllAnomalies()) {
+      Assert.assertTrue(filteredAnomalyIds.contains(anomaly.getId()));
+    }
+  }
+
+  /**
+   * A single time window without thresholds that covers the start time of every anomaly;
+   * all the anomalies should be suppressed.
+   */
+  @Test
+  public void testTimeWindowSuppressor() throws Exception {
+    List<Map<String, Object>> suppressWindowList = new ArrayList<>();
+    suppressWindowList.add(createSuppressWindow(500, 6000, false, 0, 0));
+
+    Map<String, Object> params = new HashMap<>();
+    params.put(TIME_WINDOWS_KEY, suppressWindowList);
+
+    Map<String, Map<String, Object>> alertSuppressors = new HashMap<>();
+    alertSuppressors.put(TIME_WINDOW_SUPPRESSOR_KEY, params);
+    config.setAlertSuppressors(alertSuppressors);
+
+    DetectionAlertFilterResult result = new DetectionAlertFilterResult();
+    result.addMapping(new DetectionAlertFilterRecipients(Collections.singleton("test@test")), anomalies);
+
+    DetectionAlertTimeWindowSuppressor suppressor = new DetectionAlertTimeWindowSuppressor(config);
+    DetectionAlertFilterResult resultsAfterSuppress = suppressor.run(result);
+
+    Assert.assertEquals(resultsAfterSuppress.getAllAnomalies().size(), 0);
+  }
+
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@pinot.apache.org
For additional commands, e-mail: dev-help@pinot.apache.org