You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ak...@apache.org on 2018/11/19 16:36:04 UTC

[incubator-pinot] branch master updated: [TE] New detection alerter to support alerting of legacy anomalies (#3495)

This is an automated email from the ASF dual-hosted git repository.

akshayrai09 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 27cbc56  [TE] New detection alerter to support alerting of legacy anomalies (#3495)
27cbc56 is described below

commit 27cbc563bd58467e93bec3e202aead9f66ad7ae8
Author: Akshay Rai <ak...@gmail.com>
AuthorDate: Mon Nov 19 08:35:57 2018 -0800

    [TE] New detection alerter to support alerting of legacy anomalies (#3495)
    
    Changes:
    * Introduced a flag (onlyFetchLegacyAnomalies) in the new detection alert config that indicates whether the alert subscribes to legacy anomaly functions or to new detection functions.
    
    Tests:
    * Tested on local setup
    * Added unit tests
---
 .../datalayer/pojo/DetectionAlertConfigBean.java   |  9 ++++
 .../linkedin/thirdeye/detection/DataProvider.java  | 12 +++++
 .../thirdeye/detection/DefaultDataProvider.java    | 60 +++++++++++++++-------
 .../thirdeye/detection/alert/AlertUtils.java       |  5 ++
 .../detection/alert/DetectionAlertTaskRunner.java  |  2 +
 .../alert/StatefulDetectionAlertFilter.java        | 12 +++--
 .../detection/alert/filter/LegacyAlertFilter.java  | 24 ++++++---
 .../ToAllRecipientsDetectionAlertFilter.java       |  1 +
 .../alert/scheme/DetectionEmailAlerter.java        |  2 +-
 .../thirdeye/detection/DetectionTestUtils.java     | 11 +++-
 .../thirdeye/detection/MockDataProvider.java       | 26 ++++++++--
 .../filter/DimensionDetectionAlertFilterTest.java  | 37 ++++++++-----
 .../alert/filter/LegacyAlertFilterTest.java        | 27 +++++++++-
 .../ToAllRecipientsDetectionAlertFilterTest.java   | 40 +++++++++++----
 14 files changed, 211 insertions(+), 57 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/pojo/DetectionAlertConfigBean.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/pojo/DetectionAlertConfigBean.java
index 5d4c825..07984f2 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/pojo/DetectionAlertConfigBean.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datalayer/pojo/DetectionAlertConfigBean.java
@@ -36,6 +36,7 @@ public class DetectionAlertConfigBean extends AbstractBean {
   String from;
   String cronExpression;
   String application;
+  boolean onlyFetchLegacyAnomalies;
 
   Map<String, Map<String, Object>> alertSchemes;
   Map<String, Map<String, Object>> alertSuppressors;
@@ -48,6 +49,14 @@ public class DetectionAlertConfigBean extends AbstractBean {
 
   Map<String, String> refLinks;
 
+  public boolean isOnlyFetchLegacyAnomalies() {
+    return onlyFetchLegacyAnomalies;
+  }
+
+  public void setOnlyFetchLegacyAnomalies(boolean onlyFetchLegacyAnomalies) {
+    this.onlyFetchLegacyAnomalies = onlyFetchLegacyAnomalies;
+  }
+
   public boolean isActive() {
     return active;
   }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DataProvider.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DataProvider.java
index 51e1a91..c2e70e5 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DataProvider.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DataProvider.java
@@ -66,6 +66,18 @@ public interface DataProvider {
   Map<MetricSlice, DataFrame> fetchAggregates(Collection<MetricSlice> slices, List<String> dimensions);
 
   /**
+   * Returns a multimap of anomalies (keyed by slice) generated by the legacy detection pipeline.
+   *
+   * @see MergedAnomalyResultDTO
+   * @see AnomalySlice
+   *
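+   * @param slices anomaly slices to fetch anomalies for
+   * @param configId id of the legacy anomaly function (matched against the anomaly's functionId); a negative value disables this filter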
+   * @return multimap of anomalies (keyed by slice)
+   */
+  Multimap<AnomalySlice, MergedAnomalyResultDTO> fetchLegacyAnomalies(Collection<AnomalySlice> slices, long configId);
+
+  /**
    * Returns a multimap of anomalies (keyed by slice) for a given set of slices.
    *
    * @see MergedAnomalyResultDTO
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DefaultDataProvider.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DefaultDataProvider.java
index 4ea926b..a13138d 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DefaultDataProvider.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DefaultDataProvider.java
@@ -32,6 +32,7 @@ import com.linkedin.thirdeye.datalayer.dto.MetricConfigDTO;
 import com.linkedin.thirdeye.datalayer.util.Predicate;
 import com.linkedin.thirdeye.datasource.loader.AggregationLoader;
 import com.linkedin.thirdeye.datasource.loader.TimeSeriesLoader;
+import com.linkedin.thirdeye.detection.alert.StatefulDetectionAlertFilter;
 import com.linkedin.thirdeye.detection.spi.model.AnomalySlice;
 import com.linkedin.thirdeye.detection.spi.model.EventSlice;
 import java.util.ArrayList;
@@ -45,9 +46,12 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class DefaultDataProvider implements DataProvider {
+  private static final Logger LOG = LoggerFactory.getLogger(DefaultDataProvider.class);
   private static final long TIMEOUT = 60000;
 
   private final ExecutorService executor = Executors.newCachedThreadPool();
@@ -122,40 +126,60 @@ public class DefaultDataProvider implements DataProvider {
     }
   }
 
-  @Override
-  public Multimap<AnomalySlice, MergedAnomalyResultDTO> fetchAnomalies(Collection<AnomalySlice> slices, long configId) {
+  private Multimap<AnomalySlice, MergedAnomalyResultDTO> fetchAnomalies(Collection<AnomalySlice> slices,
+      long configId, boolean isLegacy) {
+    String functionIdKey = "detectionConfigId";
+    if (isLegacy) {
+      functionIdKey = "functionId";
+    }
+
     Multimap<AnomalySlice, MergedAnomalyResultDTO> output = ArrayListMultimap.create();
     for (AnomalySlice slice : slices) {
       List<Predicate> predicates = new ArrayList<>();
-      if (slice.getEnd() >= 0)
+      if (slice.getEnd() >= 0) {
         predicates.add(Predicate.LT("startTime", slice.getEnd()));
-      if (slice.getStart() >= 0)
+      }
+      if (slice.getStart() >= 0) {
         predicates.add(Predicate.GT("endTime", slice.getStart()));
-      if (configId >= 0)
-        predicates.add(Predicate.EQ("detectionConfigId", configId));
+      }
+      if (configId >= 0) {
+        predicates.add(Predicate.EQ(functionIdKey, configId));
+      }
 
-      if (predicates.isEmpty())
-        throw new IllegalArgumentException("Must provide at least one of start, end, or detectionConfigId");
+      if (predicates.isEmpty()) throw new IllegalArgumentException("Must provide at least one of start, end, or " + functionIdKey);
 
       List<MergedAnomalyResultDTO> anomalies = this.anomalyDAO.findByPredicate(AND(predicates));
-      Iterator<MergedAnomalyResultDTO> itAnomaly = anomalies.iterator();
-      while (itAnomaly.hasNext()) {
-        MergedAnomalyResultDTO anomaly = itAnomaly.next();
-        if (configId >= 0 && (anomaly.getDetectionConfigId() == null || anomaly.getDetectionConfigId() != configId)){
-          itAnomaly.remove();
-        }
-
-        if (!slice.match(anomaly)) {
-          itAnomaly.remove();
-        }
+      anomalies.removeIf(anomaly -> !slice.match(anomaly));
+
+      if (isLegacy) {
+        anomalies.removeIf(anomaly ->
+            (configId >= 0) && (anomaly.getFunctionId() == null || anomaly.getFunctionId() != configId)
+        );
+      } else {
+        anomalies.removeIf(anomaly ->
+            (configId >= 0) && (anomaly.getDetectionConfigId() == null || anomaly.getDetectionConfigId() != configId)
+        );
       }
 
+      LOG.info("Fetched {} legacy anomalies between (startTime = {}, endTime = {}) with confid Id = {}", anomalies.size(),
+          slice.getStart(), slice.getEnd(), configId);
       output.putAll(slice, anomalies);
     }
+
     return output;
   }
 
   @Override
+  public Multimap<AnomalySlice, MergedAnomalyResultDTO> fetchLegacyAnomalies(Collection<AnomalySlice> slices, long configId) {
+    return fetchAnomalies(slices, configId, true);
+  }
+
+  @Override
+  public Multimap<AnomalySlice, MergedAnomalyResultDTO> fetchAnomalies(Collection<AnomalySlice> slices, long configId) {
+    return fetchAnomalies(slices, configId, false);
+  }
+
+  @Override
   public Multimap<EventSlice, EventDTO> fetchEvents(Collection<EventSlice> slices) {
     Multimap<EventSlice, EventDTO> output = ArrayListMultimap.create();
     for (EventSlice slice : slices) {
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/AlertUtils.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/AlertUtils.java
index 3887ca6..5d655e2 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/AlertUtils.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/AlertUtils.java
@@ -102,6 +102,11 @@ public class AlertUtils {
       @Nullable
       @Override
       public Long apply(@Nullable MergedAnomalyResultDTO mergedAnomalyResultDTO) {
+        // Return functionId to support alerting of legacy anomalies
+        if (mergedAnomalyResultDTO.getDetectionConfigId() == null) {
+          return mergedAnomalyResultDTO.getFunctionId();
+        }
+
         return mergedAnomalyResultDTO.getDetectionConfigId();
       }
     });
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskRunner.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskRunner.java
index 669b442..50285cf 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskRunner.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/DetectionAlertTaskRunner.java
@@ -85,6 +85,8 @@ public class DetectionAlertTaskRunner implements TaskRunner {
         AlertUtils.mergeVectorClock(alertConfig.getVectorClocks(),
         AlertUtils.makeVectorClock(result.getAllAnomalies()))
     );
+
+    LOG.info("Saving watermarks for alertConfigDAO : {}", alertConfig.toString());
     this.alertConfigDAO.save(alertConfig);
   }
 
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/StatefulDetectionAlertFilter.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/StatefulDetectionAlertFilter.java
index 89db673..3b2044c 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/StatefulDetectionAlertFilter.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/StatefulDetectionAlertFilter.java
@@ -33,6 +33,7 @@ import org.apache.commons.collections.MapUtils;
 
 
 public abstract class StatefulDetectionAlertFilter extends DetectionAlertFilter {
+
   public StatefulDetectionAlertFilter(DataProvider provider, DetectionAlertConfigDTO config, long endTime) {
     super(provider, config, endTime);
   }
@@ -47,11 +48,16 @@ public abstract class StatefulDetectionAlertFilter extends DetectionAlertFilter
   protected final Set<MergedAnomalyResultDTO> filter(Map<Long, Long> vectorClocks, final long minId) {
     // retrieve all candidate anomalies
     Set<MergedAnomalyResultDTO> allAnomalies = new HashSet<>();
-    for (Long detectionConfigId : vectorClocks.keySet()) {
-      long startTime = vectorClocks.get(detectionConfigId);
+    for (Long functionId : vectorClocks.keySet()) {
+      long startTime = vectorClocks.get(functionId);
 
       AnomalySlice slice = new AnomalySlice().withStart(startTime).withEnd(this.endTime);
-      Collection<MergedAnomalyResultDTO> candidates = this.provider.fetchAnomalies(Collections.singletonList(slice), detectionConfigId).get(slice);
+      Collection<MergedAnomalyResultDTO> candidates;
+      if (this.config.isOnlyFetchLegacyAnomalies()) {
+        candidates = this.provider.fetchLegacyAnomalies(Collections.singletonList(slice), functionId).get(slice);
+      } else {
+        candidates = this.provider.fetchAnomalies(Collections.singletonList(slice), functionId).get(slice);
+      }
 
       Collection<MergedAnomalyResultDTO> anomalies =
           Collections2.filter(candidates, new Predicate<MergedAnomalyResultDTO>() {
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/filter/LegacyAlertFilter.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/filter/LegacyAlertFilter.java
index 05b0454..488f1f2 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/filter/LegacyAlertFilter.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/filter/LegacyAlertFilter.java
@@ -70,13 +70,19 @@ public class LegacyAlertFilter extends DetectionAlertFilter {
   public DetectionAlertFilterResult run() {
     DetectionAlertFilterResult result = new DetectionAlertFilterResult();
 
-    for (Long detectionConfigId : this.detectionConfigIds) {
-      long startTime = MapUtils.getLong(this.vectorClocks, detectionConfigId, 0L);
+    for (Long functionId : this.detectionConfigIds) {
+      long startTime = MapUtils.getLong(this.vectorClocks, functionId, 0L);
 
-      AnomalySlice slice =
-          new AnomalySlice().withStart(startTime).withEnd(this.endTime);
-      Collection<MergedAnomalyResultDTO> candidates =
-          this.provider.fetchAnomalies(Collections.singletonList(slice), detectionConfigId).get(slice);
+      AnomalySlice slice = new AnomalySlice()
+          .withStart(startTime)
+          .withEnd(this.endTime);
+
+      Collection<MergedAnomalyResultDTO> candidates;
+      if (this.config.isOnlyFetchLegacyAnomalies()) {
+        candidates = this.provider.fetchLegacyAnomalies(Collections.singletonList(slice), functionId).get(slice);
+      } else {
+        candidates = this.provider.fetchAnomalies(Collections.singletonList(slice), functionId).get(slice);
+      }
 
       Collection<MergedAnomalyResultDTO> anomalies =
           Collections2.filter(candidates, new Predicate<MergedAnomalyResultDTO>() {
@@ -86,7 +92,11 @@ public class LegacyAlertFilter extends DetectionAlertFilter {
             }
           });
 
-      result.addMapping(this.alertConfig.getReceiverAddresses(), new HashSet<>(anomalies));
+      if (result.getResult().get(this.alertConfig.getReceiverAddresses()) == null) {
+        result.addMapping(this.alertConfig.getReceiverAddresses(), new HashSet<>(anomalies));
+      } else {
+        result.getResult().get(this.alertConfig.getReceiverAddresses()).addAll(anomalies);
+      }
     }
 
     return result;
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/filter/ToAllRecipientsDetectionAlertFilter.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/filter/ToAllRecipientsDetectionAlertFilter.java
index 58b8bdb..3f14d92 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/filter/ToAllRecipientsDetectionAlertFilter.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/filter/ToAllRecipientsDetectionAlertFilter.java
@@ -35,6 +35,7 @@ import org.apache.commons.collections.MapUtils;
  * The detection alert filter that sends the anomaly email to all recipients
  */
 public class ToAllRecipientsDetectionAlertFilter extends StatefulDetectionAlertFilter {
+
   private static final String PROP_RECIPIENTS = "recipients";
   private static final String PROP_TO = "to";
   private static final String PROP_CC = "cc";
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/scheme/DetectionEmailAlerter.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/scheme/DetectionEmailAlerter.java
index 97495f1..5d763e2 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/scheme/DetectionEmailAlerter.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/alert/scheme/DetectionEmailAlerter.java
@@ -176,7 +176,7 @@ public class DetectionEmailAlerter extends DetectionAlertScheme {
   public void run() throws Exception {
     Preconditions.checkNotNull(result);
     if (result.getAllAnomalies().size() == 0) {
-      LOG.info("Zero anomalies found, skipping sending iris alert for {}", config.getId());
+      LOG.info("Zero anomalies found, skipping sending email alert for {}", config.getId());
       return;
     }
 
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/DetectionTestUtils.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/DetectionTestUtils.java
index 49ac6b0..bf4f37e 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/DetectionTestUtils.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/DetectionTestUtils.java
@@ -26,13 +26,14 @@ import java.util.Set;
 public class DetectionTestUtils {
   private static final Long PROP_ID_VALUE = 1000L;
 
-  public static MergedAnomalyResultDTO makeAnomaly(Long configId, long start, long end, String metric, String dataset, Map<String, String> dimensions) {
+  public static MergedAnomalyResultDTO makeAnomaly(Long configId, Long legacyFunctionId, long start, long end, String metric, String dataset, Map<String, String> dimensions) {
     MergedAnomalyResultDTO anomaly = new MergedAnomalyResultDTO();
     anomaly.setDetectionConfigId(configId);
     anomaly.setStartTime(start);
     anomaly.setEndTime(end);
     anomaly.setMetric(metric);
     anomaly.setCollection(dataset);
+    anomaly.setFunctionId(legacyFunctionId);
 
     DimensionMap dimMap = new DimensionMap();
     dimMap.putAll(dimensions);
@@ -41,10 +42,18 @@ public class DetectionTestUtils {
     return anomaly;
   }
 
+  public static MergedAnomalyResultDTO makeAnomaly(Long configId, long start, long end, String metric, String dataset, Map<String, String> dimensions) {
+    return DetectionTestUtils.makeAnomaly(configId, null, start, end, metric, dataset, dimensions);
+  }
+
   public static MergedAnomalyResultDTO makeAnomaly(long start, long end) {
     return DetectionTestUtils.makeAnomaly(PROP_ID_VALUE, start, end, null, null, Collections.<String, String>emptyMap());
   }
 
+  public static MergedAnomalyResultDTO makeAnomaly(Long configId, Long legacyFuncId, long start, long end) {
+    return DetectionTestUtils.makeAnomaly(configId, legacyFuncId, start, end, null, null, Collections.<String, String>emptyMap());
+  }
+
   public static MergedAnomalyResultDTO makeAnomaly(Long configId, long start, long end) {
     return DetectionTestUtils.makeAnomaly(configId, start, end, null, null, Collections.<String, String>emptyMap());
   }
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/MockDataProvider.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/MockDataProvider.java
index 084e4dc..02770d1 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/MockDataProvider.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/MockDataProvider.java
@@ -123,14 +123,20 @@ public class MockDataProvider implements DataProvider {
     return result;
   }
 
-  @Override
-  public Multimap<AnomalySlice, MergedAnomalyResultDTO> fetchAnomalies(Collection<AnomalySlice> slices, long configId) {
+  private Multimap<AnomalySlice, MergedAnomalyResultDTO> fetchAnomalies(Collection<AnomalySlice> slices,
+      long configId, boolean isLegacy) {
     Multimap<AnomalySlice, MergedAnomalyResultDTO> result = ArrayListMultimap.create();
     for (AnomalySlice slice : slices) {
       for (MergedAnomalyResultDTO anomaly : this.anomalies) {
         if (slice.match(anomaly)) {
-          if (configId >= 0 && (anomaly.getDetectionConfigId() == null || anomaly.getDetectionConfigId() != configId)){
-            continue;
+          if (isLegacy) {
+            if (configId >= 0 && (anomaly.getFunctionId() == null || anomaly.getFunctionId() != configId)) {
+              continue;
+            }
+          } else {
+            if (configId >= 0 && (anomaly.getDetectionConfigId() == null || anomaly.getDetectionConfigId() != configId)) {
+              continue;
+            }
           }
           result.put(slice, anomaly);
         }
@@ -140,6 +146,18 @@ public class MockDataProvider implements DataProvider {
   }
 
   @Override
+  public Multimap<AnomalySlice, MergedAnomalyResultDTO> fetchLegacyAnomalies(Collection<AnomalySlice> slices,
+      long configId) {
+    return fetchAnomalies(slices, configId, true);
+  }
+
+  @Override
+  public Multimap<AnomalySlice, MergedAnomalyResultDTO> fetchAnomalies(Collection<AnomalySlice> slices,
+      long configId) {
+    return fetchAnomalies(slices, configId, false);
+  }
+
+  @Override
   public Multimap<EventSlice, EventDTO> fetchEvents(Collection<EventSlice> slices) {
     Multimap<EventSlice, EventDTO> result = ArrayListMultimap.create();
     for (EventSlice slice : slices) {
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/filter/DimensionDetectionAlertFilterTest.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/filter/DimensionDetectionAlertFilterTest.java
index ef81d6f..b96bfa1 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/filter/DimensionDetectionAlertFilterTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/filter/DimensionDetectionAlertFilterTest.java
@@ -65,9 +65,9 @@ public class DimensionDetectionAlertFilterTest {
   private DetectionAlertFilter alertFilter;
   private List<MergedAnomalyResultDTO> detectedAnomalies;
 
-  private Map<String, Object> properties;
   private MockDataProvider provider;
   private DetectionAlertConfigDTO alertConfig;
+  private DetectionAlertConfigDTO alertConfigForLegacyAnomalies;
 
   @BeforeMethod
   public void beforeMethod() {
@@ -81,25 +81,38 @@ public class DimensionDetectionAlertFilterTest {
     this.detectedAnomalies.add(makeAnomaly(1002L,3333, 9999, Collections.singletonMap("key", "value")));
     this.detectedAnomalies.add(makeAnomaly(1003L,1111, 9999, Collections.singletonMap("key", "value")));
 
-    this.provider = new MockDataProvider().setAnomalies(this.detectedAnomalies);
+    // Anomalies generated by legacy pipeline
+    this.detectedAnomalies.add(makeAnomaly(null, 1000L, 1100, 1500));
+    this.detectedAnomalies.add(makeAnomaly(null, 1002L, 0, 1000));
+    this.detectedAnomalies.add(makeAnomaly(null, 1002L, 1100, 2000));
 
-    this.alertConfig = new DetectionAlertConfigDTO();
+    this.provider = new MockDataProvider()
+        .setAnomalies(this.detectedAnomalies);
 
-    this.properties = new HashMap<>();
+    this.alertConfig = createDetectionAlertConfig();
+    this.alertConfigForLegacyAnomalies = createDetectionAlertConfig();
+  }
+
+  private DetectionAlertConfigDTO createDetectionAlertConfig() {
+    DetectionAlertConfigDTO alertConfig = new DetectionAlertConfigDTO();
+
+    Map<String, Object> properties = new HashMap<>();
     Map<String, Set<String>> recipients = new HashMap<>();
     recipients.put(PROP_TO, PROP_TO_VALUE);
     recipients.put(PROP_CC, PROP_CC_VALUE);
     recipients.put(PROP_BCC, PROP_BCC_VALUE);
-    this.properties.put(PROP_RECIPIENTS, recipients);
-    this.properties.put(PROP_DETECTION_CONFIG_IDS, PROP_ID_VALUE);
-    this.properties.put(PROP_DIMENSION, PROP_DIMENSION_VALUE);
-    this.properties.put(PROP_DIMENSION_TO, PROP_DIMENSION_TO_VALUE);
+    properties.put(PROP_RECIPIENTS, recipients);
+    properties.put(PROP_DETECTION_CONFIG_IDS, PROP_ID_VALUE);
+    properties.put(PROP_DIMENSION, PROP_DIMENSION_VALUE);
+    properties.put(PROP_DIMENSION_TO, PROP_DIMENSION_TO_VALUE);
 
-    this.alertConfig.setProperties(this.properties);
+    alertConfig.setProperties(properties);
 
     Map<Long, Long> vectorClocks = new HashMap<>();
     vectorClocks.put(PROP_ID_VALUE.get(0), 0L);
-    this.alertConfig.setVectorClocks(vectorClocks);
+    alertConfig.setVectorClocks(vectorClocks);
+
+    return alertConfig;
   }
 
   @Test
@@ -119,7 +132,7 @@ public class DimensionDetectionAlertFilterTest {
 
   @Test
   public void testAlertFilterNoChildren() throws Exception {
-    this.properties.put(PROP_DETECTION_CONFIG_IDS, Collections.singletonList(1003L));
+    this.alertConfig.getProperties().put(PROP_DETECTION_CONFIG_IDS, Collections.singletonList(1003L));
     this.alertFilter = new DimensionDetectionAlertFilter(provider, alertConfig,2500L);
 
     MergedAnomalyResultDTO child = makeAnomaly(1003L, 1234, 9999);
@@ -136,7 +149,7 @@ public class DimensionDetectionAlertFilterTest {
 
   @Test
   public void testAlertFilterFeedback() throws Exception {
-    this.properties.put(PROP_DETECTION_CONFIG_IDS, Collections.singletonList(1003L));
+    this.alertConfig.getProperties().put(PROP_DETECTION_CONFIG_IDS, Collections.singletonList(1003L));
     this.alertFilter = new DimensionDetectionAlertFilter(provider, alertConfig,2500L);
 
     AnomalyFeedbackDTO feedbackAnomaly = new AnomalyFeedbackDTO();
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/filter/LegacyAlertFilterTest.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/filter/LegacyAlertFilterTest.java
index 1cd98a7..6cff774 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/filter/LegacyAlertFilterTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/filter/LegacyAlertFilterTest.java
@@ -52,6 +52,7 @@ public class LegacyAlertFilterTest {
 
   private List<MergedAnomalyResultDTO> detectedAnomalies;
   private LegacyAlertFilter legacyAlertFilter;
+  private LegacyAlertFilter legacyAlertFilterOnLegacyAnomalies;
 
   @BeforeMethod
   public void beforeMethod() throws Exception {
@@ -63,8 +64,23 @@ public class LegacyAlertFilterTest {
     this.detectedAnomalies.add(makeAnomaly(1002L, 3333, 9999));
     this.detectedAnomalies.add(makeAnomaly(1003L, 1100, 1500));
 
+    // Anomalies generated by legacy pipeline
+    this.detectedAnomalies.add(makeAnomaly(null, 1000L, 1100, 1500));
+    this.detectedAnomalies.add(makeAnomaly(null, 1002L, 0, 1000));
+    this.detectedAnomalies.add(makeAnomaly(null, 1002L, 1100, 2000));
+
+
     DataProvider mockDataProvider = new MockDataProvider().setAnomalies(this.detectedAnomalies);
 
+    DetectionAlertConfigDTO detectionAlertConfig = createDetectionAlertConfig();
+    this.legacyAlertFilter = new LegacyAlertFilter(mockDataProvider, detectionAlertConfig, 2500L);
+
+    DetectionAlertConfigDTO detectionAlertConfigLegacyAnomalies = createDetectionAlertConfig();
+    detectionAlertConfigLegacyAnomalies.setOnlyFetchLegacyAnomalies(true);
+    this.legacyAlertFilterOnLegacyAnomalies = new LegacyAlertFilter(mockDataProvider, detectionAlertConfigLegacyAnomalies, 2500L);
+  }
+
+  private DetectionAlertConfigDTO createDetectionAlertConfig() {
     DetectionAlertConfigDTO detectionAlertConfig = new DetectionAlertConfigDTO();
     Map<String, Object> properties = new HashMap<>();
     properties.put(PROP_DETECTION_CONFIG_IDS, PROP_ID_VALUE);
@@ -74,10 +90,9 @@ public class LegacyAlertFilterTest {
     properties.put(PROP_LEGACY_ALERT_FILTER_CLASS_NAME, "com.linkedin.thirdeye.detector.email.filter.DummyAlertFilter");
     properties.put(PROP_LEGACY_ALERT_FILTER_CONFIG, "");
     detectionAlertConfig.setProperties(properties);
-
     detectionAlertConfig.setVectorClocks(new HashMap<Long, Long>());
 
-    this.legacyAlertFilter = new LegacyAlertFilter(mockDataProvider, detectionAlertConfig, 2500L);
+    return detectionAlertConfig;
   }
 
   @Test
@@ -86,4 +101,12 @@ public class LegacyAlertFilterTest {
     Assert.assertEquals(result.getResult().get(RECEIVER_ADDRESSES),
         new HashSet<>(this.detectedAnomalies.subList(0, 4)));
   }
+
+  @Test
+  public void testFetchingLegacyAnomalies() throws Exception {
+    DetectionAlertFilterResult result = this.legacyAlertFilterOnLegacyAnomalies.run();
+    Assert.assertEquals(result.getAllAnomalies().size(), 2);
+    Assert.assertEquals(result.getResult().get(RECEIVER_ADDRESSES),
+        new HashSet<>(this.detectedAnomalies.subList(7, 9)));
+  }
 }
diff --git a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/filter/ToAllRecipientsDetectionAlertFilterTest.java b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/filter/ToAllRecipientsDetectionAlertFilterTest.java
index 16dd1bb..41b81e5 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/filter/ToAllRecipientsDetectionAlertFilterTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/com/linkedin/thirdeye/detection/alert/filter/ToAllRecipientsDetectionAlertFilterTest.java
@@ -58,9 +58,9 @@ public class ToAllRecipientsDetectionAlertFilterTest {
   private DetectionAlertFilter alertFilter;
   private List<MergedAnomalyResultDTO> detectedAnomalies;
 
-  private Map<String, Object> properties;
   private MockDataProvider provider;
   private DetectionAlertConfigDTO alertConfig;
+  private DetectionAlertConfigDTO alertConfigForLegacyAnomalies;
 
   @BeforeMethod
   public void beforeMethod() {
@@ -72,23 +72,36 @@ public class ToAllRecipientsDetectionAlertFilterTest {
     this.detectedAnomalies.add(makeAnomaly(1002L,3333, 9999));
     this.detectedAnomalies.add(makeAnomaly(1003L,1100, 1500));
 
-    this.provider = new MockDataProvider().setAnomalies(this.detectedAnomalies);
+    // Anomalies generated by legacy pipeline
+    this.detectedAnomalies.add(makeAnomaly(null, 1000L, 1100, 1500));
+    this.detectedAnomalies.add(makeAnomaly(null, 1002L, 0, 1000));
+    this.detectedAnomalies.add(makeAnomaly(null, 1002L, 1100, 2000));
 
-    this.alertConfig = new DetectionAlertConfigDTO();
+    this.provider = new MockDataProvider()
+        .setAnomalies(this.detectedAnomalies);
 
-    this.properties = new HashMap<>();
+    this.alertConfig = createDetectionAlertConfig();
+    this.alertConfigForLegacyAnomalies = createDetectionAlertConfig();
+    this.alertConfigForLegacyAnomalies.setOnlyFetchLegacyAnomalies(true);
+  }
+
+  private DetectionAlertConfigDTO createDetectionAlertConfig() {
+    DetectionAlertConfigDTO alertConfig = new DetectionAlertConfigDTO();
+
+    Map<String, Object> properties = new HashMap<>();
     Map<String, Set<String>> recipients = new HashMap<>();
     recipients.put(PROP_TO, PROP_TO_VALUE);
     recipients.put(PROP_CC, PROP_CC_VALUE);
     recipients.put(PROP_BCC, PROP_BCC_VALUE);
-    this.properties.put(PROP_RECIPIENTS, recipients);
-    this.properties.put(PROP_DETECTION_CONFIG_IDS, PROP_ID_VALUE);
+    properties.put(PROP_RECIPIENTS, recipients);
+    properties.put(PROP_DETECTION_CONFIG_IDS, PROP_ID_VALUE);
 
-    this.alertConfig.setProperties(properties);
+    alertConfig.setProperties(properties);
     Map<Long, Long> vectorClocks = new HashMap<>();
     vectorClocks.put(PROP_ID_VALUE.get(0), 0L);
-    this.alertConfig.setVectorClocks(vectorClocks);
+    alertConfig.setVectorClocks(vectorClocks);
 
+    return alertConfig;
   }
 
   @Test
@@ -100,8 +113,17 @@ public class ToAllRecipientsDetectionAlertFilterTest {
   }
 
   @Test
+  public void testGetAlertFilterResultForLegacyAnomalies() throws Exception {
+    this.alertFilter = new ToAllRecipientsDetectionAlertFilter(this.provider, this.alertConfigForLegacyAnomalies,2500L);
+
+    DetectionAlertFilterResult result = this.alertFilter.run();
+    Assert.assertEquals(result.getResult().get(RECIPIENTS).size(), 2);
+    Assert.assertEquals(result.getResult().get(RECIPIENTS), new HashSet<>(this.detectedAnomalies.subList(7, 9)));
+  }
+
+  @Test
   public void testAlertFilterFeedback() throws Exception {
-    this.properties.put(PROP_DETECTION_CONFIG_IDS, Collections.singletonList(1003L));
+    this.alertConfig.getProperties().put(PROP_DETECTION_CONFIG_IDS, Collections.singletonList(1003L));
     this.alertFilter = new ToAllRecipientsDetectionAlertFilter(this.provider, this.alertConfig,2500L);
 
     AnomalyFeedbackDTO feedbackAnomaly = new AnomalyFeedbackDTO();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org