You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ji...@apache.org on 2019/02/26 01:37:37 UTC
[incubator-pinot] branch master updated: [TE] detection - threshold-based filter (#3881)
This is an automated email from the ASF dual-hosted git repository.
jihao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 209c966 [TE] detection - threshold-based filter (#3881)
209c966 is described below
commit 209c9661c75ac340c94b54167e64b7f929a4e0b0
Author: Jihao Zhang <ji...@linkedin.com>
AuthorDate: Mon Feb 25 17:37:33 2019 -0800
[TE] detection - threshold-based filter (#3881)
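Make the threshold-based anomaly filter work on hourly and daily buckets: the anomaly's aggregated value is scaled to an hourly or daily rate before it is compared against the configured min/max thresholds.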
---
.../components/ThresholdRuleAnomalyFilter.java | 32 ++++++++---
.../detection/spec/ThresholdRuleFilterSpec.java | 40 +++++++++----
.../components/ThresholdRuleAnomalyFilterTest.java | 66 ++++++++++++++++------
3 files changed, 101 insertions(+), 37 deletions(-)
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilter.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilter.java
index b24ce1f..be7fae0 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilter.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilter.java
@@ -19,6 +19,7 @@
package org.apache.pinot.thirdeye.detection.components;
+import java.util.concurrent.TimeUnit;
import org.apache.pinot.thirdeye.dataframe.DataFrame;
import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
@@ -34,6 +35,7 @@ import org.apache.pinot.thirdeye.detection.spi.model.InputDataSpec;
import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
import java.util.Collections;
import java.util.Map;
+import org.joda.time.Interval;
import static org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils.*;
@@ -42,12 +44,12 @@ import static org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils.*;
* This threshold rule filter stage filters the anomalies if either the min or max thresholds do not pass.
*/
@Components(title = "Aggregate Threshold Filter", type = "THRESHOLD_RULE_FILTER", tags = {
- DetectionTag.RULE_FILTER}, description = "Threshold rule filter. filters the anomalies if either the min or max thresholds do not satisfied.", presentation = {
- @PresentationOption(name = "absolute value", description = "aggregated absolute value within a time period", template = "is higher than ${min} and lower than ${max}")}, params = {
- @Param(name = "min", placeholder = "value"), @Param(name = "max", placeholder = "value")})
+ DetectionTag.RULE_FILTER}, description = "Threshold rule filter. filters the anomalies if either the min or max thresholds do not satisfied.")
public class ThresholdRuleAnomalyFilter implements AnomalyFilter<ThresholdRuleFilterSpec> {
- private double min;
- private double max;
+ private double minValueHourly;
+ private double maxValueHourly;
+ private double minValueDaily;
+ private double maxValueDaily;
private InputDataFetcher dataFetcher;
@Override
@@ -58,10 +60,20 @@ public class ThresholdRuleAnomalyFilter implements AnomalyFilter<ThresholdRuleFi
Map<MetricSlice, DataFrame> aggregates = data.getAggregates();
double currentValue = getValueFromAggregates(currentSlice, aggregates);
- if (!Double.isNaN(this.min) && currentValue < this.min) {
+
+ Interval anomalyInterval = new Interval(anomaly.getStartTime(), anomaly.getEndTime());
+ double hourlyMultiplier = TimeUnit.HOURS.toMillis(1) / (double) anomalyInterval.toDurationMillis();
+ double dailyMultiplier = TimeUnit.DAYS.toMillis(1) / (double) anomalyInterval.toDurationMillis();
+ if (!Double.isNaN(this.minValueHourly) && currentValue * hourlyMultiplier < this.minValueHourly) {
+ return false;
+ }
+ if (!Double.isNaN(this.maxValueHourly) && currentValue * hourlyMultiplier > this.maxValueHourly) {
+ return false;
+ }
+ if (!Double.isNaN(this.minValueDaily) && currentValue * dailyMultiplier< this.minValueDaily) {
return false;
}
- if (!Double.isNaN(this.max) && currentValue > this.max) {
+ if (!Double.isNaN(this.maxValueDaily) && currentValue * dailyMultiplier > this.maxValueDaily) {
return false;
}
return true;
@@ -69,8 +81,10 @@ public class ThresholdRuleAnomalyFilter implements AnomalyFilter<ThresholdRuleFi
@Override
public void init(ThresholdRuleFilterSpec spec, InputDataFetcher dataFetcher) {
- this.min = spec.getMin();
- this.max = spec.getMax();
+ this.minValueHourly = spec.getMinValueHourly();
+ this.maxValueHourly = spec.getMaxValueHourly();
+ this.minValueDaily = spec.getMinValueDaily();
+ this.maxValueDaily = spec.getMaxValueDaily();
this.dataFetcher = dataFetcher;
}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/spec/ThresholdRuleFilterSpec.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/spec/ThresholdRuleFilterSpec.java
index 72f4655..2c97df5 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/spec/ThresholdRuleFilterSpec.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/spec/ThresholdRuleFilterSpec.java
@@ -24,22 +24,42 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@JsonIgnoreProperties(ignoreUnknown = true)
public class ThresholdRuleFilterSpec extends AbstractSpec {
- private double min = Double.NaN;
- private double max = Double.NaN;
+ private double minValueHourly = Double.NaN;
+ private double minValueDaily = Double.NaN;
- public double getMin() {
- return min;
+ private double maxValueHourly = Double.NaN;
+ private double maxValueDaily = Double.NaN;
+
+
+ public double getMinValueHourly() {
+ return minValueHourly;
+ }
+
+ public void setMinValueHourly(double minValueHourly) {
+ this.minValueHourly = minValueHourly;
+ }
+
+ public double getMinValueDaily() {
+ return minValueDaily;
+ }
+
+ public void setMinValueDaily(double minValueDaily) {
+ this.minValueDaily = minValueDaily;
+ }
+
+ public double getMaxValueHourly() {
+ return maxValueHourly;
}
- public void setMin(double min) {
- this.min = min;
+ public void setMaxValueHourly(double maxValueHourly) {
+ this.maxValueHourly = maxValueHourly;
}
- public double getMax() {
- return max;
+ public double getMaxValueDaily() {
+ return maxValueDaily;
}
- public void setMax(double max) {
- this.max = max;
+ public void setMaxValueDaily(double maxValueDaily) {
+ this.maxValueDaily = maxValueDaily;
}
}
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilterTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilterTest.java
index de52d00..84ae901 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilterTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilterTest.java
@@ -24,8 +24,10 @@ import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
import org.apache.pinot.thirdeye.detection.DataProvider;
+import org.apache.pinot.thirdeye.detection.DefaultInputDataFetcher;
import org.apache.pinot.thirdeye.detection.DetectionPipelineResult;
import org.apache.pinot.thirdeye.detection.DetectionTestUtils;
+import org.apache.pinot.thirdeye.detection.InputDataFetcher;
import org.apache.pinot.thirdeye.detection.MockDataProvider;
import org.apache.pinot.thirdeye.detection.MockPipeline;
import org.apache.pinot.thirdeye.detection.MockPipelineLoader;
@@ -60,13 +62,13 @@ public class ThresholdRuleAnomalyFilterTest {
@BeforeMethod
public void beforeMethod() {
Map<MetricSlice, DataFrame> aggregates = new HashMap<>();
- aggregates.put(MetricSlice.from(123L, 0, 2),
+ aggregates.put(MetricSlice.from(123L, 1551186000000L, 1551189600000L),
new DataFrame().addSeries(COL_VALUE, 0));
- aggregates.put(MetricSlice.from(123L, 4, 6),
+ aggregates.put(MetricSlice.from(123L, 1551189600000L, 1551193200000L),
new DataFrame().addSeries(COL_VALUE, 200));
- aggregates.put(MetricSlice.from(123L, 6, 8),
+ aggregates.put(MetricSlice.from(123L, 1551193200000L, 1551196800000L),
new DataFrame().addSeries(COL_VALUE, 500));
- aggregates.put(MetricSlice.from(123L, 8, 10),
+ aggregates.put(MetricSlice.from(123L, 1551196800000L, 1551200400000L),
new DataFrame().addSeries(COL_VALUE, 1000));
MetricConfigDTO metricConfigDTO = new MetricConfigDTO();
@@ -92,12 +94,12 @@ public class ThresholdRuleAnomalyFilterTest {
this.config.setComponentSpecs(ImmutableMap.of("abc", this.specs));
this.config.setProperties(this.properties);
- this.anomalies = Arrays.asList(makeAnomaly(0, 2), makeAnomaly(4, 6), makeAnomaly(6, 8), makeAnomaly(8, 10));
+ this.anomalies = Arrays.asList(makeAnomaly(1551186000000L, 1551189600000L), makeAnomaly(1551189600000L, 1551193200000L), makeAnomaly(1551193200000L, 1551196800000L), makeAnomaly(1551196800000L, 1551200400000L));
this.runs = new ArrayList<>();
this.loader = new MockPipelineLoader(this.runs, Collections.singletonList(
- new MockPipelineOutput(this.anomalies, 10)));
+ new MockPipelineOutput(this.anomalies, 1551200400000L)));
this.testDataProvider = new MockDataProvider()
.setLoader(this.loader)
@@ -108,11 +110,11 @@ public class ThresholdRuleAnomalyFilterTest {
@Test(priority = 0)
public void testThresholdRuleFilterNone() throws Exception {
- this.thresholdRuleFilter = new AnomalyFilterWrapper(this.testDataProvider, this.config, 0, 10);
+ this.thresholdRuleFilter = new AnomalyFilterWrapper(this.testDataProvider, this.config, 1551186000000L, 1551189600000L);
DetectionPipelineResult result = this.thresholdRuleFilter.run();
List<MergedAnomalyResultDTO> anomalies = result.getAnomalies();
- Assert.assertEquals(result.getLastTimestamp(), 10);
+ Assert.assertEquals(result.getLastTimestamp(), 1551200400000L);
Assert.assertEquals(anomalies.size(), 4);
Assert.assertEquals(anomalies.get(0), this.anomalies.get(0));
Assert.assertEquals(anomalies.get(1), this.anomalies.get(1));
@@ -122,12 +124,12 @@ public class ThresholdRuleAnomalyFilterTest {
@Test(priority = 1)
public void testThresholdRuleFilterMin() throws Exception {
- this.specs.put("min", 200);
- this.thresholdRuleFilter = new AnomalyFilterWrapper(this.testDataProvider, this.config, 0, 10);
+ this.specs.put("minValueHourly", 200);
+ this.thresholdRuleFilter = new AnomalyFilterWrapper(this.testDataProvider, this.config, 1551186000000L, 1551189600000L);
DetectionPipelineResult result = this.thresholdRuleFilter.run();
List<MergedAnomalyResultDTO> anomalies = result.getAnomalies();
- Assert.assertEquals(result.getLastTimestamp(), 10);
+ Assert.assertEquals(result.getLastTimestamp(), 1551200400000L);
Assert.assertEquals(anomalies.size(), 3);
Assert.assertEquals(anomalies.get(0), this.anomalies.get(1));
Assert.assertEquals(anomalies.get(1), this.anomalies.get(2));
@@ -136,12 +138,12 @@ public class ThresholdRuleAnomalyFilterTest {
@Test(priority = 2)
public void testThresholdRuleFilterMax() throws Exception {
- this.specs.put("max", 500);
- this.thresholdRuleFilter = new AnomalyFilterWrapper(this.testDataProvider, this.config, 0, 10);
+ this.specs.put("maxValueHourly", 500);
+ this.thresholdRuleFilter = new AnomalyFilterWrapper(this.testDataProvider, this.config, 1551186000000L, 1551189600000L);
DetectionPipelineResult result = this.thresholdRuleFilter.run();
List<MergedAnomalyResultDTO> anomalies = result.getAnomalies();
- Assert.assertEquals(result.getLastTimestamp(), 10);
+ Assert.assertEquals(result.getLastTimestamp(), 1551200400000L);
Assert.assertEquals(anomalies.size(), 3);
Assert.assertEquals(anomalies.get(0), this.anomalies.get(0));
Assert.assertEquals(anomalies.get(1), this.anomalies.get(1));
@@ -150,18 +152,46 @@ public class ThresholdRuleAnomalyFilterTest {
@Test(priority = 3)
public void testThresholdRuleFilterBoth() throws Exception {
- this.specs.put("min", 200);
- this.specs.put("max", 500);
- this.thresholdRuleFilter = new AnomalyFilterWrapper(this.testDataProvider, this.config, 0, 10);
+ this.specs.put("minValueHourly", 200);
+ this.specs.put("maxValueHourly", 500);
+ this.thresholdRuleFilter = new AnomalyFilterWrapper(this.testDataProvider, this.config, 1551186000000L, 1551189600000L);
DetectionPipelineResult result = this.thresholdRuleFilter.run();
List<MergedAnomalyResultDTO> anomalies = result.getAnomalies();
- Assert.assertEquals(result.getLastTimestamp(), 10);
+ Assert.assertEquals(result.getLastTimestamp(), 1551200400000L);
Assert.assertEquals(anomalies.size(), 2);
Assert.assertEquals(anomalies.get(0), this.anomalies.get(1));
Assert.assertEquals(anomalies.get(1), this.anomalies.get(2));
}
+ @Test(priority = 4)
+ public void testThresholdRuleFilterMinDaily() throws Exception {
+ this.specs.put("minValueDaily", 2400);
+ this.thresholdRuleFilter = new AnomalyFilterWrapper(this.testDataProvider, this.config, 1551186000000L, 1551189600000L);
+
+ DetectionPipelineResult result = this.thresholdRuleFilter.run();
+ List<MergedAnomalyResultDTO> anomalies = result.getAnomalies();
+ Assert.assertEquals(result.getLastTimestamp(), 1551200400000L);
+ Assert.assertEquals(anomalies.size(), 3);
+ Assert.assertEquals(anomalies.get(0), this.anomalies.get(1));
+ Assert.assertEquals(anomalies.get(1), this.anomalies.get(2));
+ Assert.assertEquals(anomalies.get(2), this.anomalies.get(3));
+ }
+
+ @Test(priority = 5)
+ public void testThresholdRuleFilterMaxDaily() throws Exception {
+ this.specs.put("maxValueDaily", 12000);
+ this.thresholdRuleFilter = new AnomalyFilterWrapper(this.testDataProvider, this.config, 1551186000000L, 1551189600000L);
+
+ DetectionPipelineResult result = this.thresholdRuleFilter.run();
+ List<MergedAnomalyResultDTO> anomalies = result.getAnomalies();
+ Assert.assertEquals(result.getLastTimestamp(), 1551200400000L);
+ Assert.assertEquals(anomalies.size(), 3);
+ Assert.assertEquals(anomalies.get(0), this.anomalies.get(0));
+ Assert.assertEquals(anomalies.get(1), this.anomalies.get(1));
+ Assert.assertEquals(anomalies.get(2), this.anomalies.get(2));
+ }
+
private static MergedAnomalyResultDTO makeAnomaly(long start, long end) {
MergedAnomalyResultDTO anomaly = DetectionTestUtils.makeAnomaly(125L, start, end);
anomaly.setMetricUrn(METRIC_URN);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org