You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ji...@apache.org on 2019/07/22 17:44:24 UTC

[incubator-pinot] branch master updated: [TE] anomaly filters potential inconsistency issue fix (#4448)

This is an automated email from the ASF dual-hosted git repository.

jihao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d18fab9  [TE] anomaly filters potential inconsistency issue fix (#4448)
d18fab9 is described below

commit d18fab9536d73a541f2d142fb1eda433ff42e34a
Author: Jihao Zhang <ji...@linkedin.com>
AuthorDate: Mon Jul 22 10:44:19 2019 -0700

    [TE] anomaly filters potential inconsistency issue fix (#4448)
    
    Currently, the anomaly filters re-fetch data from the data source to get the current value for each anomaly. This works fine in most cases. However, when a data inconsistency occurs, a filter may decide based on the freshly fetched data instead of the current value stored in the anomaly. When this happens, it causes confusion in the Preview UI, where the filter appears not to be working correctly. This PR fixes the issue by making the filters use the current value stored in the anomaly (getAvgCurrentVal) rather than re-fetching it.
---
 .../AbsoluteChangeRuleAnomalyFilter.java           |  3 +-
 .../PercentageChangeRuleAnomalyFilter.java         | 10 +---
 .../SitewideImpactRuleAnomalyFilter.java           |  5 +-
 .../components/ThresholdRuleAnomalyFilter.java     | 14 +----
 .../thirdeye/detection/DetectionTestUtils.java     |  9 ++++
 .../AbsoluteChangeRuleAnomalyFilterTest.java       | 25 +++------
 .../PercentageChangeRuleAnomalyFilterTest.java     | 60 +++++++++++-----------
 .../SitewideImpactRuleAnomalyFilterTest.java       | 20 +++-----
 .../components/ThresholdRuleAnomalyFilterTest.java | 56 ++++++++++----------
 9 files changed, 88 insertions(+), 114 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/AbsoluteChangeRuleAnomalyFilter.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/AbsoluteChangeRuleAnomalyFilter.java
index 7407de4..e49d771 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/AbsoluteChangeRuleAnomalyFilter.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/AbsoluteChangeRuleAnomalyFilter.java
@@ -56,7 +56,6 @@ public class AbsoluteChangeRuleAnomalyFilter implements AnomalyFilter<AbsoluteCh
     List<MetricSlice> slices = new ArrayList<>();
     MetricSlice currentSlice =
         MetricSlice.from(me.getId(), anomaly.getStartTime(), anomaly.getEndTime(), me.getFilters());
-    slices.add(currentSlice);
 
     // customize baseline offset
     if (baseline != null) {
@@ -66,7 +65,7 @@ public class AbsoluteChangeRuleAnomalyFilter implements AnomalyFilter<AbsoluteCh
     Map<MetricSlice, DataFrame> aggregates =
         this.dataFetcher.fetchData(new InputDataSpec().withAggregateSlices(slices)).getAggregates();
 
-    double currentValue = aggregates.get(currentSlice).getDouble(COL_VALUE, 0);
+    double currentValue = anomaly.getAvgCurrentVal();
     double baselineValue =
         baseline == null ? anomaly.getAvgBaselineVal() : this.baseline.gather(currentSlice, aggregates).getDouble(COL_VALUE, 0);
     // if inconsistent with up/down, filter the anomaly
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/PercentageChangeRuleAnomalyFilter.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/PercentageChangeRuleAnomalyFilter.java
index dd50cd5..59f697c 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/PercentageChangeRuleAnomalyFilter.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/PercentageChangeRuleAnomalyFilter.java
@@ -60,8 +60,6 @@ public class PercentageChangeRuleAnomalyFilter implements AnomalyFilter<Percenta
     MetricEntity me = MetricEntity.fromURN(anomaly.getMetricUrn());
     List<MetricSlice> slices = new ArrayList<>();
     MetricSlice currentSlice = MetricSlice.from(me.getId(), anomaly.getStartTime(), anomaly.getEndTime(), me.getFilters());
-    slices.add(currentSlice);
-
     // customize baseline offset
     if (baseline != null) {
       slices.addAll(this.baseline.scatter(currentSlice));
@@ -70,13 +68,7 @@ public class PercentageChangeRuleAnomalyFilter implements AnomalyFilter<Percenta
     Map<MetricSlice, DataFrame> aggregates =
         this.dataFetcher.fetchData(new InputDataSpec().withAggregateSlices(slices)).getAggregates();
 
-    double currentValue;
-    if (aggregates.get(currentSlice).isEmpty()) {
-      currentValue = anomaly.getAvgCurrentVal();
-    } else {
-      currentValue = aggregates.get(currentSlice).getDouble(COL_VALUE, 0);
-    }
-
+    double currentValue = anomaly.getAvgCurrentVal();
     double baselineValue;
     if (baseline == null) {
       baselineValue = anomaly.getAvgBaselineVal();
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/SitewideImpactRuleAnomalyFilter.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/SitewideImpactRuleAnomalyFilter.java
index 4ab6cf5..9ecd60e 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/SitewideImpactRuleAnomalyFilter.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/SitewideImpactRuleAnomalyFilter.java
@@ -64,7 +64,6 @@ public class SitewideImpactRuleAnomalyFilter implements AnomalyFilter<SitewideIm
     MetricEntity me = MetricEntity.fromURN(anomaly.getMetricUrn());
     List<MetricSlice> slices = new ArrayList<>();
     MetricSlice currentSlice = MetricSlice.from(me.getId(), anomaly.getStartTime(), anomaly.getEndTime(), me.getFilters());
-    slices.add(currentSlice);
 
     // customize baseline offset
     MetricSlice baselineSlice = null;
@@ -85,13 +84,11 @@ public class SitewideImpactRuleAnomalyFilter implements AnomalyFilter<SitewideIm
     }
     slices.add(siteWideSlice);
 
-
-
     Map<MetricSlice, DataFrame> aggregates = this.dataFetcher.fetchData(
         new InputDataSpec().withAggregateSlices(slices))
         .getAggregates();
 
-    double currentValue = getValueFromAggregates(currentSlice, aggregates);
+    double currentValue = anomaly.getAvgCurrentVal();
     double baselineValue = baseline == null ? anomaly.getAvgBaselineVal() :  this.baseline.gather(currentSlice, aggregates).getDouble(COL_VALUE, 0);
     double siteWideValue = getValueFromAggregates(siteWideSlice, aggregates);
 
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilter.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilter.java
index 4a1245d..0f2e7b4 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilter.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilter.java
@@ -52,16 +52,9 @@ public class ThresholdRuleAnomalyFilter implements AnomalyFilter<ThresholdRuleFi
   private double maxValueDaily;
   private double maxValue;
   private double minValue;
-  private InputDataFetcher dataFetcher;
-
   @Override
   public boolean isQualified(MergedAnomalyResultDTO anomaly) {
-    MetricEntity me = MetricEntity.fromURN(anomaly.getMetricUrn());
-    MetricSlice currentSlice = MetricSlice.from(me.getId(), anomaly.getStartTime(), anomaly.getEndTime(), me.getFilters());
-    InputData data = dataFetcher.fetchData(new InputDataSpec().withAggregateSlices(Collections.singleton(currentSlice)));
-
-    Map<MetricSlice, DataFrame> aggregates = data.getAggregates();
-    double currentValue = getValueFromAggregates(currentSlice, aggregates);
+    double currentValue = anomaly.getAvgCurrentVal();
 
     Interval anomalyInterval = new Interval(anomaly.getStartTime(), anomaly.getEndTime());
     double hourlyMultiplier = TimeUnit.HOURS.toMillis(1) / (double) anomalyInterval.toDurationMillis();
@@ -93,10 +86,5 @@ public class ThresholdRuleAnomalyFilter implements AnomalyFilter<ThresholdRuleFi
     this.maxValueDaily = spec.getMaxValueDaily();
     this.maxValue = spec.getMaxValue();
     this.minValue = spec.getMinValue();
-    this.dataFetcher = dataFetcher;
-  }
-
-  double getValueFromAggregates(MetricSlice slice, Map<MetricSlice, DataFrame> aggregates) {
-    return aggregates.get(slice).getDouble(COL_VALUE, 0);
   }
 }
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DetectionTestUtils.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DetectionTestUtils.java
index 6fc551f..0e3999a 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DetectionTestUtils.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DetectionTestUtils.java
@@ -19,6 +19,7 @@
 
 package org.apache.pinot.thirdeye.detection;
 
+import java.util.HashMap;
 import org.apache.pinot.thirdeye.common.dimension.DimensionMap;
 import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import java.util.Collections;
@@ -93,4 +94,12 @@ public class DetectionTestUtils {
     result.setAvgBaselineVal(baselineValue);
     return result;
   }
+
+  public static MergedAnomalyResultDTO makeAnomaly(long start, long end, long configId, String metricUrn,
+      double currentVal) {
+    MergedAnomalyResultDTO anomaly = makeAnomaly(configId, start, end, new HashMap<>());
+    anomaly.setMetricUrn(metricUrn);
+    anomaly.setAvgCurrentVal(currentVal);
+    return anomaly;
+  }
 }
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/AbsoluteChangeRuleAnomalyFilterTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/AbsoluteChangeRuleAnomalyFilterTest.java
index 81b61e9..fba1469 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/AbsoluteChangeRuleAnomalyFilterTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/AbsoluteChangeRuleAnomalyFilterTest.java
@@ -16,24 +16,22 @@
 
 package org.apache.pinot.thirdeye.detection.components;
 
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.pinot.thirdeye.dataframe.DataFrame;
 import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
-import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import org.apache.pinot.thirdeye.detection.DataProvider;
 import org.apache.pinot.thirdeye.detection.DefaultInputDataFetcher;
 import org.apache.pinot.thirdeye.detection.DetectionTestUtils;
 import org.apache.pinot.thirdeye.detection.MockDataProvider;
 import org.apache.pinot.thirdeye.detection.spec.AbsoluteChangeRuleAnomalyFilterSpec;
-import org.apache.pinot.thirdeye.detection.spec.PercentageChangeRuleAnomalyFilterSpec;
 import org.apache.pinot.thirdeye.detection.spi.components.AnomalyFilter;
 import org.apache.pinot.thirdeye.rootcause.timeseries.Baseline;
 import org.apache.pinot.thirdeye.rootcause.timeseries.BaselineAggregate;
 import org.apache.pinot.thirdeye.rootcause.timeseries.BaselineAggregateType;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
 import org.joda.time.DateTimeZone;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
@@ -44,6 +42,7 @@ import static org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils.*;
 
 public class AbsoluteChangeRuleAnomalyFilterTest {
   private static final String METRIC_URN = "thirdeye:metric:123";
+  private static final long CONFIG_ID = 125L;
 
   private DataProvider testDataProvider;
   private Baseline baseline;
@@ -73,18 +72,10 @@ public class AbsoluteChangeRuleAnomalyFilterTest {
     spec.setThreshold(100);
     spec.setPattern("up_or_down");
     AnomalyFilter filter = new AbsoluteChangeRuleAnomalyFilter();
-    filter.init(spec, new DefaultInputDataFetcher(this.testDataProvider, 125L));
+    filter.init(spec, new DefaultInputDataFetcher(this.testDataProvider, CONFIG_ID));
     List<Boolean> results =
-        Arrays.asList(makeAnomaly(0, 2), makeAnomaly(4, 6)).stream().map(anomaly -> filter.isQualified(anomaly)).collect(
+        Arrays.asList(DetectionTestUtils.makeAnomaly(0, 2, CONFIG_ID, METRIC_URN, 150), DetectionTestUtils.makeAnomaly(4, 6, CONFIG_ID, METRIC_URN, 500)).stream().map(anomaly -> filter.isQualified(anomaly)).collect(
             Collectors.toList());
     Assert.assertEquals(results, Arrays.asList(false, true));
   }
-
-
-  private static MergedAnomalyResultDTO makeAnomaly(long start, long end) {
-    Map<String, String> dimensions = new HashMap<>();
-    MergedAnomalyResultDTO anomaly = DetectionTestUtils.makeAnomaly(125L, start, end, dimensions);
-    anomaly.setMetricUrn(METRIC_URN);
-    return anomaly;
-  }
 }
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/PercentageChangeRuleAnomalyFilterTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/PercentageChangeRuleAnomalyFilterTest.java
index edc088e..011f8cc 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/PercentageChangeRuleAnomalyFilterTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/PercentageChangeRuleAnomalyFilterTest.java
@@ -16,10 +16,14 @@
 
 package org.apache.pinot.thirdeye.detection.components;
 
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.pinot.thirdeye.dataframe.DataFrame;
 import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
-import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import org.apache.pinot.thirdeye.detection.DataProvider;
 import org.apache.pinot.thirdeye.detection.DefaultInputDataFetcher;
 import org.apache.pinot.thirdeye.detection.DetectionTestUtils;
@@ -29,11 +33,6 @@ import org.apache.pinot.thirdeye.detection.spi.components.AnomalyFilter;
 import org.apache.pinot.thirdeye.rootcause.timeseries.Baseline;
 import org.apache.pinot.thirdeye.rootcause.timeseries.BaselineAggregate;
 import org.apache.pinot.thirdeye.rootcause.timeseries.BaselineAggregateType;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
 import org.joda.time.DateTimeZone;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
@@ -44,6 +43,7 @@ import static org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils.*;
 
 public class PercentageChangeRuleAnomalyFilterTest {
   private static final String METRIC_URN = "thirdeye:metric:123";
+  private static final long CONFIG_ID = 125L;
 
   private DataProvider testDataProvider;
   private Baseline baseline;
@@ -60,51 +60,53 @@ public class PercentageChangeRuleAnomalyFilterTest {
     MetricSlice baselineSlice3 = this.baseline.scatter(slice3).get(0);
 
     Map<MetricSlice, DataFrame> aggregates = new HashMap<>();
-    aggregates.put(slice1, new DataFrame().addSeries(COL_TIME, slice1.getStart()).addSeries(COL_VALUE, 150).setIndex(COL_TIME));
-    aggregates.put(baselineSlice1, new DataFrame().addSeries(COL_TIME, baselineSlice1.getStart()).addSeries(COL_VALUE, 200).setIndex(COL_TIME));
-    aggregates.put(slice2, new DataFrame().addSeries(COL_VALUE, 500).addSeries(COL_TIME, slice2.getStart()).setIndex(COL_TIME));
-    aggregates.put(baselineSlice2, new DataFrame().addSeries(COL_VALUE, 1000).addSeries(COL_TIME, baselineSlice2.getStart()).setIndex(COL_TIME));
-    aggregates.put(slice3, new DataFrame().addSeries(COL_VALUE, 200).addSeries(COL_TIME, slice3.getStart()).setIndex(COL_TIME));
-    aggregates.put(baselineSlice3, new DataFrame().addSeries(COL_VALUE, 150).addSeries(COL_TIME, baselineSlice3.getStart()).setIndex(COL_TIME));
+    aggregates.put(slice1,
+        new DataFrame().addSeries(COL_TIME, slice1.getStart()).addSeries(COL_VALUE, 150).setIndex(COL_TIME));
+    aggregates.put(baselineSlice1,
+        new DataFrame().addSeries(COL_TIME, baselineSlice1.getStart()).addSeries(COL_VALUE, 200).setIndex(COL_TIME));
+    aggregates.put(slice2,
+        new DataFrame().addSeries(COL_VALUE, 500).addSeries(COL_TIME, slice2.getStart()).setIndex(COL_TIME));
+    aggregates.put(baselineSlice2,
+        new DataFrame().addSeries(COL_VALUE, 1000).addSeries(COL_TIME, baselineSlice2.getStart()).setIndex(COL_TIME));
+    aggregates.put(slice3,
+        new DataFrame().addSeries(COL_VALUE, 200).addSeries(COL_TIME, slice3.getStart()).setIndex(COL_TIME));
+    aggregates.put(baselineSlice3,
+        new DataFrame().addSeries(COL_VALUE, 150).addSeries(COL_TIME, baselineSlice3.getStart()).setIndex(COL_TIME));
 
     this.testDataProvider = new MockDataProvider().setAggregates(aggregates);
   }
 
   @Test
-  public void testPercentageChangeFilter(){
+  public void testPercentageChangeFilter() {
     PercentageChangeRuleAnomalyFilterSpec spec = new PercentageChangeRuleAnomalyFilterSpec();
     spec.setOffset("mean1w");
     spec.setThreshold(0.5);
     spec.setPattern("up_or_down");
     AnomalyFilter filter = new PercentageChangeRuleAnomalyFilter();
-    filter.init(spec, new DefaultInputDataFetcher(this.testDataProvider, 125L));
+    filter.init(spec, new DefaultInputDataFetcher(this.testDataProvider, CONFIG_ID));
     List<Boolean> results =
-        Stream.of(makeAnomaly(1555570800000L, 1555693200000L), makeAnomaly(1554163200000L, 1554249600000L)).map(anomaly -> filter.isQualified(anomaly)).collect(
-            Collectors.toList());
+        Stream.of(DetectionTestUtils.makeAnomaly(1555570800000L, 1555693200000L, CONFIG_ID, METRIC_URN, 150),
+            DetectionTestUtils.makeAnomaly(1554163200000L, 1554249600000L, CONFIG_ID, METRIC_URN, 500))
+            .map(anomaly -> filter.isQualified(anomaly))
+            .collect(Collectors.toList());
     Assert.assertEquals(results, Arrays.asList(false, true));
   }
 
   @Test
-  public void testPercentageChangeFilterTwoSide(){
+  public void testPercentageChangeFilterTwoSide() {
     PercentageChangeRuleAnomalyFilterSpec spec = new PercentageChangeRuleAnomalyFilterSpec();
     spec.setOffset("mean1w");
     spec.setUpThreshold(0.25);
     spec.setDownThreshold(0.5);
     spec.setPattern("up_or_down");
     AnomalyFilter filter = new PercentageChangeRuleAnomalyFilter();
-    filter.init(spec, new DefaultInputDataFetcher(this.testDataProvider, 125L));
+    filter.init(spec, new DefaultInputDataFetcher(this.testDataProvider, CONFIG_ID));
     List<Boolean> results =
-        Stream.of(makeAnomaly(1555570800000L, 1555693200000L), makeAnomaly(1554163200000L, 1554249600000L), makeAnomaly(1554076800000L, 1554163200000L)).map(anomaly -> filter.isQualified(anomaly)).collect(
-            Collectors.toList());
+        Stream.of(DetectionTestUtils.makeAnomaly(1555570800000L, 1555693200000L, CONFIG_ID, METRIC_URN, 150),
+            DetectionTestUtils.makeAnomaly(1554163200000L, 1554249600000L, CONFIG_ID, METRIC_URN, 500),
+            DetectionTestUtils.makeAnomaly(1554076800000L, 1554163200000L, CONFIG_ID, METRIC_URN, 200))
+            .map(anomaly -> filter.isQualified(anomaly))
+            .collect(Collectors.toList());
     Assert.assertEquals(results, Arrays.asList(false, true, true));
   }
-
-
-
-  private static MergedAnomalyResultDTO makeAnomaly(long start, long end) {
-    Map<String, String> dimensions = new HashMap<>();
-    MergedAnomalyResultDTO anomaly = DetectionTestUtils.makeAnomaly(125L, start, end, dimensions);
-    anomaly.setMetricUrn(METRIC_URN);
-    return anomaly;
-  }
 }
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/SitewideImpactRuleAnomalyFilterTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/SitewideImpactRuleAnomalyFilterTest.java
index cc27e8a..5a6cfcb 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/SitewideImpactRuleAnomalyFilterTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/SitewideImpactRuleAnomalyFilterTest.java
@@ -42,6 +42,7 @@ import static org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils.*;
 
 public class SitewideImpactRuleAnomalyFilterTest {
   private static final String METRIC_URN = "thirdeye:metric:123";
+  private static final long CONFIG_ID = 125L;
 
   private DataProvider testDataProvider;
   private Baseline baseline;
@@ -71,28 +72,23 @@ public class SitewideImpactRuleAnomalyFilterTest {
     spec.setOffset("median3w");
     spec.setPattern("down");
     SitewideImpactRuleAnomalyFilter filter = new SitewideImpactRuleAnomalyFilter();
-    filter.init(spec, new DefaultInputDataFetcher(this.testDataProvider, 125L));
-
+    filter.init(spec, new DefaultInputDataFetcher(this.testDataProvider, CONFIG_ID));
     List<Boolean> results =
-        Arrays.asList(makeAnomaly(0, 2), makeAnomaly(4, 6)).stream().map(anomaly -> filter.isQualified(anomaly)).collect(Collectors.toList());
+        Arrays.asList(
+            DetectionTestUtils.makeAnomaly(0, 2, CONFIG_ID, METRIC_URN, 150), DetectionTestUtils.makeAnomaly(4, 6, CONFIG_ID, METRIC_URN, 500)).stream().map(anomaly -> filter.isQualified(anomaly)).collect(Collectors.toList());
     Assert.assertEquals(results, Arrays.asList(false, true));
   }
 
-  private static MergedAnomalyResultDTO makeAnomaly(long start, long end) {
-    Map<String, String> dimensions = new HashMap<>();
-    MergedAnomalyResultDTO anomaly = DetectionTestUtils.makeAnomaly(125L, start, end, dimensions);
-    anomaly.setMetricUrn(METRIC_URN);
-    return anomaly;
-  }
-
   @Test
   public void testSiteWideImpactFilterNoOffset() {
     SitewideImpactRuleAnomalyFilterSpec spec = new SitewideImpactRuleAnomalyFilterSpec();
     spec.setThreshold(0.5);
     spec.setPattern("down");
     SitewideImpactRuleAnomalyFilter filter = new SitewideImpactRuleAnomalyFilter();
-    filter.init(spec, new DefaultInputDataFetcher(this.testDataProvider, 125L));
-    List<MergedAnomalyResultDTO> anomalyResultDTOs = Arrays.asList(makeAnomaly(0, 2), makeAnomaly(4, 6));
+    filter.init(spec, new DefaultInputDataFetcher(this.testDataProvider, CONFIG_ID));
+    List<MergedAnomalyResultDTO> anomalyResultDTOs = Arrays.asList(
+        DetectionTestUtils.makeAnomaly(0, 2, CONFIG_ID, METRIC_URN, 150), DetectionTestUtils.makeAnomaly(4, 6,
+        CONFIG_ID, METRIC_URN, 500));
     anomalyResultDTOs.get(0).setAvgCurrentVal(150);
     anomalyResultDTOs.get(0).setAvgBaselineVal(200);
     anomalyResultDTOs.get(1).setAvgCurrentVal(500);
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilterTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilterTest.java
index 3429d27..ebee72f 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilterTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilterTest.java
@@ -49,6 +49,7 @@ import static org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils.*;
 
 public class ThresholdRuleAnomalyFilterTest {
   private static final String METRIC_URN = "thirdeye:metric:123";
+  private static final long CONFIG_ID = 125L;
 
   private List<MergedAnomalyResultDTO> anomalies;
   private MockPipelineLoader loader;
@@ -62,14 +63,10 @@ public class ThresholdRuleAnomalyFilterTest {
   @BeforeMethod
   public void beforeMethod() {
     Map<MetricSlice, DataFrame> aggregates = new HashMap<>();
-    aggregates.put(MetricSlice.from(123L, 1551186000000L, 1551189600000L),
-        new DataFrame().addSeries(COL_VALUE, 0));
-    aggregates.put(MetricSlice.from(123L, 1551189600000L, 1551193200000L),
-        new DataFrame().addSeries(COL_VALUE, 200));
-    aggregates.put(MetricSlice.from(123L, 1551193200000L, 1551196800000L),
-        new DataFrame().addSeries(COL_VALUE, 500));
-    aggregates.put(MetricSlice.from(123L, 1551196800000L, 1551200400000L),
-        new DataFrame().addSeries(COL_VALUE, 1000));
+    aggregates.put(MetricSlice.from(123L, 1551186000000L, 1551189600000L), new DataFrame().addSeries(COL_VALUE, 0));
+    aggregates.put(MetricSlice.from(123L, 1551189600000L, 1551193200000L), new DataFrame().addSeries(COL_VALUE, 200));
+    aggregates.put(MetricSlice.from(123L, 1551193200000L, 1551196800000L), new DataFrame().addSeries(COL_VALUE, 500));
+    aggregates.put(MetricSlice.from(123L, 1551196800000L, 1551200400000L), new DataFrame().addSeries(COL_VALUE, 1000));
 
     MetricConfigDTO metricConfigDTO = new MetricConfigDTO();
     metricConfigDTO.setId(123L);
@@ -84,7 +81,7 @@ public class ThresholdRuleAnomalyFilterTest {
     datasetConfigDTO.setTimezone("UTC");
 
     this.config = new DetectionConfigDTO();
-    this.config.setId(125L);
+    this.config.setId(CONFIG_ID);
     this.properties = new HashMap<>();
     this.properties.put("nested", Collections.singletonList(Collections.singletonMap("className", "dummy")));
     this.properties.put("filter", "$abc");
@@ -94,15 +91,18 @@ public class ThresholdRuleAnomalyFilterTest {
     this.config.setComponentSpecs(ImmutableMap.of("abc", this.specs));
     this.config.setProperties(this.properties);
 
-    this.anomalies = Arrays.asList(makeAnomaly(1551186000000L, 1551189600000L), makeAnomaly(1551189600000L, 1551193200000L), makeAnomaly(1551193200000L, 1551196800000L), makeAnomaly(1551196800000L, 1551200400000L));
+    this.anomalies =
+        Arrays.asList(DetectionTestUtils.makeAnomaly(1551186000000L, 1551189600000L, CONFIG_ID, METRIC_URN, 0),
+            DetectionTestUtils.makeAnomaly(1551189600000L, 1551193200000L, CONFIG_ID, METRIC_URN, 200),
+            DetectionTestUtils.makeAnomaly(1551193200000L, 1551196800000L, CONFIG_ID, METRIC_URN, 500),
+            DetectionTestUtils.makeAnomaly(1551196800000L, 1551200400000L, CONFIG_ID, METRIC_URN, 1000));
 
     this.runs = new ArrayList<>();
 
-    this.loader = new MockPipelineLoader(this.runs, Collections.singletonList(
-        new MockPipelineOutput(this.anomalies, 1551200400000L)));
+    this.loader = new MockPipelineLoader(this.runs,
+        Collections.singletonList(new MockPipelineOutput(this.anomalies, 1551200400000L)));
 
-    this.testDataProvider = new MockDataProvider()
-        .setLoader(this.loader)
+    this.testDataProvider = new MockDataProvider().setLoader(this.loader)
         .setMetrics(Collections.singletonList(metricConfigDTO))
         .setDatasets(Collections.singletonList(datasetConfigDTO))
         .setAggregates(aggregates);
@@ -110,7 +110,8 @@ public class ThresholdRuleAnomalyFilterTest {
 
   @Test(priority = 0)
   public void testThresholdRuleFilterNone() throws Exception {
-    this.thresholdRuleFilter = new AnomalyFilterWrapper(this.testDataProvider, this.config, 1551186000000L, 1551189600000L);
+    this.thresholdRuleFilter =
+        new AnomalyFilterWrapper(this.testDataProvider, this.config, 1551186000000L, 1551189600000L);
 
     DetectionPipelineResult result = this.thresholdRuleFilter.run();
     List<MergedAnomalyResultDTO> anomalies = result.getAnomalies();
@@ -125,7 +126,8 @@ public class ThresholdRuleAnomalyFilterTest {
   @Test(priority = 1)
   public void testThresholdRuleFilterMin() throws Exception {
     this.specs.put("minValueHourly", 200);
-    this.thresholdRuleFilter = new AnomalyFilterWrapper(this.testDataProvider, this.config, 1551186000000L, 1551189600000L);
+    this.thresholdRuleFilter =
+        new AnomalyFilterWrapper(this.testDataProvider, this.config, 1551186000000L, 1551189600000L);
 
     DetectionPipelineResult result = this.thresholdRuleFilter.run();
     List<MergedAnomalyResultDTO> anomalies = result.getAnomalies();
@@ -139,7 +141,8 @@ public class ThresholdRuleAnomalyFilterTest {
   @Test(priority = 2)
   public void testThresholdRuleFilterMax() throws Exception {
     this.specs.put("maxValueHourly", 500);
-    this.thresholdRuleFilter = new AnomalyFilterWrapper(this.testDataProvider, this.config, 1551186000000L, 1551189600000L);
+    this.thresholdRuleFilter =
+        new AnomalyFilterWrapper(this.testDataProvider, this.config, 1551186000000L, 1551189600000L);
 
     DetectionPipelineResult result = this.thresholdRuleFilter.run();
     List<MergedAnomalyResultDTO> anomalies = result.getAnomalies();
@@ -154,7 +157,8 @@ public class ThresholdRuleAnomalyFilterTest {
   public void testThresholdRuleFilterBoth() throws Exception {
     this.specs.put("minValueHourly", 200);
     this.specs.put("maxValueHourly", 500);
-    this.thresholdRuleFilter = new AnomalyFilterWrapper(this.testDataProvider, this.config, 1551186000000L, 1551189600000L);
+    this.thresholdRuleFilter =
+        new AnomalyFilterWrapper(this.testDataProvider, this.config, 1551186000000L, 1551189600000L);
 
     DetectionPipelineResult result = this.thresholdRuleFilter.run();
     List<MergedAnomalyResultDTO> anomalies = result.getAnomalies();
@@ -167,7 +171,8 @@ public class ThresholdRuleAnomalyFilterTest {
   @Test(priority = 4)
   public void testThresholdRuleFilterMinDaily() throws Exception {
     this.specs.put("minValueDaily", 2400);
-    this.thresholdRuleFilter = new AnomalyFilterWrapper(this.testDataProvider, this.config, 1551186000000L, 1551189600000L);
+    this.thresholdRuleFilter =
+        new AnomalyFilterWrapper(this.testDataProvider, this.config, 1551186000000L, 1551189600000L);
 
     DetectionPipelineResult result = this.thresholdRuleFilter.run();
     List<MergedAnomalyResultDTO> anomalies = result.getAnomalies();
@@ -181,7 +186,8 @@ public class ThresholdRuleAnomalyFilterTest {
   @Test(priority = 5)
   public void testThresholdRuleFilterMaxDaily() throws Exception {
     this.specs.put("maxValueDaily", 12000);
-    this.thresholdRuleFilter = new AnomalyFilterWrapper(this.testDataProvider, this.config, 1551186000000L, 1551189600000L);
+    this.thresholdRuleFilter =
+        new AnomalyFilterWrapper(this.testDataProvider, this.config, 1551186000000L, 1551189600000L);
 
     DetectionPipelineResult result = this.thresholdRuleFilter.run();
     List<MergedAnomalyResultDTO> anomalies = result.getAnomalies();
@@ -196,18 +202,12 @@ public class ThresholdRuleAnomalyFilterTest {
   public void testThresholdRuleFilterCurrentMinAndMax() throws Exception {
     this.specs.put("minValue", 300);
     this.specs.put("maxValue", 500);
-    this.thresholdRuleFilter = new AnomalyFilterWrapper(this.testDataProvider, this.config, 1551186000000L, 1551189600000L);
+    this.thresholdRuleFilter =
+        new AnomalyFilterWrapper(this.testDataProvider, this.config, 1551186000000L, 1551189600000L);
     DetectionPipelineResult result = this.thresholdRuleFilter.run();
     List<MergedAnomalyResultDTO> anomalies = result.getAnomalies();
     Assert.assertEquals(result.getLastTimestamp(), 1551200400000L);
     Assert.assertEquals(anomalies.size(), 1);
     Assert.assertEquals(anomalies.get(0), this.anomalies.get(2));
   }
-
-
-  private static MergedAnomalyResultDTO makeAnomaly(long start, long end) {
-    MergedAnomalyResultDTO anomaly = DetectionTestUtils.makeAnomaly(125L, start, end);
-    anomaly.setMetricUrn(METRIC_URN);
-    return anomaly;
-  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org