You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ji...@apache.org on 2019/02/26 01:37:37 UTC

[incubator-pinot] branch master updated: [TE] detection - threshold-based filter (#3881)

This is an automated email from the ASF dual-hosted git repository.

jihao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 209c966  [TE] detection - threshold-based filter (#3881)
209c966 is described below

commit 209c9661c75ac340c94b54167e64b7f929a4e0b0
Author: Jihao Zhang <ji...@linkedin.com>
AuthorDate: Mon Feb 25 17:37:33 2019 -0800

    [TE] detection - threshold-based filter (#3881)
    
    Threshold-based anomaly filter that works on hourly/daily buckets.
---
 .../components/ThresholdRuleAnomalyFilter.java     | 32 ++++++++---
 .../detection/spec/ThresholdRuleFilterSpec.java    | 40 +++++++++----
 .../components/ThresholdRuleAnomalyFilterTest.java | 66 ++++++++++++++++------
 3 files changed, 101 insertions(+), 37 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilter.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilter.java
index b24ce1f..be7fae0 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilter.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilter.java
@@ -19,6 +19,7 @@
 
 package org.apache.pinot.thirdeye.detection.components;
 
+import java.util.concurrent.TimeUnit;
 import org.apache.pinot.thirdeye.dataframe.DataFrame;
 import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
 import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
@@ -34,6 +35,7 @@ import org.apache.pinot.thirdeye.detection.spi.model.InputDataSpec;
 import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
 import java.util.Collections;
 import java.util.Map;
+import org.joda.time.Interval;
 
 import static org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils.*;
 
@@ -42,12 +44,12 @@ import static org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils.*;
  * This threshold rule filter stage filters the anomalies if either the min or max thresholds do not pass.
  */
 @Components(title = "Aggregate Threshold Filter", type = "THRESHOLD_RULE_FILTER", tags = {
-    DetectionTag.RULE_FILTER}, description = "Threshold rule filter. filters the anomalies if either the min or max thresholds do not satisfied.", presentation = {
-    @PresentationOption(name = "absolute value", description = "aggregated absolute value within a time period", template = "is higher than ${min} and lower than ${max}")}, params = {
-    @Param(name = "min", placeholder = "value"), @Param(name = "max", placeholder = "value")})
+    DetectionTag.RULE_FILTER}, description = "Threshold rule filter. filters the anomalies if either the min or max thresholds do not satisfied.")
 public class ThresholdRuleAnomalyFilter implements AnomalyFilter<ThresholdRuleFilterSpec> {
-  private double min;
-  private double max;
+  private double minValueHourly;
+  private double maxValueHourly;
+  private double minValueDaily;
+  private double maxValueDaily;
   private InputDataFetcher dataFetcher;
 
   @Override
@@ -58,10 +60,20 @@ public class ThresholdRuleAnomalyFilter implements AnomalyFilter<ThresholdRuleFi
 
     Map<MetricSlice, DataFrame> aggregates = data.getAggregates();
     double currentValue = getValueFromAggregates(currentSlice, aggregates);
-    if (!Double.isNaN(this.min) && currentValue < this.min) {
+
+    Interval anomalyInterval = new Interval(anomaly.getStartTime(), anomaly.getEndTime());
+    double hourlyMultiplier = TimeUnit.HOURS.toMillis(1) / (double) anomalyInterval.toDurationMillis();
+    double dailyMultiplier = TimeUnit.DAYS.toMillis(1) / (double) anomalyInterval.toDurationMillis();
+    if (!Double.isNaN(this.minValueHourly) && currentValue * hourlyMultiplier < this.minValueHourly) {
+      return false;
+    }
+    if (!Double.isNaN(this.maxValueHourly) && currentValue * hourlyMultiplier > this.maxValueHourly) {
+      return false;
+    }
+    if (!Double.isNaN(this.minValueDaily) && currentValue * dailyMultiplier< this.minValueDaily) {
       return false;
     }
-    if (!Double.isNaN(this.max) && currentValue > this.max) {
+    if (!Double.isNaN(this.maxValueDaily) && currentValue * dailyMultiplier > this.maxValueDaily) {
       return false;
     }
     return true;
@@ -69,8 +81,10 @@ public class ThresholdRuleAnomalyFilter implements AnomalyFilter<ThresholdRuleFi
 
   @Override
   public void init(ThresholdRuleFilterSpec spec, InputDataFetcher dataFetcher) {
-    this.min = spec.getMin();
-    this.max = spec.getMax();
+    this.minValueHourly = spec.getMinValueHourly();
+    this.maxValueHourly = spec.getMaxValueHourly();
+    this.minValueDaily = spec.getMinValueDaily();
+    this.maxValueDaily = spec.getMaxValueDaily();
     this.dataFetcher = dataFetcher;
   }
 
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/spec/ThresholdRuleFilterSpec.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/spec/ThresholdRuleFilterSpec.java
index 72f4655..2c97df5 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/spec/ThresholdRuleFilterSpec.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/spec/ThresholdRuleFilterSpec.java
@@ -24,22 +24,42 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class ThresholdRuleFilterSpec extends AbstractSpec {
-  private double min = Double.NaN;
-  private double max = Double.NaN;
+  private double minValueHourly = Double.NaN;
+  private double minValueDaily = Double.NaN;
 
-  public double getMin() {
-    return min;
+  private double maxValueHourly = Double.NaN;
+  private double maxValueDaily = Double.NaN;
+
+
+  public double getMinValueHourly() {
+    return minValueHourly;
+  }
+
+  public void setMinValueHourly(double minValueHourly) {
+    this.minValueHourly = minValueHourly;
+  }
+
+  public double getMinValueDaily() {
+    return minValueDaily;
+  }
+
+  public void setMinValueDaily(double minValueDaily) {
+    this.minValueDaily = minValueDaily;
+  }
+
+  public double getMaxValueHourly() {
+    return maxValueHourly;
   }
 
-  public void setMin(double min) {
-    this.min = min;
+  public void setMaxValueHourly(double maxValueHourly) {
+    this.maxValueHourly = maxValueHourly;
   }
 
-  public double getMax() {
-    return max;
+  public double getMaxValueDaily() {
+    return maxValueDaily;
   }
 
-  public void setMax(double max) {
-    this.max = max;
+  public void setMaxValueDaily(double maxValueDaily) {
+    this.maxValueDaily = maxValueDaily;
   }
 }
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilterTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilterTest.java
index de52d00..84ae901 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilterTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilterTest.java
@@ -24,8 +24,10 @@ import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
 import org.apache.pinot.thirdeye.detection.DataProvider;
+import org.apache.pinot.thirdeye.detection.DefaultInputDataFetcher;
 import org.apache.pinot.thirdeye.detection.DetectionPipelineResult;
 import org.apache.pinot.thirdeye.detection.DetectionTestUtils;
+import org.apache.pinot.thirdeye.detection.InputDataFetcher;
 import org.apache.pinot.thirdeye.detection.MockDataProvider;
 import org.apache.pinot.thirdeye.detection.MockPipeline;
 import org.apache.pinot.thirdeye.detection.MockPipelineLoader;
@@ -60,13 +62,13 @@ public class ThresholdRuleAnomalyFilterTest {
   @BeforeMethod
   public void beforeMethod() {
     Map<MetricSlice, DataFrame> aggregates = new HashMap<>();
-    aggregates.put(MetricSlice.from(123L, 0, 2),
+    aggregates.put(MetricSlice.from(123L, 1551186000000L, 1551189600000L),
         new DataFrame().addSeries(COL_VALUE, 0));
-    aggregates.put(MetricSlice.from(123L, 4, 6),
+    aggregates.put(MetricSlice.from(123L, 1551189600000L, 1551193200000L),
         new DataFrame().addSeries(COL_VALUE, 200));
-    aggregates.put(MetricSlice.from(123L, 6, 8),
+    aggregates.put(MetricSlice.from(123L, 1551193200000L, 1551196800000L),
         new DataFrame().addSeries(COL_VALUE, 500));
-    aggregates.put(MetricSlice.from(123L, 8, 10),
+    aggregates.put(MetricSlice.from(123L, 1551196800000L, 1551200400000L),
         new DataFrame().addSeries(COL_VALUE, 1000));
 
     MetricConfigDTO metricConfigDTO = new MetricConfigDTO();
@@ -92,12 +94,12 @@ public class ThresholdRuleAnomalyFilterTest {
     this.config.setComponentSpecs(ImmutableMap.of("abc", this.specs));
     this.config.setProperties(this.properties);
 
-    this.anomalies = Arrays.asList(makeAnomaly(0, 2), makeAnomaly(4, 6), makeAnomaly(6, 8), makeAnomaly(8, 10));
+    this.anomalies = Arrays.asList(makeAnomaly(1551186000000L, 1551189600000L), makeAnomaly(1551189600000L, 1551193200000L), makeAnomaly(1551193200000L, 1551196800000L), makeAnomaly(1551196800000L, 1551200400000L));
 
     this.runs = new ArrayList<>();
 
     this.loader = new MockPipelineLoader(this.runs, Collections.singletonList(
-        new MockPipelineOutput(this.anomalies, 10)));
+        new MockPipelineOutput(this.anomalies, 1551200400000L)));
 
     this.testDataProvider = new MockDataProvider()
         .setLoader(this.loader)
@@ -108,11 +110,11 @@ public class ThresholdRuleAnomalyFilterTest {
 
   @Test(priority = 0)
   public void testThresholdRuleFilterNone() throws Exception {
-    this.thresholdRuleFilter = new AnomalyFilterWrapper(this.testDataProvider, this.config, 0, 10);
+    this.thresholdRuleFilter = new AnomalyFilterWrapper(this.testDataProvider, this.config, 1551186000000L, 1551189600000L);
 
     DetectionPipelineResult result = this.thresholdRuleFilter.run();
     List<MergedAnomalyResultDTO> anomalies = result.getAnomalies();
-    Assert.assertEquals(result.getLastTimestamp(), 10);
+    Assert.assertEquals(result.getLastTimestamp(), 1551200400000L);
     Assert.assertEquals(anomalies.size(), 4);
     Assert.assertEquals(anomalies.get(0), this.anomalies.get(0));
     Assert.assertEquals(anomalies.get(1), this.anomalies.get(1));
@@ -122,12 +124,12 @@ public class ThresholdRuleAnomalyFilterTest {
 
   @Test(priority = 1)
   public void testThresholdRuleFilterMin() throws Exception {
-    this.specs.put("min", 200);
-    this.thresholdRuleFilter = new AnomalyFilterWrapper(this.testDataProvider, this.config, 0, 10);
+    this.specs.put("minValueHourly", 200);
+    this.thresholdRuleFilter = new AnomalyFilterWrapper(this.testDataProvider, this.config, 1551186000000L, 1551189600000L);
 
     DetectionPipelineResult result = this.thresholdRuleFilter.run();
     List<MergedAnomalyResultDTO> anomalies = result.getAnomalies();
-    Assert.assertEquals(result.getLastTimestamp(), 10);
+    Assert.assertEquals(result.getLastTimestamp(), 1551200400000L);
     Assert.assertEquals(anomalies.size(), 3);
     Assert.assertEquals(anomalies.get(0), this.anomalies.get(1));
     Assert.assertEquals(anomalies.get(1), this.anomalies.get(2));
@@ -136,12 +138,12 @@ public class ThresholdRuleAnomalyFilterTest {
 
   @Test(priority = 2)
   public void testThresholdRuleFilterMax() throws Exception {
-    this.specs.put("max", 500);
-    this.thresholdRuleFilter = new AnomalyFilterWrapper(this.testDataProvider, this.config, 0, 10);
+    this.specs.put("maxValueHourly", 500);
+    this.thresholdRuleFilter = new AnomalyFilterWrapper(this.testDataProvider, this.config, 1551186000000L, 1551189600000L);
 
     DetectionPipelineResult result = this.thresholdRuleFilter.run();
     List<MergedAnomalyResultDTO> anomalies = result.getAnomalies();
-    Assert.assertEquals(result.getLastTimestamp(), 10);
+    Assert.assertEquals(result.getLastTimestamp(), 1551200400000L);
     Assert.assertEquals(anomalies.size(), 3);
     Assert.assertEquals(anomalies.get(0), this.anomalies.get(0));
     Assert.assertEquals(anomalies.get(1), this.anomalies.get(1));
@@ -150,18 +152,46 @@ public class ThresholdRuleAnomalyFilterTest {
 
   @Test(priority = 3)
   public void testThresholdRuleFilterBoth() throws Exception {
-    this.specs.put("min", 200);
-    this.specs.put("max", 500);
-    this.thresholdRuleFilter = new AnomalyFilterWrapper(this.testDataProvider, this.config, 0, 10);
+    this.specs.put("minValueHourly", 200);
+    this.specs.put("maxValueHourly", 500);
+    this.thresholdRuleFilter = new AnomalyFilterWrapper(this.testDataProvider, this.config, 1551186000000L, 1551189600000L);
 
     DetectionPipelineResult result = this.thresholdRuleFilter.run();
     List<MergedAnomalyResultDTO> anomalies = result.getAnomalies();
-    Assert.assertEquals(result.getLastTimestamp(), 10);
+    Assert.assertEquals(result.getLastTimestamp(), 1551200400000L);
     Assert.assertEquals(anomalies.size(), 2);
     Assert.assertEquals(anomalies.get(0), this.anomalies.get(1));
     Assert.assertEquals(anomalies.get(1), this.anomalies.get(2));
   }
 
+  @Test(priority = 4)
+  public void testThresholdRuleFilterMinDaily() throws Exception {
+    this.specs.put("minValueDaily", 2400);
+    this.thresholdRuleFilter = new AnomalyFilterWrapper(this.testDataProvider, this.config, 1551186000000L, 1551189600000L);
+
+    DetectionPipelineResult result = this.thresholdRuleFilter.run();
+    List<MergedAnomalyResultDTO> anomalies = result.getAnomalies();
+    Assert.assertEquals(result.getLastTimestamp(), 1551200400000L);
+    Assert.assertEquals(anomalies.size(), 3);
+    Assert.assertEquals(anomalies.get(0), this.anomalies.get(1));
+    Assert.assertEquals(anomalies.get(1), this.anomalies.get(2));
+    Assert.assertEquals(anomalies.get(2), this.anomalies.get(3));
+  }
+
+  @Test(priority = 5)
+  public void testThresholdRuleFilterMaxDaily() throws Exception {
+    this.specs.put("maxValueDaily", 12000);
+    this.thresholdRuleFilter = new AnomalyFilterWrapper(this.testDataProvider, this.config, 1551186000000L, 1551189600000L);
+
+    DetectionPipelineResult result = this.thresholdRuleFilter.run();
+    List<MergedAnomalyResultDTO> anomalies = result.getAnomalies();
+    Assert.assertEquals(result.getLastTimestamp(), 1551200400000L);
+    Assert.assertEquals(anomalies.size(), 3);
+    Assert.assertEquals(anomalies.get(0), this.anomalies.get(0));
+    Assert.assertEquals(anomalies.get(1), this.anomalies.get(1));
+    Assert.assertEquals(anomalies.get(2), this.anomalies.get(2));
+  }
+
   private static MergedAnomalyResultDTO makeAnomaly(long start, long end) {
     MergedAnomalyResultDTO anomaly = DetectionTestUtils.makeAnomaly(125L, start, end);
     anomaly.setMetricUrn(METRIC_URN);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org