You are viewing a plain-text version of this content. The canonical link for it is "here" (the hyperlink was not preserved in this plain-text copy).
Posted to commits@pinot.apache.org by ak...@apache.org on 2020/08/25 18:49:34 UTC
[incubator-pinot] branch master updated: [TE] Remove deprecated classes under /detection/algorithm (#5908)
This is an automated email from the ASF dual-hosted git repository.
akshayrai09 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 1b3fb99 [TE] Remove deprecated classes under /detection/algorithm (#5908)
1b3fb99 is described below
commit 1b3fb999395923561cae7165b7a491e061b096d2
Author: Akshay Rai <ak...@linkedin.com>
AuthorDate: Tue Aug 25 11:49:25 2020 -0700
[TE] Remove deprecated classes under /detection/algorithm (#5908)
---
.../thirdeye/detection/DefaultDataProvider.java | 2 +-
.../detection/DetectionPipelineLoader.java | 2 +-
.../pinot/thirdeye/detection/DetectionUtils.java | 4 +-
.../detection/algorithm/BaselineAlgorithm.java | 139 ----
.../algorithm/BaselineRuleFilterWrapper.java | 117 ----
.../detection/algorithm/MovingWindowAlgorithm.java | 762 ---------------------
.../algorithm/RuleBasedFilterWrapper.java | 91 ---
.../detection/algorithm/ThresholdAlgorithm.java | 93 ---
.../algorithm/ThresholdRuleFilterWrapper.java | 73 --
.../components/AbsoluteChangeRuleDetector.java | 2 +-
.../detection/components/HoltWintersDetector.java | 2 +-
.../components/MeanVarianceRuleDetector.java | 1 -
.../components/PercentageChangeRuleDetector.java | 2 +-
.../components/ThresholdRuleDetector.java | 2 +-
.../detection/algorithm/BaselineAlgorithmTest.java | 139 ----
.../algorithm/MovingWindowAlgorithmTest.java | 414 -----------
.../algorithm/ThresholdAlgorithmTest.java | 89 ---
.../MergeDimensionThresholdIntegrationTest.java | 158 -----
.../mergeDimensionThresholdProperties.json | 31 -
19 files changed, 8 insertions(+), 2115 deletions(-)
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultDataProvider.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultDataProvider.java
index 4d82342..c996196 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultDataProvider.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultDataProvider.java
@@ -215,7 +215,7 @@ public class DefaultDataProvider implements DataProvider {
}
@Override
- public DetectionPipeline loadPipeline(DetectionConfigDTO config, long start, long end) throws Exception {
+ public DetectionPipeline loadPipeline(DetectionConfigDTO config, long start, long end) {
return this.loader.from(this, config, start, end);
}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineLoader.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineLoader.java
index dff23c8..7af6681 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineLoader.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineLoader.java
@@ -32,7 +32,7 @@ public class DetectionPipelineLoader {
Constructor<?> constructor = Class.forName(className).getConstructor(DataProvider.class, DetectionConfigDTO.class, long.class, long.class);
return (DetectionPipeline) constructor.newInstance(provider, config, start, end);
} catch (Exception e) {
- throw new IllegalArgumentException("Failed to initialize the detection pipeline.", e.getCause());
+ throw new IllegalArgumentException("Failed to initialize the detection pipeline.", e);
}
}
}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionUtils.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionUtils.java
index f0eeb2f..9322bb5 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionUtils.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionUtils.java
@@ -106,12 +106,12 @@ public class DetectionUtils {
* @param slice metric slice
* @param df time series with COL_TIME and at least one boolean value series
* @param seriesName name of the value series
- * @param endTime end time of this detection window
* @param monitoringGranularityPeriod the monitoring granularity period
* @param dataset dataset config for the metric
* @return list of anomalies
*/
- public static List<MergedAnomalyResultDTO> makeAnomalies(MetricSlice slice, DataFrame df, String seriesName, long endTime, Period monitoringGranularityPeriod, DatasetConfigDTO dataset) {
+ public static List<MergedAnomalyResultDTO> makeAnomalies(MetricSlice slice, DataFrame df, String seriesName,
+ Period monitoringGranularityPeriod, DatasetConfigDTO dataset) {
if (df.isEmpty()) {
return Collections.emptyList();
}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/BaselineAlgorithm.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/BaselineAlgorithm.java
deleted file mode 100644
index 3d4b0d0..0000000
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/BaselineAlgorithm.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.pinot.thirdeye.detection.algorithm;
-
-import com.google.common.base.Preconditions;
-import org.apache.pinot.thirdeye.dataframe.BooleanSeries;
-import org.apache.pinot.thirdeye.dataframe.DataFrame;
-import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
-import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
-import org.apache.pinot.thirdeye.detection.DataProvider;
-import org.apache.pinot.thirdeye.detection.DetectionPipelineResult;
-import org.apache.pinot.thirdeye.detection.StaticDetectionPipeline;
-import org.apache.pinot.thirdeye.detection.spi.model.InputData;
-import org.apache.pinot.thirdeye.detection.spi.model.InputDataSpec;
-import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
-import org.apache.pinot.thirdeye.rootcause.timeseries.Baseline;
-import org.apache.pinot.thirdeye.rootcause.timeseries.BaselineAggregate;
-import org.apache.pinot.thirdeye.rootcause.timeseries.BaselineAggregateType;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import org.apache.commons.collections4.MapUtils;
-import org.joda.time.DateTimeZone;
-
-import static org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils.*;
-
-
-/**
- * Simple baseline algorithm. Computes a multi-week aggregate baseline and compares
- * the current value based on relative change or absolute difference.
- */
-public class BaselineAlgorithm extends StaticDetectionPipeline {
- private static final String COL_CURR = "current";
- private static final String COL_BASE = "baseline";
- private static final String COL_DIFF = "diff";
- private static final String COL_DIFF_VIOLATION = "diff_violation";
- private static final String COL_CHANGE = "change";
- private static final String COL_CHANGE_VIOLATION = "change_violation";
- private static final String COL_ANOMALY = "anomaly";
-
- private static final String PROP_METRIC_URN = "metricUrn";
-
- private static final String PROP_AGGREGATION = "aggregation";
- private static final String PROP_AGGREGATION_DEFAULT = "MEDIAN";
-
- private static final String PROP_WEEKS = "weeks";
- private static final int PROP_WEEKS_DEFAULT = 1;
-
- private static final String PROP_CHANGE = "change";
- private static final double PROP_CHANGE_DEFAULT = Double.NaN;
-
- private static final String PROP_DIFFERENCE = "difference";
- private static final double PROP_DIFFERENCE_DEFAULT = Double.NaN;
-
- private static final String PROP_TIMEZONE = "timezone";
- private static final String PROP_TIMEZONE_DEFAULT = "UTC";
-
- private MetricSlice slice;
- private Baseline baseline;
- private double change;
- private double difference;
-
- public BaselineAlgorithm(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime) {
- super(provider, config, startTime, endTime);
-
- Preconditions.checkArgument(config.getProperties().containsKey(PROP_METRIC_URN));
-
- String metricUrn = MapUtils.getString(config.getProperties(), PROP_METRIC_URN);
- MetricEntity me = MetricEntity.fromURN(metricUrn);
- this.slice = MetricSlice.from(me.getId(), this.startTime, this.endTime, me.getFilters());
-
- int weeks = MapUtils.getIntValue(config.getProperties(), PROP_WEEKS, PROP_WEEKS_DEFAULT);
- BaselineAggregateType aggregation = BaselineAggregateType.valueOf(MapUtils.getString(config.getProperties(), PROP_AGGREGATION, PROP_AGGREGATION_DEFAULT));
- DateTimeZone timezone = DateTimeZone.forID(MapUtils.getString(this.config.getProperties(), PROP_TIMEZONE, PROP_TIMEZONE_DEFAULT));
- this.baseline = BaselineAggregate.fromWeekOverWeek(aggregation, weeks, 1, timezone);
-
- this.change = MapUtils.getDoubleValue(config.getProperties(), PROP_CHANGE, PROP_CHANGE_DEFAULT);
- this.difference = MapUtils.getDoubleValue(config.getProperties(), PROP_DIFFERENCE, PROP_DIFFERENCE_DEFAULT);
- }
-
- @Override
- public InputDataSpec getInputDataSpec() {
- List<MetricSlice> slices = new ArrayList<>(this.baseline.scatter(this.slice));
- slices.add(this.slice);
-
- return new InputDataSpec()
- .withTimeseriesSlices(slices);
- }
-
- @Override
- public DetectionPipelineResult run(InputData data) {
- DataFrame dfCurr = data.getTimeseries().get(this.slice).renameSeries(COL_VALUE, COL_CURR);
- DataFrame dfBase = this.baseline.gather(this.slice, data.getTimeseries()).renameSeries(COL_VALUE, COL_BASE);
-
- DataFrame df = new DataFrame(dfCurr).addSeries(dfBase);
- df.addSeries(COL_DIFF, df.getDoubles(COL_CURR).subtract(df.get(COL_BASE)));
- df.addSeries(COL_CHANGE, df.getDoubles(COL_CURR).divide(df.get(COL_BASE)).subtract(1));
-
- // defaults
- df.addSeries(COL_CHANGE_VIOLATION, BooleanSeries.fillValues(df.size(), false));
- df.addSeries(COL_DIFF_VIOLATION, BooleanSeries.fillValues(df.size(), false));
-
- // relative change
- if (!Double.isNaN(this.change)) {
- df.addSeries(COL_CHANGE_VIOLATION, df.getDoubles(COL_CHANGE).abs().gte(this.change));
- }
-
- // absolute difference
- if (!Double.isNaN(this.difference)) {
- df.addSeries(COL_DIFF_VIOLATION, df.getDoubles(COL_DIFF).abs().gte(this.difference));
- }
-
- // anomalies
- df.mapInPlace(BooleanSeries.HAS_TRUE, COL_ANOMALY, COL_CHANGE_VIOLATION, COL_DIFF_VIOLATION);
-
- List<MergedAnomalyResultDTO> anomalies = this.makeAnomalies(this.slice, df, COL_ANOMALY);
-
- return new DetectionPipelineResult(anomalies)
- .setDiagnostics(Collections.singletonMap(DetectionPipelineResult.DIAGNOSTICS_DATA, (Object) df.dropAllNullColumns()));
- }
-}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/BaselineRuleFilterWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/BaselineRuleFilterWrapper.java
deleted file mode 100644
index ea43d58..0000000
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/BaselineRuleFilterWrapper.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.pinot.thirdeye.detection.algorithm;
-
-import com.google.common.base.Strings;
-import com.google.common.collect.ArrayListMultimap;
-import org.apache.pinot.thirdeye.dataframe.DataFrame;
-import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
-import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
-import org.apache.pinot.thirdeye.detection.DataProvider;
-import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
-import org.apache.pinot.thirdeye.rootcause.timeseries.Baseline;
-import org.apache.pinot.thirdeye.rootcause.timeseries.BaselineAggregate;
-import org.apache.pinot.thirdeye.rootcause.timeseries.BaselineAggregateType;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Map;
-import org.apache.commons.collections4.MapUtils;
-import org.joda.time.DateTimeZone;
-
-import static org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils.*;
-
-
-/**
- * This filter wrapper filters the anomalies if either the absolute change, percentage change or site wide impact does not pass the threshold.
- */
-public class BaselineRuleFilterWrapper extends RuleBasedFilterWrapper {
- private static final String PROP_WEEKS = "weeks";
- private static final int PROP_WEEKS_DEFAULT = 1;
-
- private static final String PROP_CHANGE = "change";
- private static final double PROP_CHANGE_DEFAULT = Double.NaN;
-
- private static final String PROP_DIFFERENCE = "difference";
- private static final double PROP_DIFFERENCE_DEFAULT = Double.NaN;
-
- private static final String PROP_TIMEZONE = "timezone";
- private static final String PROP_TIMEZONE_DEFAULT = "UTC";
-
- private static final String PROP_SITEWIDE_METRIC = "siteWideMetricUrn";
- private static final String PROP_SITEWIDE_THRESHOLD = "siteWideImpactThreshold";
- private static final double PROP_SITEWIDE_THRESHOLD_DEFAULT = Double.NaN;
-
- private Baseline baseline;
- private double change;
- private double difference;
- private double siteWideImpactThreshold;
- private String siteWideMetricUrn;
-
- public BaselineRuleFilterWrapper(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime) {
- super(provider, config, startTime, endTime);
- int weeks = MapUtils.getIntValue(config.getProperties(), PROP_WEEKS, PROP_WEEKS_DEFAULT);
- DateTimeZone timezone =
- DateTimeZone.forID(MapUtils.getString(this.config.getProperties(), PROP_TIMEZONE, PROP_TIMEZONE_DEFAULT));
- this.baseline = BaselineAggregate.fromWeekOverWeek(BaselineAggregateType.MEDIAN, weeks, 1, timezone);
- // percentage change
- this.change = MapUtils.getDoubleValue(config.getProperties(), PROP_CHANGE, PROP_CHANGE_DEFAULT);
- // absolute change
- this.difference = MapUtils.getDoubleValue(config.getProperties(), PROP_DIFFERENCE, PROP_DIFFERENCE_DEFAULT);
- // site wide impact
- this.siteWideImpactThreshold = MapUtils.getDoubleValue(config.getProperties(), PROP_SITEWIDE_THRESHOLD, PROP_SITEWIDE_THRESHOLD_DEFAULT);
- this.siteWideMetricUrn = MapUtils.getString(config.getProperties(), PROP_SITEWIDE_METRIC);
- }
-
- @Override
- boolean isQualified(MergedAnomalyResultDTO anomaly) {
- MetricEntity me = MetricEntity.fromURN(anomaly.getMetricUrn());
- MetricSlice currentSlice =
- MetricSlice.from(me.getId(), anomaly.getStartTime(), anomaly.getEndTime(), me.getFilters());
- MetricSlice baselineSlice = this.baseline.scatter(currentSlice).get(0);
-
- Map<MetricSlice, DataFrame> aggregates = this.provider.fetchAggregates(Arrays.asList(currentSlice, baselineSlice), Collections.<String>emptyList(), -1);
- double currentValue = getValueFromAggregates(currentSlice, aggregates);
- double baselineValue = getValueFromAggregates(baselineSlice, aggregates);
- if (!Double.isNaN(this.difference) && Math.abs(currentValue - baselineValue) < this.difference) {
- return false;
- }
- if (!Double.isNaN(this.change) && baselineValue != 0 && Math.abs(currentValue / baselineValue - 1) < this.change) {
- return false;
- }
- if (!Double.isNaN(this.siteWideImpactThreshold)) {
- String siteWideImpactMetricUrn = Strings.isNullOrEmpty(this.siteWideMetricUrn) ? anomaly.getMetricUrn() : this.siteWideMetricUrn;
- MetricEntity siteWideEntity = MetricEntity.fromURN(siteWideImpactMetricUrn).withFilters(ArrayListMultimap.<String, String>create());
- MetricSlice siteWideSlice = this.baseline.scatter(
- MetricSlice.from(siteWideEntity.getId(), anomaly.getStartTime(), anomaly.getEndTime(), me.getFilters())).get(0);
- double siteWideBaselineValue = getValueFromAggregates(siteWideSlice,
- this.provider.fetchAggregates(Collections.singleton(siteWideSlice), Collections.<String>emptyList(), -1));
-
- if (siteWideBaselineValue != 0 && (Math.abs(currentValue - baselineValue) / siteWideBaselineValue) < this.siteWideImpactThreshold) {
- return false;
- }
- }
- return true;
- }
-
- double getValueFromAggregates(MetricSlice slice, Map<MetricSlice, DataFrame> aggregates) {
- return aggregates.get(slice).getDouble(COL_VALUE, 0);
- }
-}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MovingWindowAlgorithm.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MovingWindowAlgorithm.java
deleted file mode 100644
index 94e84fb..0000000
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MovingWindowAlgorithm.java
+++ /dev/null
@@ -1,762 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.pinot.thirdeye.detection.algorithm;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Collections2;
-import org.apache.pinot.thirdeye.constant.AnomalyFeedbackType;
-import org.apache.pinot.thirdeye.dataframe.BooleanSeries;
-import org.apache.pinot.thirdeye.dataframe.DataFrame;
-import org.apache.pinot.thirdeye.dataframe.DoubleSeries;
-import org.apache.pinot.thirdeye.dataframe.LongSeries;
-import org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils;
-import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
-import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
-import org.apache.pinot.thirdeye.detection.spi.model.AnomalySlice;
-import org.apache.pinot.thirdeye.detection.ConfigUtils;
-import org.apache.pinot.thirdeye.detection.DataProvider;
-import org.apache.pinot.thirdeye.detection.DetectionPipelineResult;
-import org.apache.pinot.thirdeye.detection.StaticDetectionPipeline;
-import org.apache.pinot.thirdeye.detection.spi.model.InputData;
-import org.apache.pinot.thirdeye.detection.spi.model.InputDataSpec;
-import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
-import org.apache.pinot.thirdeye.rootcause.timeseries.Baseline;
-import org.apache.pinot.thirdeye.rootcause.timeseries.BaselineAggregate;
-import org.apache.pinot.thirdeye.rootcause.timeseries.BaselineAggregateType;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeSet;
-import org.apache.commons.collections4.MapUtils;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-import org.joda.time.DurationFieldType;
-import org.joda.time.Period;
-
-
-/**
- * Exponential smoothing for baseline generation. Detects anomalies via normalized
- * zscore or quantile rules from rolling look-back window. Supports basic noise
- * suppression, outlier elimination and change point detection.
- */
-public class MovingWindowAlgorithm extends StaticDetectionPipeline {
- private static final String COL_CURR = "currentValue";
- private static final String COL_BASE = "baselineValue";
- private static final String COL_STD = "std";
- private static final String COL_MEAN = "mean";
- private static final String COL_QUANTILE_MIN = "quantileMin";
- private static final String COL_QUANTILE_MAX = "quantileMax";
- private static final String COL_ZSCORE = "zscore";
- private static final String COL_VIOLATION = "violation";
- private static final String COL_ANOMALY = "anomaly";
- private static final String COL_NOT_ANOMALY = "notAnomaly";
- private static final String COL_OUTLIER = "outlier";
- private static final String COL_TIME = DataFrameUtils.COL_TIME;
- private static final String COL_VALUE = DataFrameUtils.COL_VALUE;
- private static final String COL_COMPUTED_VALUE = "computedValue";
- private static final String COL_WINDOW_SIZE = "windowSize";
- private static final String COL_BASELINE = "baseline";
-
- private static final String COL_WEIGHT = "weight";
- private static final String COL_WEIGHTED_VALUE = "weightedValue";
- private static final String COL_WEIGHTED_MEAN = "weightedMena";
- private static final String COL_WEIGHTED_STD = "weightedStd";
-
- private static final String PROP_METRIC_URN = "metricUrn";
-
- private final MetricSlice sliceData;
- private final MetricSlice sliceDetection;
- private final AnomalySlice anomalySlice;
-
- private final Period windowSize;
- private final Period lookbackPeriod;
- private final Period reworkPeriod;
- private final double zscoreMin;
- private final double zscoreMax;
- private final double zscoreOutlier;
- private final int kernelSize;
- private final double quantileMin;
- private final double quantileMax;
- private final DateTimeZone timezone;
- private final Period changeDuration;
- private final double changeFraction;
- private final int baselineWeeks;
- private final boolean applyLog;
- private final double learningRate;
-
- private final long effectiveStartTime;
-
- public MovingWindowAlgorithm(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime) {
- super(provider, config, startTime, endTime);
-
- Preconditions.checkArgument(config.getProperties().containsKey(PROP_METRIC_URN));
-
- String metricUrn = MapUtils.getString(config.getProperties(), PROP_METRIC_URN);
- MetricEntity me = MetricEntity.fromURN(metricUrn);
-
- this.quantileMin = MapUtils.getDoubleValue(config.getProperties(), "quantileMin", Double.NaN);
- this.quantileMax = MapUtils.getDoubleValue(config.getProperties(), "quantileMax", Double.NaN);
- this.zscoreMin = MapUtils.getDoubleValue(config.getProperties(), "zscoreMin", Double.NaN);
- this.zscoreMax = MapUtils.getDoubleValue(config.getProperties(), "zscoreMax", Double.NaN);
- this.zscoreOutlier = MapUtils.getDoubleValue(config.getProperties(), "zscoreOutlier", Double.NaN);
- this.kernelSize = MapUtils.getIntValue(config.getProperties(), "kernelSize", 1);
- this.timezone = DateTimeZone.forID(MapUtils.getString(config.getProperties(), "timezone", "UTC"));
- this.windowSize = ConfigUtils.parsePeriod(MapUtils.getString(config.getProperties(), "windowSize", "1week"));
- this.lookbackPeriod = ConfigUtils.parsePeriod(MapUtils.getString(config.getProperties(), "lookbackPeriod", "1week"));
- this.reworkPeriod = ConfigUtils.parsePeriod(MapUtils.getString(config.getProperties(), "reworkPeriod", "1day"));
- this.changeDuration = ConfigUtils.parsePeriod(MapUtils.getString(config.getProperties(), "changeDuration", "5days"));
- this.changeFraction = MapUtils.getDoubleValue(config.getProperties(), "changeFraction", 0.666);
- this.baselineWeeks = MapUtils.getIntValue(config.getProperties(), "baselineWeeks", 0);
- this.applyLog = MapUtils.getBooleanValue(config.getProperties(), "applyLog", true);
- this.learningRate = MapUtils.getDoubleValue(config.getProperties(), "learningRate", 0.666);
-
- Preconditions.checkArgument(Double.isNaN(this.quantileMin) || (this.quantileMin >= 0 && this.quantileMin <= 1.0), "quantileMin must be between 0.0 and 1.0");
- Preconditions.checkArgument(Double.isNaN(this.quantileMax) || (this.quantileMax >= 0 && this.quantileMax <= 1.0), "quantileMax must be between 0.0 and 1.0");
-
- this.effectiveStartTime = new DateTime(startTime, this.timezone).minus(this.lookbackPeriod).getMillis();
-
- DateTime trainStart = new DateTime(this.effectiveStartTime, this.timezone).minus(this.windowSize);
- DateTime dataStart = trainStart.minus(new Period().withField(DurationFieldType.weeks(), baselineWeeks));
- DateTime detectionStart = new DateTime(startTime, this.timezone).minus(this.reworkPeriod);
-
- this.sliceData = MetricSlice.from(me.getId(), dataStart.getMillis(), endTime, me.getFilters());
- this.sliceDetection = MetricSlice.from(me.getId(), detectionStart.getMillis(), endTime, me.getFilters());
-
- AnomalySlice slice = new AnomalySlice().withStart(this.sliceData.getStart()).withEnd(this.sliceData.getEnd());
- if (this.config.getId() != null) {
- this.anomalySlice = slice.withDetectionId(this.config.getId());
- } else {
- this.anomalySlice = slice;
- }
- }
-
- @Override
- public InputDataSpec getInputDataSpec() {
- InputDataSpec dataSpec = new InputDataSpec()
- .withTimeseriesSlices(Collections.singleton(this.sliceData));
-
- if (this.config.getId() != null) {
- dataSpec = dataSpec.withAnomalySlices(Collections.singleton(this.anomalySlice));
- }
-
- return dataSpec;
- }
-
- @Override
- public DetectionPipelineResult run(InputData data) throws Exception {
- DataFrame dfInput = data.getTimeseries().get(this.sliceData);
-
- // log transform
- if (this.applyLog) {
- if (dfInput.getDoubles(COL_VALUE).lt(0).hasTrue()) {
- throw new IllegalArgumentException("Cannot apply log to series with negative values");
- }
- dfInput.addSeries(COL_VALUE, dfInput.getDoubles(COL_VALUE).add(1).log());
- }
-
- // kernel smoothing
- if (this.kernelSize > 1) {
- final int kernelOffset = this.kernelSize / 2;
- double[] values = dfInput.getDoubles(COL_VALUE).values();
- for (int i = 0; i < values.length - this.kernelSize + 1; i++) {
- values[i + kernelOffset] = AlgorithmUtils.robustMean(dfInput.getDoubles(COL_VALUE).slice(i, i + this.kernelSize), this.kernelSize).getDouble(this.kernelSize - 1);
- }
- dfInput.addSeries(COL_VALUE, values);
- }
-
- Collection<MergedAnomalyResultDTO> existingAnomalies = data.getAnomalies().get(this.anomalySlice);
-
- // pre-detection change points
- TreeSet<Long> changePoints = getChangePoints(dfInput, this.effectiveStartTime, existingAnomalies);
-
- // write-through arrays
- dfInput.addSeries(COL_BASELINE, DoubleSeries.nulls(dfInput.size()));
- dfInput.addSeries(COL_MEAN, DoubleSeries.nulls(dfInput.size()));
- dfInput.addSeries(COL_STD, DoubleSeries.nulls(dfInput.size()));
- dfInput.addSeries(COL_ZSCORE, DoubleSeries.nulls(dfInput.size()));
- dfInput.addSeries(COL_QUANTILE_MIN, DoubleSeries.nulls(dfInput.size()));
- dfInput.addSeries(COL_QUANTILE_MAX, DoubleSeries.nulls(dfInput.size()));
- dfInput.addSeries(COL_WINDOW_SIZE, LongSeries.nulls(dfInput.size()));
- dfInput.addSeries(COL_COMPUTED_VALUE, DoubleSeries.nulls(dfInput.size()));
- dfInput.addSeries(COL_OUTLIER, BooleanSeries.fillValues(dfInput.size(), false));
-
- // populate pre-existing anomalies (but don't mark as outliers yet)
- dfInput = applyExistingAnomalies(dfInput, existingAnomalies);
-
- // populate pre-computed values
- long[] sTimestamp = dfInput.getLongs(COL_TIME).values();
- double[] sComputed = dfInput.getDoubles(COL_COMPUTED_VALUE).values();
- for (int i = 0; i < sTimestamp.length && sTimestamp[i] < this.effectiveStartTime; i++) {
- double baseline = 0;
- if (this.baselineWeeks > 0) {
- baseline = this.makeBaseline(dfInput, sTimestamp[i], changePoints);
- }
- sComputed[i] = dfInput.getDouble(COL_VALUE, i) - baseline;
- }
-
- // estimate outliers (anomaly and non-anomaly outliers)
- // https://en.m.wikipedia.org/wiki/Median_absolute_deviation
- if (!Double.isNaN(this.zscoreOutlier)) {
- DataFrame dfPrefix = dfInput.filter(dfInput.getLongs(COL_TIME).lt(this.effectiveStartTime)).dropNull(COL_TIME, COL_COMPUTED_VALUE);
- DoubleSeries prefix = dfPrefix.getDoubles(COL_COMPUTED_VALUE);
- DoubleSeries mad = prefix.subtract(prefix.median()).abs().median();
- if (!mad.isNull(0) && mad.getDouble(0) > 0.0) {
- double std = 1.4826 * mad.doubleValue();
- double mean = AlgorithmUtils.robustMean(prefix, prefix.size()).getDouble(prefix.size() - 1);
- DoubleSeries zscoreAbs = prefix.subtract(mean).divide(std).abs();
- BooleanSeries fullOutlier = zscoreAbs.gt(this.zscoreOutlier);
- BooleanSeries partialOutlier = zscoreAbs.gt(this.zscoreOutlier / 2);
-
- dfPrefix.addSeries(COL_OUTLIER, expandViolation(fullOutlier, partialOutlier));
- dfInput.addSeries(dfPrefix, COL_OUTLIER);
- dfInput = dfInput.fillNull(COL_OUTLIER);
- }
- }
-
- // finally apply existing anomalies as outliers
- dfInput.addSeries(COL_OUTLIER, dfInput.getBooleans(COL_OUTLIER)
- .or(dfInput.getBooleans(COL_ANOMALY))
- .and(dfInput.getBooleans(COL_NOT_ANOMALY).not()));
-
- // generate detection time series
- Result result = this.run(dfInput, this.effectiveStartTime, changePoints);
-
- List<MergedAnomalyResultDTO> anomalies = this.makeAnomalies(this.sliceDetection, result.data, COL_ANOMALY);
-
- Map<String, Object> diagnostics = new HashMap<>();
- diagnostics.put(DetectionPipelineResult.DIAGNOSTICS_DATA, result.data.dropAllNullColumns());
- diagnostics.put(DetectionPipelineResult.DIAGNOSTICS_CHANGE_POINTS, result.changePoints);
-
- return new DetectionPipelineResult(anomalies)
- .setDiagnostics(diagnostics);
- }
-
- /**
- * Run anomaly detection from a given start timestamp
- *
- * @param df raw input data
- * @param start start time stamp
- * @param changePoints set of change points
- * @return detection result
- * @throws Exception
- */
- private Result run(DataFrame df, long start, TreeSet<Long> changePoints) throws Exception {
-
- // write-through arrays
- double[] sBaseline = df.getDoubles(COL_BASELINE).values();
- double[] sMean = df.getDoubles(COL_MEAN).values();
- double[] sStd = df.getDoubles(COL_STD).values();
- double[] sZscore = df.getDoubles(COL_ZSCORE).values();
- double[] sQuantileMin = df.getDoubles(COL_QUANTILE_MIN).values();
- double[] sQuantileMax = df.getDoubles(COL_QUANTILE_MAX).values();
- double[] sComputed = df.getDoubles(COL_COMPUTED_VALUE).values();
- byte[] sAnomaly = df.getBooleans(COL_ANOMALY).values();
- byte[] sOutlier = df.getBooleans(COL_OUTLIER).values();
- long[] sWindowSize = df.getLongs(COL_WINDOW_SIZE).values();
-
- // scan
- List<Long> timestamps = df.getLongs(COL_TIME).filter(df.getLongs(COL_TIME).between(start, this.endTime)).dropNull().toList();
- for (long timestamp : timestamps) {
-
- //
- // test for intra-detection change points
- //
- long fractionRangeStart = new DateTime(timestamp, this.timezone).minus(this.changeDuration).getMillis();
- DataFrame changePointWindow = df.filter(df.getLongs(COL_TIME).between(fractionRangeStart, timestamp)).dropNull(COL_TIME);
-
- Long latestChangePoint = changePoints.floor(timestamp);
- long minChangePoint = latestChangePoint == null ? fractionRangeStart : new DateTime(latestChangePoint, this.timezone).plus(this.changeDuration).getMillis();
-
- long fractionChangePoint = extractAnomalyFractionChangePoint(changePointWindow, this.changeFraction);
-
- // TODO prevent change point from anomalies labeled as outliers (but not new trend) by user
-
- if (fractionChangePoint >= 0 && fractionChangePoint >= minChangePoint) {
- TreeSet<Long> changePointsNew = new TreeSet<>(changePoints);
- changePointsNew.add(fractionChangePoint);
- return this.run(df, timestamp, changePointsNew);
- }
-
- // source index
- int index = df.getLongs(COL_TIME).find(timestamp);
-
- //
- // computed values
- //
- double value = df.getDouble(COL_VALUE, index);
-
- double baseline = 0;
- if (this.baselineWeeks > 0) {
- baseline = this.makeBaseline(df, timestamp, changePoints);
- }
- sBaseline[index] = baseline;
-
- double computed = value - baseline;
- sComputed[index] = computed;
-
- //
- // variable look back window
- //
- DataFrame window = this.makeWindow(df, timestamp, changePoints);
- sWindowSize[index] = window.size();
-
- if (window.size() <= 1) {
- continue;
- }
-
- //
- // quantiles
- //
- DoubleSeries computedValues = window.getDoubles(COL_COMPUTED_VALUE);
-
- if (!Double.isNaN(this.quantileMin) || !Double.isNaN(this.quantileMax)) {
- if (!Double.isNaN(this.quantileMin)) {
- sQuantileMin[index] = computedValues.quantile(this.quantileMin).doubleValue();
- sAnomaly[index] |= (computed < sQuantileMin[index] ? 1 : 0);
- }
-
- if (!Double.isNaN(this.quantileMax)) {
- sQuantileMax[index] = computedValues.quantile(this.quantileMax).doubleValue();
- sAnomaly[index] |= (computed > sQuantileMax[index] ? 1 : 0);
- }
- }
-
- //
- // zscore
- //
- if (!Double.isNaN(this.zscoreOutlier) || !Double.isNaN(this.zscoreMin) || !Double.isNaN(this.zscoreMax)) {
- DataFrame windowStats = this.makeWindowStats(window, timestamp);
-
- if (!windowStats.isEmpty()) {
- double mean = windowStats.getDouble(COL_MEAN, 0);
- double std = windowStats.getDouble(COL_STD, 0);
- double zscore = (computed - mean) / std;
-
- sMean[index] = mean;
- sStd[index] = std;
- sZscore[index] = zscore;
-
- // outlier elimination
- if (!Double.isNaN(this.zscoreOutlier) && Math.abs(zscore) > this.zscoreOutlier) {
- sOutlier[index] = 1;
- }
-
- BooleanSeries partialViolation = BooleanSeries.fillValues(df.size(), false);
-
- if (!Double.isNaN(this.zscoreMin) && zscore < this.zscoreMin) {
- sAnomaly[index] |= 1;
- partialViolation = partialViolation.or(df.getDoubles(COL_ZSCORE).lt(this.zscoreMin / 2).fillNull());
- }
-
- if (!Double.isNaN(this.zscoreMax) && zscore > this.zscoreMax) {
- sAnomaly[index] |= 1;
- partialViolation = partialViolation.or(df.getDoubles(COL_ZSCORE).gt(this.zscoreMax / 2).fillNull());
- }
-
- // anomaly region expansion
- if (partialViolation.hasTrue()) {
- partialViolation = partialViolation.or(df.getBooleans(COL_ANOMALY).fillNull());
- sAnomaly = anomalyRangeHelper(df, df.getBooleans(COL_ANOMALY), partialViolation).getBooleans(COL_ANOMALY).values();
- }
- }
- }
-
- //
- // house keeping
- //
-
- // mark anomalies as outliers (except regions marked as NOT_ANOMALY)
- sOutlier = df.mapInPlace(BooleanSeries.HAS_TRUE, COL_OUTLIER, COL_ANOMALY, COL_OUTLIER)
- .getBooleans(COL_OUTLIER).and(df.getBooleans(COL_NOT_ANOMALY).not()).values();
- }
-
- return new Result(df, changePoints);
- }
-
- /**
- * Helper for in-place insertion and expansion of anomaly ranges
- *
- * @param df data frame
- * @param violations boolean series of violations
- * @param partialViolations boolean series of partial violations for expansion
- * @return modified data frame
- */
- static DataFrame anomalyRangeHelper(DataFrame df, BooleanSeries violations, BooleanSeries partialViolations) {
- df.addSeries(COL_VIOLATION, expandViolation(violations, partialViolations).fillNull());
- df.mapInPlace(BooleanSeries.HAS_TRUE, COL_ANOMALY, COL_ANOMALY, COL_VIOLATION);
- df.dropSeries(COL_VIOLATION);
- return df;
- }
-
- /**
- * Expand violation ranges via the partial-violation threshold.
- *
- * @param violation boolean series of violations
- * @param partialViolation boolean series of partial violations
- * @return boolean series of expanded violations
- */
- static BooleanSeries expandViolation(BooleanSeries violation, BooleanSeries partialViolation) {
- if (violation.size() != partialViolation.size()) {
- throw new IllegalArgumentException("Series must be of equal size");
- }
-
- byte[] full = violation.values();
- byte[] partial = partialViolation.values();
- byte[] output = new byte[full.length];
-
- int lastPartial = -1;
- for (int i = 0; i < violation.size(); i++) {
- if (BooleanSeries.isFalse(partial[i])) {
- lastPartial = -1;
- }
-
- if (lastPartial < 0 && BooleanSeries.isTrue(partial[i])) {
- lastPartial = i;
- }
-
- if (full[i] > 0) {
- lastPartial = lastPartial >= 0 ? lastPartial : i;
-
- int j;
-
- for (j = lastPartial; j < full.length && !BooleanSeries.isFalse(partial[j]); j++) {
- if (BooleanSeries.isTrue(full[j]) || BooleanSeries.isTrue(partial[j])) {
- output[j] = 1;
- }
- }
-
- // move i to last checked candidate
- i = j - 1;
- }
- }
-
- return BooleanSeries.buildFrom(output);
- }
-
- /**
- * Find change points within window via anomaly fraction
- *
- * @param window data frame
- * @param fraction anomaly range fraction of total window
- * @return fraction of anomaly period compared to overall period
- */
- static long extractAnomalyFractionChangePoint(DataFrame window, double fraction) {
- long[] timestamp = window.getLongs(COL_TIME).values();
- byte[] anomaly = window.getBooleans(COL_ANOMALY).values();
-
- int max = window.get(COL_TIME).count();
- int count = 0;
- for (int i = window.size() - 1; i >= 0; i--) {
- if (!LongSeries.isNull(timestamp[i]) && !BooleanSeries.isNull(anomaly[i])) {
- count += BooleanSeries.isTrue(anomaly[i]) ? 1 : 0;
- }
-
- if (count / (double) max >= fraction) {
- return timestamp[i];
- }
- }
-
- return -1;
- }
-
- /**
- * Populates the anomaly series with {@code true} values for a given collection of anomalies.
- *
- * @param df data frame
- * @param anomalies pre-existing anomalies
- * @return anomaly populated data frame
- */
- DataFrame applyExistingAnomalies(DataFrame df, Collection<MergedAnomalyResultDTO> anomalies) {
- DataFrame res = new DataFrame(df)
- .addSeries(COL_ANOMALY, BooleanSeries.fillValues(df.size(), false))
- .addSeries(COL_NOT_ANOMALY, BooleanSeries.fillValues(df.size(), false));
-
- BooleanSeries trainingRange = res.getLongs(COL_TIME).lt(this.effectiveStartTime);
-
- for (MergedAnomalyResultDTO anomaly : anomalies) {
- // NOTE: NEW_TREND not labeled as anomaly
- // NOTE: NO_FEEDBACK not labeled as anomaly
-
- if (anomaly.getFeedback() != null && anomaly.getFeedback().getFeedbackType().isAnomaly()
- && !anomaly.getFeedback().getFeedbackType().equals(AnomalyFeedbackType.ANOMALY_NEW_TREND)) {
- res.set(COL_ANOMALY, res.getLongs(COL_TIME).between(anomaly.getStartTime(), anomaly.getEndTime()).and(trainingRange), BooleanSeries.fillValues(df.size(), true));
- }
-
- if (anomaly.getFeedback() != null && anomaly.getFeedback().getFeedbackType().isNotAnomaly()) {
- res.set(COL_NOT_ANOMALY, res.getLongs(COL_TIME).between(anomaly.getStartTime(), anomaly.getEndTime()).and(trainingRange), BooleanSeries.fillValues(df.size(), true));
- }
- }
-
- return res;
- }
-
- /**
- * Returns a dataframe with values differentiated vs a baseline. Prefers values after change-point if available
- *
- * @param df data (COL_TIME, COL_VALUE, COL_OUTLIER)
- * @param baseline baseline config
- * @param baseSlice base metric slice
- * @return data with differentiated values
- */
- static DataFrame diffTimeseries(DataFrame df, Baseline baseline, MetricSlice baseSlice) {
- Collection<MetricSlice> slices = baseline.scatter(baseSlice);
-
- Map<MetricSlice, DataFrame> map = new HashMap<>();
-
- for (MetricSlice slice : slices) {
- map.put(slice, sliceTimeseries(df, slice));
- }
-
- DataFrame dfCurr = new DataFrame(df).renameSeries(COL_VALUE, COL_CURR);
- DataFrame dfBase = baseline.gather(baseSlice, map).renameSeries(COL_VALUE, COL_BASE);
- DataFrame joined = new DataFrame(dfCurr).addSeries(dfBase, COL_BASE);
- joined.addSeries(COL_VALUE, joined.getDoubles(COL_CURR).subtract(joined.get(COL_BASE)));
-
- return joined;
- }
-
- /**
- * Returns a merged set of change points computed from time series and user-labeled anomalies.
- *
- * @param df data
- * @return set of change points
- */
- TreeSet<Long> getChangePoints(DataFrame df, long start, Collection<MergedAnomalyResultDTO> anomalies) {
- TreeSet<Long> changePoints = new TreeSet<>();
-
- // from time series
- if (this.changeDuration.toStandardDuration().getMillis() > 0) {
- // TODO configurable seasonality
- DataFrame dfChangePoint = new DataFrame(df).addSeries(COL_OUTLIER, BooleanSeries.fillValues(df.size(), false));
- Baseline baseline = BaselineAggregate.fromWeekOverWeek(BaselineAggregateType.SUM, 1, 1, this.timezone);
- DataFrame diffSeries = diffTimeseries(dfChangePoint, baseline, this.sliceData).dropNull(COL_TIME, COL_VALUE);
-
- // less than or equal to start only
- changePoints.addAll(AlgorithmUtils.getChangePointsRobustMean(diffSeries, this.kernelSize, this.changeDuration.toStandardDuration()).headSet(start, true));
- }
-
- // from anomalies
- Collection<MergedAnomalyResultDTO> changePointAnomalies = Collections2.filter(anomalies,
- new Predicate<MergedAnomalyResultDTO>() {
- @Override
- public boolean apply(MergedAnomalyResultDTO mergedAnomalyResultDTO) {
- return mergedAnomalyResultDTO != null
- && mergedAnomalyResultDTO.getFeedback() != null
- && AnomalyFeedbackType.ANOMALY_NEW_TREND.equals(mergedAnomalyResultDTO.getFeedback().getFeedbackType());
- }
- });
-
- for (MergedAnomalyResultDTO anomaly : changePointAnomalies) {
- changePoints.add(anomaly.getStartTime());
- }
-
- return changePoints;
- }
-
- /**
- * Helper slices base time series for given metric time slice
- *
- * @param df time series dataframe
- * @param slice metric slice
- * @return time series for given slice (range)
- */
- static DataFrame sliceTimeseries(DataFrame df, MetricSlice slice) {
- return df.filter(df.getLongs(COL_TIME).between(slice.getStart(), slice.getEnd())).dropNull(COL_TIME);
- }
-
- /**
- * Returns variable-size look back window for given timestamp.
- *
- * @param df data
- * @param tCurrent end timestamp (exclusive)
- * @param changePoints change points
- * @return window data frame
- */
- DataFrame makeWindow(DataFrame df, long tCurrent, TreeSet<Long> changePoints) {
- DateTime now = new DateTime(tCurrent);
- long tStart = now.minus(this.windowSize).getMillis();
-
- // truncate history at change point but leave at least a window equal to changeDuration
- Long changePoint = changePoints.lower(tCurrent);
- if (changePoint != null) {
- tStart = Math.max(tStart, changePoint);
- }
-
- // use non-outlier period, unless not enough history (anomalies are outliers too)
- BooleanSeries timeFilter = df.getLongs(COL_TIME).between(tStart, tCurrent);
- BooleanSeries outlierAndTimeFilter = df.getBooleans(COL_OUTLIER).not().and(timeFilter);
-
- // TODO make threshold for fallback to outlier period configurable
- if (outlierAndTimeFilter.sum().fillNull().longValue() <= timeFilter.sum().fillNull().longValue() / 3) {
- return df.filter(timeFilter).dropNull(COL_TIME, COL_COMPUTED_VALUE);
- }
-
- return df.filter(outlierAndTimeFilter).dropNull(COL_TIME, COL_COMPUTED_VALUE);
- }
-
- /**
- * Returns window stats (mean, std) using exponential smoothing for weekly stats.
- *
- * @param window time series window (truncated at change points)
- * @param tCurrent current time stamp
- * @return widnow stats with exp smoothing
- */
- // TODO use exp smoothing on raw values directly rather than week-over-week based. requires learning rate adjustment
- DataFrame makeWindowStats(DataFrame window, long tCurrent) {
- DateTime now = new DateTime(tCurrent, this.timezone);
- int windowSizeWeeks = this.windowSize.toStandardWeeks().getWeeks();
-
- // construct baseline
- DataFrame raw = new DataFrame(COL_TIME, LongSeries.nulls(windowSizeWeeks))
- .addSeries(COL_MEAN, DoubleSeries.nulls(windowSizeWeeks))
- .addSeries(COL_STD, DoubleSeries.nulls(windowSizeWeeks))
- .addSeries(COL_WEIGHT, DoubleSeries.nulls(windowSizeWeeks));
-
- long[] sTimestamp = raw.getLongs(COL_TIME).values();
- double[] sMean = raw.getDoubles(COL_MEAN).values();
- double[] sStd = raw.getDoubles(COL_STD).values();
- double[] sWeight = raw.getDoubles(COL_WEIGHT).values();
-
- for (int i = 0; i < windowSizeWeeks; i++) {
- int offset = windowSizeWeeks - i;
- long tFrom = now.minus(new Period().withWeeks(offset)).getMillis();
- long tTo = now.getMillis();
-
- DoubleSeries dfWeek = window.filter(window.getLongs(COL_TIME).between(tFrom, tTo))
- .dropNull(COL_TIME, COL_COMPUTED_VALUE)
- .getDoubles(COL_COMPUTED_VALUE);
-
- DoubleSeries mean = dfWeek.mean();
- DoubleSeries std = dfWeek.std();
-
- if (mean.hasNull() || std.hasNull()) {
- continue;
- }
-
- sTimestamp[i] = tFrom; // not used
- sMean[i] = mean.doubleValue();
- sStd[i] = Math.pow(std.doubleValue(), 2); // use variance for estimate
- sWeight[i] = Math.pow(this.learningRate, offset - 1);
- }
-
- DataFrame data = raw.dropNull();
- data.addSeries(COL_WEIGHTED_MEAN, data.getDoubles(COL_MEAN).multiply(data.get(COL_WEIGHT)));
- data.addSeries(COL_WEIGHTED_STD, data.getDoubles(COL_STD).multiply(data.get(COL_WEIGHT)));
-
- DoubleSeries totalWeight = data.getDoubles(COL_WEIGHT).sum();
- if (totalWeight.hasNull()) {
- return new DataFrame();
- }
-
- DataFrame out = new DataFrame();
- out.addSeries(COL_MEAN, data.getDoubles(COL_WEIGHTED_MEAN).sum().divide(totalWeight));
- out.addSeries(COL_STD, data.getDoubles(COL_WEIGHTED_STD).sum().divide(totalWeight).pow(0.5)); // back to std
-
- return out;
- }
-
- /**
- * Helper generates baseline value for timestamp via exponential smoothing
- *
- * @param df time series data
- * @param tCurrent current time stamp
- * @param changePoints set of change points
- * @return baseline value for time stamp
- */
- double makeBaseline(DataFrame df, long tCurrent, TreeSet<Long> changePoints) {
- DateTime now = new DateTime(tCurrent);
-
- int index = df.getLongs(COL_TIME).find(tCurrent);
- if (index < 0) {
- return Double.NaN;
- }
-
- if (this.baselineWeeks <= 0) {
- return 0.0;
- }
-
- Long lastChangePoint = changePoints.floor(tCurrent);
-
- // construct baseline
- DataFrame raw = new DataFrame(COL_TIME, LongSeries.nulls(this.baselineWeeks))
- .addSeries(COL_VALUE, DoubleSeries.nulls(this.baselineWeeks))
- .addSeries(COL_WEIGHT, DoubleSeries.nulls(this.baselineWeeks));
-
- long[] sTimestamp = raw.getLongs(COL_TIME).values();
- double[] sValue = raw.getDoubles(COL_VALUE).values();
- double[] sWeight = raw.getDoubles(COL_WEIGHT).values();
-
- // exponential smoothing with weekly seasonality
- for (int i = 0; i < this.baselineWeeks; i++) {
- int offset = this.baselineWeeks - i;
- long timestamp = now.minus(new Period().withWeeks(offset)).getMillis();
- sTimestamp[i] = timestamp;
-
- int valueIndex = df.getLongs(COL_TIME).find(timestamp);
- if (valueIndex >= 0) {
- sValue[i] = df.getDouble(COL_VALUE, valueIndex);
- sWeight[i] = Math.pow(this.learningRate, offset - 1);
-
- // adjust for change point
- if (lastChangePoint != null && timestamp < lastChangePoint) {
- sWeight[i] *= 0.001;
- }
-
- // adjust for outlier
- if (BooleanSeries.isTrue(df.getBoolean(COL_OUTLIER, valueIndex))) {
- sWeight[i] *= 0.001;
- }
- }
- }
-
- DataFrame data = raw.dropNull();
- data.addSeries(COL_WEIGHTED_VALUE, data.getDoubles(COL_VALUE).multiply(data.get(COL_WEIGHT)));
-
- DoubleSeries totalWeight = data.getDoubles(COL_WEIGHT).sum();
- if (totalWeight.hasNull()) {
- return Double.NaN;
- }
-
- DoubleSeries computed = data.getDoubles(COL_WEIGHTED_VALUE).sum().divide(totalWeight);
-
- if (computed.hasNull()) {
- return Double.NaN;
- }
-
- return computed.doubleValue();
- }
-
- /**
- * Container class for detection result
- */
- final class Result {
- final DataFrame data;
- final TreeSet<Long> changePoints;
-
- public Result(DataFrame data, TreeSet<Long> changePoints) {
- this.data = data;
- this.changePoints = changePoints;
- }
- }
-}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/RuleBasedFilterWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/RuleBasedFilterWrapper.java
deleted file mode 100644
index 093449b..0000000
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/RuleBasedFilterWrapper.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.pinot.thirdeye.detection.algorithm;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Collections2;
-import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
-import org.apache.pinot.thirdeye.detection.ConfigUtils;
-import org.apache.pinot.thirdeye.detection.DataProvider;
-import org.apache.pinot.thirdeye.detection.DetectionPipeline;
-import org.apache.pinot.thirdeye.detection.DetectionPipelineResult;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import javax.annotation.Nullable;
-
-/**
- * This abstract filter wrapper allows user to plug in exclusion filter business rules in the detection pipeline.
- */
-public abstract class RuleBasedFilterWrapper extends DetectionPipeline {
- private static final String PROP_NESTED = "nested";
- private static final String PROP_CLASS_NAME = "className";
-
- private final List<Map<String, Object>> nestedProperties;
-
- public RuleBasedFilterWrapper(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime) {
- super(provider, config, startTime, endTime);
- this.nestedProperties = ConfigUtils.getList(config.getProperties().get(PROP_NESTED));
- }
-
- /**
- * Runs the nested pipelines and calls the isQualified method to check if an anomaly passes the rule filter.
- * @return the detection pipeline result
- * @throws Exception
- */
- @Override
- public final DetectionPipelineResult run() throws Exception {
- List<MergedAnomalyResultDTO> candidates = new ArrayList<>();
- for (Map<String, Object> properties : this.nestedProperties) {
- DetectionConfigDTO nestedConfig = new DetectionConfigDTO();
-
- Preconditions.checkArgument(properties.containsKey(PROP_CLASS_NAME), "Nested missing " + PROP_CLASS_NAME);
- nestedConfig.setId(this.config.getId());
- nestedConfig.setName(this.config.getName());
- nestedConfig.setDescription(this.config.getDescription());
- nestedConfig.setProperties(properties);
-
- DetectionPipeline pipeline = this.provider.loadPipeline(nestedConfig, this.startTime, this.endTime);
-
- DetectionPipelineResult intermediate = pipeline.run();
- candidates.addAll(intermediate.getAnomalies());
- }
-
- Collection<MergedAnomalyResultDTO> anomalies =
- Collections2.filter(candidates, new Predicate<MergedAnomalyResultDTO>() {
- @Override
- public boolean apply(@Nullable MergedAnomalyResultDTO mergedAnomaly) {
- return mergedAnomaly != null && !mergedAnomaly.isChild() && isQualified(mergedAnomaly);
- }
- });
-
- return new DetectionPipelineResult(new ArrayList<>(anomalies));
- }
-
- /**
- * Sub-classes override this method to check if anomaly passes the filter wrapper.
- * @param anomaly
- * @return if the anomaly passes the filter
- */
- abstract boolean isQualified(MergedAnomalyResultDTO anomaly);
-}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/ThresholdAlgorithm.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/ThresholdAlgorithm.java
deleted file mode 100644
index e761acc..0000000
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/ThresholdAlgorithm.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.pinot.thirdeye.detection.algorithm;
-
-import org.apache.pinot.thirdeye.dataframe.BooleanSeries;
-import org.apache.pinot.thirdeye.dataframe.DataFrame;
-import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
-import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
-import org.apache.pinot.thirdeye.detection.DataProvider;
-import org.apache.pinot.thirdeye.detection.DetectionPipelineResult;
-import org.apache.pinot.thirdeye.detection.StaticDetectionPipeline;
-import org.apache.pinot.thirdeye.detection.spi.model.InputData;
-import org.apache.pinot.thirdeye.detection.spi.model.InputDataSpec;
-import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
-import java.util.Collections;
-import java.util.List;
-import org.apache.commons.collections4.MapUtils;
-
-import static org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils.*;
-
-
-/**
- * Simple threshold rule algorithm with (optional) upper and lower bounds on a metric value.
- */
-public class ThresholdAlgorithm extends StaticDetectionPipeline {
- private final String COL_TOO_HIGH = "tooHigh";
- private final String COL_TOO_LOW = "tooLow";
- private final String COL_ANOMALY = "anomaly";
-
- private final double min;
- private final double max;
- private final MetricSlice slice;
-
- public ThresholdAlgorithm(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime) {
- super(provider, config, startTime, endTime);
- this.min = MapUtils.getDoubleValue(config.getProperties(), "min", Double.NaN);
- this.max = MapUtils.getDoubleValue(config.getProperties(), "max", Double.NaN);
-
- String metricUrn = MapUtils.getString(config.getProperties(), "metricUrn");
- MetricEntity me = MetricEntity.fromURN(metricUrn);
- this.slice = MetricSlice.from(me.getId(), this.startTime, this.endTime, me.getFilters());
- }
-
- @Override
- public InputDataSpec getInputDataSpec() {
- return new InputDataSpec()
- .withTimeseriesSlices(Collections.singletonList(this.slice));
- }
-
- @Override
- public DetectionPipelineResult run(InputData data) {
- DataFrame df = data.getTimeseries().get(this.slice);
-
- // defaults
- df.addSeries(COL_TOO_HIGH, BooleanSeries.fillValues(df.size(), false));
- df.addSeries(COL_TOO_LOW, BooleanSeries.fillValues(df.size(), false));
-
- // max
- if (!Double.isNaN(this.max)) {
- df.addSeries(COL_TOO_HIGH, df.getDoubles(COL_VALUE).gt(this.max));
- }
-
- // min
- if (!Double.isNaN(this.min)) {
- df.addSeries(COL_TOO_LOW, df.getDoubles(COL_VALUE).lt(this.min));
- }
-
- df.mapInPlace(BooleanSeries.HAS_TRUE, COL_ANOMALY, COL_TOO_HIGH, COL_TOO_LOW);
-
- List<MergedAnomalyResultDTO> anomalies = this.makeAnomalies(this.slice, df, COL_ANOMALY);
-
- return new DetectionPipelineResult(anomalies)
- .setDiagnostics(Collections.singletonMap(DetectionPipelineResult.DIAGNOSTICS_DATA, (Object) df.dropAllNullColumns()));
- }
-}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/ThresholdRuleFilterWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/ThresholdRuleFilterWrapper.java
deleted file mode 100644
index 71676de..0000000
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/ThresholdRuleFilterWrapper.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.pinot.thirdeye.detection.algorithm;
-
-import org.apache.pinot.thirdeye.dataframe.DataFrame;
-import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
-import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
-import org.apache.pinot.thirdeye.detection.DataProvider;
-import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
-import java.util.Collections;
-import java.util.Map;
-import org.apache.commons.collections4.MapUtils;
-
-import static org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils.*;
-
-
-/**
- * This filter wrapper filters the anomalies if either the min or max thresholds do not pass.
- */
-public class ThresholdRuleFilterWrapper extends RuleBasedFilterWrapper {
- private static final String PROP_MIN = "min";
- private static final double PROP_MIN_DEFAULT = Double.NaN;
-
- private static final String PROP_MAX = "max";
- private static final double PROP_MAX_DEFAULT = Double.NaN;
-
- private final double min;
- private final double max;
-
- public ThresholdRuleFilterWrapper(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime) {
- super(provider, config, startTime, endTime);
- this.min = MapUtils.getDoubleValue(config.getProperties(), PROP_MIN, PROP_MIN_DEFAULT);
- this.max = MapUtils.getDoubleValue(config.getProperties(), PROP_MAX, PROP_MAX_DEFAULT);
- }
-
- @Override
- boolean isQualified(MergedAnomalyResultDTO anomaly) {
- MetricEntity me = MetricEntity.fromURN(anomaly.getMetricUrn());
- MetricSlice currentSlice = MetricSlice.from(me.getId(), anomaly.getStartTime(), anomaly.getEndTime(), me.getFilters());
-
- Map<MetricSlice, DataFrame> aggregates = this.provider.fetchAggregates(Collections.singleton(currentSlice), Collections.<String>emptyList(), -1);
- double currentValue = getValueFromAggregates(currentSlice, aggregates);
- if (!Double.isNaN(this.min) && currentValue < this.min) {
- return false;
- }
- if (!Double.isNaN(this.max) && currentValue > this.max) {
- return false;
- }
- return true;
- }
-
- double getValueFromAggregates(MetricSlice slice, Map<MetricSlice, DataFrame> aggregates) {
- return aggregates.get(slice).getDouble(COL_VALUE, 0);
- }
-}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/AbsoluteChangeRuleDetector.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/AbsoluteChangeRuleDetector.java
index e008748..9250a65 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/AbsoluteChangeRuleDetector.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/AbsoluteChangeRuleDetector.java
@@ -109,7 +109,7 @@ public class AbsoluteChangeRuleDetector implements AnomalyDetector<AbsoluteChang
// make anomalies
DatasetConfigDTO datasetConfig = data.getDatasetForMetricId().get(me.getId());
- List<MergedAnomalyResultDTO> anomalies = DetectionUtils.makeAnomalies(slice, df, COL_ANOMALY, window.getEndMillis(),
+ List<MergedAnomalyResultDTO> anomalies = DetectionUtils.makeAnomalies(slice, df, COL_ANOMALY,
DetectionUtils.getMonitoringGranularityPeriod(monitoringGranularity, datasetConfig), datasetConfig);
DataFrame baselineWithBoundaries = constructAbsoluteChangeBoundaries(df);
return DetectionResult.from(anomalies, TimeSeries.fromDataFrame(baselineWithBoundaries));
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/HoltWintersDetector.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/HoltWintersDetector.java
index ddb902c..76917d1 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/HoltWintersDetector.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/HoltWintersDetector.java
@@ -217,7 +217,7 @@ public class HoltWintersDetector implements BaselineProvider<HoltWintersDetector
// Anomalies
List<MergedAnomalyResultDTO> anomalyResults =
- DetectionUtils.makeAnomalies(sliceData, df, COL_ANOMALY, window.getEndMillis(),
+ DetectionUtils.makeAnomalies(sliceData, df, COL_ANOMALY,
DetectionUtils.getMonitoringGranularityPeriod(this.monitoringGranularity, datasetConfig), datasetConfig);
dfBase = dfBase.joinRight(df.retainSeries(COL_TIME, COL_CURR), COL_TIME);
return DetectionResult.from(anomalyResults, TimeSeries.fromDataFrame(dfBase));
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/MeanVarianceRuleDetector.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/MeanVarianceRuleDetector.java
index 4eed71c..44326cf 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/MeanVarianceRuleDetector.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/MeanVarianceRuleDetector.java
@@ -170,7 +170,6 @@ public class MeanVarianceRuleDetector implements AnomalyDetector<MeanVarianceRul
// Anomalies
List<MergedAnomalyResultDTO> anomalyResults = DetectionUtils.makeAnomalies(slice, df, COL_ANOMALY,
- window.getEndMillis(),
DetectionUtils.getMonitoringGranularityPeriod(timeGranularity.toAggregationGranularityString(),
datasetConfig), datasetConfig);
dfBase = dfBase.joinRight(df.retainSeries(COL_TIME, COL_CURR), COL_TIME);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/PercentageChangeRuleDetector.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/PercentageChangeRuleDetector.java
index 6bf4bed..5e6ffcc 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/PercentageChangeRuleDetector.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/PercentageChangeRuleDetector.java
@@ -138,7 +138,7 @@ public class PercentageChangeRuleDetector implements AnomalyDetector<PercentageC
df.mapInPlace(BooleanSeries.ALL_TRUE, COL_ANOMALY, COL_PATTERN, COL_CHANGE_VIOLATION);
}
- List<MergedAnomalyResultDTO> anomalies = DetectionUtils.makeAnomalies(slice, df, COL_ANOMALY, window.getEndMillis(),
+ List<MergedAnomalyResultDTO> anomalies = DetectionUtils.makeAnomalies(slice, df, COL_ANOMALY,
DetectionUtils.getMonitoringGranularityPeriod(monitoringGranularity, datasetConfig), datasetConfig);
DataFrame baselineWithBoundaries = constructPercentageChangeBoundaries(df);
return DetectionResult.from(anomalies, TimeSeries.fromDataFrame(baselineWithBoundaries));
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleDetector.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleDetector.java
index 5d91515..1fbbb02 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleDetector.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleDetector.java
@@ -88,7 +88,7 @@ public class ThresholdRuleDetector implements AnomalyDetector<ThresholdRuleDetec
}
df.mapInPlace(BooleanSeries.HAS_TRUE, COL_ANOMALY, COL_TOO_HIGH, COL_TOO_LOW);
DatasetConfigDTO datasetConfig = data.getDatasetForMetricId().get(me.getId());
- List<MergedAnomalyResultDTO> anomalies = DetectionUtils.makeAnomalies(slice, df, COL_ANOMALY, endTime,
+ List<MergedAnomalyResultDTO> anomalies = DetectionUtils.makeAnomalies(slice, df, COL_ANOMALY,
DetectionUtils.getMonitoringGranularityPeriod(monitoringGranularity, datasetConfig), datasetConfig);
DataFrame baselineWithBoundaries = constructBaselineAndBoundaries(df);
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/BaselineAlgorithmTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/BaselineAlgorithmTest.java
deleted file mode 100644
index d00b12b..0000000
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/BaselineAlgorithmTest.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/**
- * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.pinot.thirdeye.detection.algorithm;
-
-import org.apache.pinot.thirdeye.dataframe.DataFrame;
-import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
-import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
-import org.apache.pinot.thirdeye.detection.DataProvider;
-import org.apache.pinot.thirdeye.detection.DetectionPipelineResult;
-import org.apache.pinot.thirdeye.detection.MockDataProvider;
-import org.apache.pinot.thirdeye.rootcause.timeseries.BaselineAggregateType;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.testng.Assert;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import static org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils.*;
-
-
-public class BaselineAlgorithmTest {
- private static final String PROP_METRIC_URN = "metricUrn";
- private static final String PROP_AGGREGATION = "aggregation";
- private static final String PROP_WEEKS = "weeks";
- private static final String PROP_CHANGE = "change";
- private static final String PROP_DIFFERENCE = "difference";
- private static final String PROP_TIMEZONE = "timezone";
-
- private DataProvider provider;
- private BaselineAlgorithm algorithm;
- private DataFrame data;
- private Map<String, Object> properties;
- private DetectionConfigDTO config;
-
- @BeforeMethod
- public void beforeMethod() throws Exception {
- try (Reader dataReader = new InputStreamReader(this.getClass().getResourceAsStream("timeseries-4w.csv"))) {
- this.data = DataFrame.fromCsv(dataReader);
- this.data.setIndex(COL_TIME);
- this.data.addSeries(COL_TIME, this.data.getLongs(COL_TIME).multiply(1000));
- }
-
- MetricConfigDTO metricConfigDTO = new MetricConfigDTO();
- metricConfigDTO.setId(1L);
- metricConfigDTO.setName("thirdeye-test");
- metricConfigDTO.setDataset("thirdeye-test-dataset");
-
- Map<MetricSlice, DataFrame> timeseries = new HashMap<>();
- timeseries.put(MetricSlice.from(1L, 0L, 604800000L), this.data);
- timeseries.put(MetricSlice.from(1L, 604800000L, 1209600000L), this.data);
- timeseries.put(MetricSlice.from(1L, 1209600000L, 1814400000L), this.data);
- timeseries.put(MetricSlice.from(1L, 1814400000L, 2419200000L), this.data);
-
- this.properties = new HashMap<>();
- this.properties.put(PROP_METRIC_URN, "thirdeye:metric:1");
-
- this.config = new DetectionConfigDTO();
- this.config.setProperties(properties);
- this.config.setId(-1L);
- this.provider = new MockDataProvider()
- .setTimeseries(timeseries)
- .setMetrics(Collections.singletonList(metricConfigDTO));
- }
-
- @Test
- public void testWeekOverWeekDifference() throws Exception {
- this.properties.put(PROP_DIFFERENCE, 400);
- this.algorithm = new BaselineAlgorithm(this.provider, this.config, 1814400000L, 2419200000L);
-
- DetectionPipelineResult result = this.algorithm.run();
-
- List<MergedAnomalyResultDTO> anomalies = result.getAnomalies();
- Assert.assertEquals(result.getLastTimestamp(), 2376000000L);
- Assert.assertEquals(anomalies.size(), 1);
- Assert.assertEquals(anomalies.get(0).getStartTime(), 2372400000L);
- Assert.assertEquals(anomalies.get(0).getEndTime(), 2376000000L);
- }
-
- @Test
- public void testWeekOverWeekChange() throws Exception {
- this.properties.put(PROP_CHANGE, 0.4);
- this.algorithm = new BaselineAlgorithm(this.provider, this.config, 1814400000L, 2419200000L);
- this.config.setId(-1L);
- DetectionPipelineResult result = this.algorithm.run();
-
- List<MergedAnomalyResultDTO> anomalies = result.getAnomalies();
- Assert.assertEquals(result.getLastTimestamp(), 2383200000L);
- Assert.assertEquals(anomalies.size(), 2);
- Assert.assertEquals(anomalies.get(0).getStartTime(), 2372400000L);
- Assert.assertEquals(anomalies.get(0).getEndTime(), 2376000000L);
- Assert.assertEquals(anomalies.get(1).getStartTime(), 2379600000L);
- Assert.assertEquals(anomalies.get(1).getEndTime(), 2383200000L);
- }
-
- @Test
- public void testThreeWeekMedianChange() throws Exception {
- this.properties.put(PROP_WEEKS, 3);
- this.properties.put(PROP_AGGREGATION, BaselineAggregateType.MEDIAN.toString());
- this.properties.put(PROP_CHANGE, 0.3);
- this.config.setId(-1L);
- this.algorithm = new BaselineAlgorithm(this.provider, this.config, 1814400000L, 2419200000L);
- DetectionPipelineResult result = this.algorithm.run();
-
- List<MergedAnomalyResultDTO> anomalies = result.getAnomalies();
- Assert.assertEquals(result.getLastTimestamp(), 2325600000L);
- Assert.assertEquals(anomalies.size(), 5);
- Assert.assertEquals(anomalies.get(0).getStartTime(), 2005200000L);
- Assert.assertEquals(anomalies.get(0).getEndTime(), 2008800000L);
- Assert.assertEquals(anomalies.get(1).getStartTime(), 2134800000L);
- Assert.assertEquals(anomalies.get(1).getEndTime(), 2138400000L);
- Assert.assertEquals(anomalies.get(2).getStartTime(), 2152800000L);
- Assert.assertEquals(anomalies.get(2).getEndTime(), 2156400000L);
- Assert.assertEquals(anomalies.get(3).getStartTime(), 2181600000L);
- Assert.assertEquals(anomalies.get(3).getEndTime(), 2185200000L);
- Assert.assertEquals(anomalies.get(4).getStartTime(), 2322000000L);
- Assert.assertEquals(anomalies.get(4).getEndTime(), 2325600000L);
- }
-
-}
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MovingWindowAlgorithmTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MovingWindowAlgorithmTest.java
deleted file mode 100644
index a91599c..0000000
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MovingWindowAlgorithmTest.java
+++ /dev/null
@@ -1,414 +0,0 @@
-/**
- * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.pinot.thirdeye.detection.algorithm;
-
-import org.apache.pinot.thirdeye.anomalydetection.context.AnomalyFeedback;
-import org.apache.pinot.thirdeye.constant.AnomalyFeedbackType;
-import org.apache.pinot.thirdeye.dataframe.DataFrame;
-import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
-import org.apache.pinot.thirdeye.datalayer.dto.AnomalyFeedbackDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
-import org.apache.pinot.thirdeye.detection.DataProvider;
-import org.apache.pinot.thirdeye.detection.DetectionPipelineResult;
-import org.apache.pinot.thirdeye.detection.DetectionTestUtils;
-import org.apache.pinot.thirdeye.detection.MockDataProvider;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import org.testng.Assert;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import static org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils.*;
-
-
-public class MovingWindowAlgorithmTest {
- private static final String METRIC_NAME = "myMetric";
- private static final String DATASET_NAME = "myDataset";
-
- private static final String PROP_METRIC_URN = "metricUrn";
- private static final String PROP_WINDOW_SIZE = "windowSize";
- private static final String PROP_LOOKBACK_PERIOD = "lookbackPeriod";
- private static final String PROP_REWORK_PERIOD = "reworkPeriod";
- private static final String PROP_QUANTILE_MIN = "quantileMin";
- private static final String PROP_QUANTILE_MAX = "quantileMax";
- private static final String PROP_ZSCORE_MIN = "zscoreMin";
- private static final String PROP_ZSCORE_MAX = "zscoreMax";
- private static final String PROP_KERNEL_SIZE = "kernelSize";
- private static final String PROP_ZSCORE_OUTLIER = "zscoreOutlier";
- private static final String PROP_CHANGE_DURATION = "changeDuration";
- private static final String PROP_BASELINE_WEEKS = "baselineWeeks";
-
- private DataProvider provider;
- private MovingWindowAlgorithm algorithm;
- private DataFrame data;
- private Map<String, Object> properties;
- private DetectionConfigDTO config;
- private List<MergedAnomalyResultDTO> anomalies;
-
- @BeforeMethod
- public void beforeMethod() throws Exception {
- try (Reader dataReader = new InputStreamReader(this.getClass().getResourceAsStream("timeseries-4w.csv"))) {
- this.data = DataFrame.fromCsv(dataReader);
- this.data.setIndex(COL_TIME);
- this.data.addSeries(COL_TIME, this.data.getLongs(COL_TIME).multiply(1000));
- }
-
- MetricConfigDTO metricConfigDTO = new MetricConfigDTO();
- metricConfigDTO.setId(1L);
- metricConfigDTO.setName(METRIC_NAME);
- metricConfigDTO.setDataset(DATASET_NAME);
-
- DatasetConfigDTO datasetConfigDTO = new DatasetConfigDTO();
- datasetConfigDTO.setId(2L);
- datasetConfigDTO.setDataset(DATASET_NAME);
- datasetConfigDTO.setTimeDuration(1);
- datasetConfigDTO.setTimeUnit(TimeUnit.HOURS);
-
- Map<MetricSlice, DataFrame> timeseries = new HashMap<>();
- timeseries.put(MetricSlice.from(1L, 100000L, 300000L), this.data);
- timeseries.put(MetricSlice.from(1L, 0L, 2419200000L), this.data);
- timeseries.put(MetricSlice.from(1L, 0L, 1209600000L), this.data);
- timeseries.put(MetricSlice.from(1L, 604800000L, 2419200000L), this.data);
-
- this.properties = new HashMap<>();
- this.properties.put(PROP_METRIC_URN, "thirdeye:metric:1");
- this.properties.put(PROP_WINDOW_SIZE, "1week");
- this.properties.put(PROP_LOOKBACK_PERIOD, "0");
- this.properties.put(PROP_REWORK_PERIOD, "0");
- this.properties.put(PROP_ZSCORE_OUTLIER, Double.NaN);
- this.properties.put(PROP_CHANGE_DURATION, "0");
-
- this.config = new DetectionConfigDTO();
- this.config.setProperties(properties);
- this.config.setId(-1L);
-
- this.anomalies = new ArrayList<>();
-
- this.provider = new MockDataProvider()
- .setTimeseries(timeseries)
- .setMetrics(Collections.singletonList(metricConfigDTO))
- .setDatasets(Collections.singletonList(datasetConfigDTO))
- .setAnomalies(this.anomalies);
-
- }
-
- //
- // quantile min
- //
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testQuantileMinTooLow() {
- this.properties.put(PROP_QUANTILE_MIN, -0.001);
- this.algorithm = new MovingWindowAlgorithm(this.provider, this.config, 604800000L, 1209600000L);
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testQuantileMinTooHigh() {
- this.properties.put(PROP_QUANTILE_MIN, 1.001);
- this.algorithm = new MovingWindowAlgorithm(this.provider, this.config, 604800000L, 1209600000L);
- }
-
- @Test
- public void testQuantileMinLimit() throws Exception {
- this.properties.put(PROP_QUANTILE_MIN, 0.0);
- this.properties.put(PROP_ZSCORE_OUTLIER, Double.NaN);
- this.properties.put(PROP_CHANGE_DURATION, "0");
- this.algorithm = new MovingWindowAlgorithm(this.provider, this.config, 604800000L, 2419200000L);
- DetectionPipelineResult res = this.algorithm.run();
-
- Assert.assertEquals(res.getAnomalies().size(), 12);
-// Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(669600000L, 673200000L)));
-// Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(896400000L, 900000000L)));
-// Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(921600000L, 928800000L)));
-// Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(954000000L, 961200000L)));
-// Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(1767600000L, 1771200000L)));
- }
-
- @Test
- public void testQuantileDiffMin() throws Exception {
- this.properties.put(PROP_QUANTILE_MIN, 0.01);
- this.properties.put(PROP_BASELINE_WEEKS, 1);
- this.algorithm = new MovingWindowAlgorithm(this.provider, this.config, 1209600000L, 2419200000L);
- DetectionPipelineResult res = this.algorithm.run();
-
- Assert.assertEquals(res.getAnomalies().size(), 9);
-// Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(1555200000L, 1558800000L)));
-// Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(1767600000L, 1771200000L)));
-// Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(2181600000L, 2185200000L)));
-// Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(2368800000L, 2372400000L)));
-// Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(2412000000L, 2415600000L)));
- }
-
- //
- // quantile max
- //
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testQuantileMaxTooLow() {
- this.properties.put(PROP_QUANTILE_MAX, -0.001);
- this.algorithm = new MovingWindowAlgorithm(this.provider, this.config, 604800000L, 1209600000L);
- }
-
- @Test(expectedExceptions = IllegalArgumentException.class)
- public void testQuantileMaxTooHigh() {
- this.properties.put(PROP_QUANTILE_MAX, 1.001);
- this.algorithm = new MovingWindowAlgorithm(this.provider, this.config, 604800000L, 1209600000L);
- }
-
- @Test
- public void testQuantileMax() throws Exception {
- this.properties.put(PROP_QUANTILE_MAX, 0.975);
- this.algorithm = new MovingWindowAlgorithm(this.provider, this.config, 604800000L, 1209600000L);
- DetectionPipelineResult res = this.algorithm.run();
-
- Assert.assertEquals(res.getAnomalies().size(), 12);
-// Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(626400000L, 630000000L)));
-// Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(705600000L, 709200000L)));
-// Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(846000000L, 849600000L)));
-// Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(892800000L, 896400000L)));
-// Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(903600000L, 907200000L)));
-// Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(918000000L, 921600000L)));
-// Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(950400000L, 954000000L)));
- }
-
- @Test
- public void testQuantileMaxLimit() throws Exception {
- this.properties.put(PROP_QUANTILE_MAX, 1.0);
- this.algorithm = new MovingWindowAlgorithm(this.provider, this.config, 604800000L, 2419200000L);
- DetectionPipelineResult res = this.algorithm.run();
-
- Assert.assertEquals(res.getAnomalies().size(), 15);
-// Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(626400000L, 630000000L)));
-// Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(846000000L, 849600000L)));
-// Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(892800000L, 896400000L)));
-// Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(903600000L, 907200000L)));
-// Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(950400000L, 954000000L)));
-// Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(1576800000L, 1580400000L)));
-// Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(1756800000L, 1767600000L)));
- }
-
- //
- // zscore
- //
-
- @Test
- public void testZScoreMin() throws Exception {
- this.properties.put(PROP_ZSCORE_MIN, -2.5);
- this.algorithm = new MovingWindowAlgorithm(this.provider, this.config, 604800000L, 2419200000L);
- DetectionPipelineResult res = this.algorithm.run();
-
- Assert.assertEquals(res.getAnomalies().size(), 5);
- Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(921600000L, 928800000L)));
- Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(939600000L, 950400000L)));
- Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(954000000L, 968400000L)));
- Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(1767600000L, 1778400000L)));
- Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(1792800000L, 1796400000L)));
- }
-
- @Test
- public void testZScoreMax() throws Exception {
- this.properties.put(PROP_ZSCORE_MAX, 2.5);
- this.algorithm = new MovingWindowAlgorithm(this.provider, this.config, 604800000L, 2419200000L);
- DetectionPipelineResult res = this.algorithm.run();
-
- Assert.assertEquals(res.getAnomalies().size(), 2);
- Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(950400000L, 954000000L)));
- Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(1756800000L, 1767600000L)));
- }
-
- @Test
- public void testZScoreDiffMax() throws Exception {
- this.properties.put(PROP_ZSCORE_MAX, 2.25);
- this.properties.put(PROP_KERNEL_SIZE, 4);
- this.properties.put(PROP_BASELINE_WEEKS, 1);
- this.algorithm = new MovingWindowAlgorithm(this.provider, this.config, 1209600000L, 2419200000L);
- DetectionPipelineResult res = this.algorithm.run();
-
- Assert.assertEquals(res.getAnomalies().size(), 4);
- Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(1278000000L, 1328400000L)));
- Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(1540800000L, 1594800000L)));
- Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(1756800000L, 1767600000L)));
- Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(2372400000L, 2383200000L)));
- }
-
- //
- // kernel
- //
-
- @Test
- public void testKernelMin() throws Exception {
- this.properties.put(PROP_ZSCORE_MIN, -2.5);
- this.properties.put(PROP_KERNEL_SIZE, 4);
- this.properties.put(PROP_BASELINE_WEEKS, 1);
- this.algorithm = new MovingWindowAlgorithm(this.provider, this.config, 1209600000L, 2419200000L);
- DetectionPipelineResult res = this.algorithm.run();
-
- Assert.assertEquals(res.getAnomalies().size(), 2);
- Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(2221200000L, 2235600000L)));
- Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(2404800000L, 2415600000L)));
- }
-
- @Test
- public void testKernelMax() throws Exception {
- this.properties.put(PROP_ZSCORE_MAX, 2.5);
- this.properties.put(PROP_KERNEL_SIZE, 4);
- this.properties.put(PROP_BASELINE_WEEKS, 1);
- this.algorithm = new MovingWindowAlgorithm(this.provider, this.config, 1209600000L, 2419200000L);
- DetectionPipelineResult res = this.algorithm.run();
-
- Assert.assertEquals(res.getAnomalies().size(), 2);
- Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(1540800000L, 1594800000L)));
- Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(2372400000L, 2379600000L)));
- }
-
- //
- // smoothing, outliers and change point correction
- //
-
- @Test
- public void testOutlierCorrection() throws Exception {
- this.data.set(COL_VALUE, this.data.getDoubles(COL_TIME).between(86400000L, 172800000L),
- this.data.getDoubles(COL_VALUE).add(500));
-
- this.properties.put(PROP_QUANTILE_MAX, 0.975);
- this.properties.put(PROP_ZSCORE_OUTLIER, 1.5);
-
- this.algorithm = new MovingWindowAlgorithm(this.provider, this.config, 604800000L, 1209600000L);
- DetectionPipelineResult res = this.algorithm.run();
-
- Assert.assertEquals(res.getAnomalies().size(), 12);
-// Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(626400000L, 630000000L)));
-// Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(705600000L, 709200000L)));
-// Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(846000000L, 849600000L)));
-// Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(892800000L, 896400000L)));
-// Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(903600000L, 907200000L)));
-// Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(918000000L, 921600000L)));
-// Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(928800000L, 932400000L)));
-// Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(950400000L, 954000000L)));
-// Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(972000000L, 975600000L)));
- }
-
- @Test
- public void testChangePointFromData() throws Exception {
- this.data.set(COL_VALUE, this.data.getDoubles(COL_TIME).between(298800000L, 903600000L),
- this.data.getDoubles(COL_VALUE).multiply(1.77));
- this.data.set(COL_VALUE, this.data.getDoubles(COL_TIME).gte(903600000L),
- this.data.getDoubles(COL_VALUE).multiply(3.61));
-
- this.properties.put(PROP_ZSCORE_MAX, 1.5);
- this.properties.put(PROP_ZSCORE_OUTLIER, 3.0);
- this.properties.put(PROP_CHANGE_DURATION, "2d");
- this.algorithm = new MovingWindowAlgorithm(this.provider, this.config, 604800000L, 1209600000L);
- DetectionPipelineResult res = this.algorithm.run();
-
- Assert.assertEquals(res.getAnomalies().size(), 2);
- Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(892800000L, 896400000L)));
- Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(903600000L, 1015200000L)));
- }
-
- @Test
- public void testChangePointFromAnomaly() throws Exception {
- this.data.set(COL_VALUE, this.data.getDoubles(COL_TIME).between(298800000L, 903600000L),
- this.data.getDoubles(COL_VALUE).multiply(1.77));
- this.data.set(COL_VALUE, this.data.getDoubles(COL_TIME).gte(903600000L),
- this.data.getDoubles(COL_VALUE).multiply(3.61));
-
- this.config.setId(1L);
-
- this.anomalies.add(makeAnomalyChangePoint(1L, 298800000L, 300000000L));
- this.anomalies.add(makeAnomalyChangePoint(1L, 903600000L, 910000000L));
-
- this.properties.put(PROP_ZSCORE_MAX, 2.5);
- this.properties.put(PROP_ZSCORE_OUTLIER, 3.0);
- this.algorithm = new MovingWindowAlgorithm(this.provider, this.config, 604800000L, 1209600000L);
- DetectionPipelineResult res = this.algorithm.run();
-
- Assert.assertEquals(res.getAnomalies().size(), 3);
- Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(1L, 903600000L, 907200000L)));
- Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(1L, 918000000L, 921600000L)));
- Assert.assertTrue(res.getAnomalies().contains(makeAnomaly(1L, 950400000L, 954000000L)));
- }
-
- //
- // baselines
- //
-
- // TODO test baseline aggregation over multiple weeks
-
- //
- // edge cases
- //
-
- @Test
- public void testNoData() throws Exception {
- this.properties.put(PROP_QUANTILE_MIN, 0.0);
- this.properties.put(PROP_QUANTILE_MAX, 1.0);
- this.properties.put(PROP_ZSCORE_MIN, -3.0);
- this.properties.put(PROP_ZSCORE_MAX, 3.0);
- this.properties.put(PROP_WINDOW_SIZE, "100secs");
- this.properties.put(PROP_LOOKBACK_PERIOD, "0");
- this.algorithm = new MovingWindowAlgorithm(this.provider, this.config, 200000L, 300000L);
- DetectionPipelineResult res = this.algorithm.run();
-
- Assert.assertEquals(res.getAnomalies().size(), 0);
- }
-
- @Test
- public void testMinLookback() throws Exception {
- this.properties.put(PROP_WINDOW_SIZE, "100secs");
- this.properties.put(PROP_LOOKBACK_PERIOD, 100000);
- this.algorithm = new MovingWindowAlgorithm(this.provider, this.config, 300000L, 300000L);
- DetectionPipelineResult res = this.algorithm.run();
-
- Assert.assertEquals(res.getAnomalies().size(), 0);
- }
-
- //
- // utils
- //
-
- private static MergedAnomalyResultDTO makeAnomaly(long start, long end) {
- MergedAnomalyResultDTO anomaly = DetectionTestUtils.makeAnomaly(-1L, start, end, METRIC_NAME, DATASET_NAME, Collections.<String, String>emptyMap());
- anomaly.setMetricUrn("thirdeye:metric:1");
- return anomaly;
- }
-
- private static MergedAnomalyResultDTO makeAnomaly(Long configId, long start, long end) {
- MergedAnomalyResultDTO anomaly = DetectionTestUtils.makeAnomaly(configId, start, end, METRIC_NAME, DATASET_NAME, Collections.<String, String>emptyMap());
- anomaly.setMetricUrn("thirdeye:metric:1");
- return anomaly;
- }
-
- private static MergedAnomalyResultDTO makeAnomalyChangePoint(Long configId, long start, long end) {
- AnomalyFeedback feedback = new AnomalyFeedbackDTO();
- feedback.setFeedbackType(AnomalyFeedbackType.ANOMALY_NEW_TREND);
-
- MergedAnomalyResultDTO anomaly = DetectionTestUtils.makeAnomaly(configId, start, end, METRIC_NAME, DATASET_NAME, Collections.<String, String>emptyMap());
- anomaly.setFeedback(feedback);
-
- return anomaly;
- }
-}
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/ThresholdAlgorithmTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/ThresholdAlgorithmTest.java
deleted file mode 100644
index 1fdf189..0000000
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/ThresholdAlgorithmTest.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.pinot.thirdeye.detection.algorithm;
-
-import org.apache.pinot.thirdeye.dataframe.DataFrame;
-import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
-import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
-import org.apache.pinot.thirdeye.detection.DataProvider;
-import org.apache.pinot.thirdeye.detection.DetectionPipelineResult;
-import org.apache.pinot.thirdeye.detection.MockDataProvider;
-import org.apache.pinot.thirdeye.detection.StaticDetectionPipeline;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import org.testng.Assert;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import static org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils.*;
-
-
-public class ThresholdAlgorithmTest {
- private DataProvider testDataProvider;
- private StaticDetectionPipeline thresholdAlgorithm;
-
- @BeforeMethod
- public void beforeMethod() {
- Map<MetricSlice, DataFrame> timeSeries = new HashMap<>();
- timeSeries.put(MetricSlice.from(123L, 0, 10),
- new DataFrame().addSeries(COL_VALUE, 0, 100, 200, 500, 1000).addSeries(COL_TIME, 0, 2, 4, 6, 8));
-
- MetricConfigDTO metricConfigDTO = new MetricConfigDTO();
- metricConfigDTO.setId(123L);
- metricConfigDTO.setName("thirdeye-test");
- metricConfigDTO.setDataset("thirdeye-test-dataset");
-
- DatasetConfigDTO datasetConfigDTO = new DatasetConfigDTO();
- datasetConfigDTO.setId(124L);
- datasetConfigDTO.setDataset("thirdeye-test-dataset");
- datasetConfigDTO.setTimeDuration(2);
- datasetConfigDTO.setTimeUnit(TimeUnit.MILLISECONDS);
- datasetConfigDTO.setTimezone("UTC");
-
- DetectionConfigDTO detectionConfigDTO = new DetectionConfigDTO();
- detectionConfigDTO.setId(125L);
- Map<String, Object> properties = new HashMap<>();
- properties.put("min", 100);
- properties.put("max", 500);
- properties.put("metricUrn", "thirdeye:metric:123");
- detectionConfigDTO.setProperties(properties);
-
- this.testDataProvider = new MockDataProvider()
- .setMetrics(Collections.singletonList(metricConfigDTO))
- .setDatasets(Collections.singletonList(datasetConfigDTO))
- .setTimeseries(timeSeries);
- this.thresholdAlgorithm = new ThresholdAlgorithm(this.testDataProvider, detectionConfigDTO, 0, 10);
- }
-
- @Test
- public void testThresholdAlgorithmRun() throws Exception {
- DetectionPipelineResult result = this.thresholdAlgorithm.run();
- List<MergedAnomalyResultDTO> anomalies = result.getAnomalies();
- Assert.assertEquals(result.getLastTimestamp(), 10);
- Assert.assertEquals(anomalies.size(), 2);
- Assert.assertEquals(anomalies.get(0).getStartTime(), 0);
- Assert.assertEquals(anomalies.get(0).getEndTime(), 2);
- Assert.assertEquals(anomalies.get(1).getStartTime(), 8);
- Assert.assertEquals(anomalies.get(1).getEndTime(), 10);
- }
-}
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/integration/MergeDimensionThresholdIntegrationTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/integration/MergeDimensionThresholdIntegrationTest.java
deleted file mode 100644
index af024f8..0000000
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/integration/MergeDimensionThresholdIntegrationTest.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.pinot.thirdeye.detection.integration;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.pinot.thirdeye.dataframe.DataFrame;
-import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
-import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
-import org.apache.pinot.thirdeye.detection.DetectionPipeline;
-import org.apache.pinot.thirdeye.detection.DetectionPipelineLoader;
-import org.apache.pinot.thirdeye.detection.DetectionPipelineResult;
-import org.apache.pinot.thirdeye.detection.DetectionTestUtils;
-import org.apache.pinot.thirdeye.detection.MockDataProvider;
-import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import org.testng.Assert;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-
-public class MergeDimensionThresholdIntegrationTest {
- private static final ObjectMapper MAPPER = new ObjectMapper();
-
- private static final String METRIC1 = "myMetric1";
- private static final String METRIC2 = "myMetric2";
- private static final String DATASET = "myDataset2";
- private static final Map<String, String> ONE_TWO = new HashMap<>();
- static {
- ONE_TWO.put("a", "1");
- ONE_TWO.put("b", "2");
- }
-
- private Map<String, Object> properties;
- private DetectionPipelineLoader loader;
- private MockDataProvider provider;
- private DetectionConfigDTO config;
-
- private DataFrame data1;
- private DataFrame data2;
- private MetricConfigDTO metric1;
- private MetricConfigDTO metric2;
- private DatasetConfigDTO dataset;
-
- private Map<MetricSlice, DataFrame> timeseries;
- private Map<MetricSlice, DataFrame> aggregates;
- private List<MetricConfigDTO> metrics;
- private List<MergedAnomalyResultDTO> anomalies;
- private List<DatasetConfigDTO> datasets;
-
- @BeforeMethod
- public void beforeMethod() throws Exception {
- URL url = this.getClass().getResource("mergeDimensionThresholdProperties.json");
- this.properties = MAPPER.readValue(url, Map.class);
-
- try (Reader dataReader = new InputStreamReader(this.getClass().getResourceAsStream("timeseries.csv"))) {
- this.data1 = DataFrame.fromCsv(dataReader);
- }
-
- try (Reader dataReader = new InputStreamReader(this.getClass().getResourceAsStream("timeseries.csv"))) {
- this.data2 = DataFrame.fromCsv(dataReader);
- }
-
- this.loader = new DetectionPipelineLoader();
-
- this.aggregates = new HashMap<>();
- this.aggregates.put(MetricSlice.from(1, 0, 18000), this.data1);
-
- this.timeseries = new HashMap<>();
- this.timeseries.put(MetricSlice.from(2, 0, 18000), this.data2);
-
- this.metric1 = new MetricConfigDTO();
- this.metric1.setId(1L);
- this.metric1.setName(METRIC1);
- this.metric1.setDataset(DATASET);
- this.metric2 = new MetricConfigDTO();
- this.metric2.setId(2L);
- this.metric2.setName(METRIC2);
- this.metric2.setDataset(DATASET);
-
- this.metrics = new ArrayList<>();
- this.metrics.add(this.metric2);
- this.metrics.add(this.metric1);
-
- this.dataset = new DatasetConfigDTO();
- this.dataset.setId(3L);
- this.dataset.setDataset(DATASET);
- this.dataset.setTimeDuration(1);
- this.dataset.setTimeUnit(TimeUnit.HOURS);
- this.dataset.setTimezone("UTC");
-
- this.datasets = new ArrayList<>();
- this.datasets.add(this.dataset);
-
- this.anomalies = new ArrayList<>();
- this.anomalies.add(makeAnomaly(9500, 10800, "thirdeye:metric:2"));
-
- this.provider = new MockDataProvider()
- .setLoader(this.loader)
- .setAggregates(this.aggregates)
- .setTimeseries(this.timeseries)
- .setMetrics(this.metrics)
- .setDatasets(this.datasets)
- .setAnomalies(this.anomalies);
-
- this.config = new DetectionConfigDTO();
- this.config.setProperties(this.properties);
- this.config.setId(-1L);
- }
-
- @Test
- public void testMergeDimensionThreshold() throws Exception {
- DetectionPipeline pipeline = this.loader.from(this.provider, this.config, 0, 18000);
- DetectionPipelineResult result = pipeline.run();
-
- Assert.assertEquals(result.getAnomalies().size(), 3);
-
- Assert.assertTrue(result.getAnomalies().contains(makeAnomaly(9500, 18000, "thirdeye:metric:2")));
- Assert.assertTrue(result.getAnomalies().contains(makeAnomaly(0, 7200, "thirdeye:metric:2:a%3D1:b%3D2")));
- Assert.assertTrue(result.getAnomalies().contains(makeAnomaly(14400, 18000, "thirdeye:metric:2:a%3D1:b%3D2")));
- }
-
- private static MergedAnomalyResultDTO makeAnomaly(long start, long end, String metricUrn) {
- MetricEntity me = MetricEntity.fromURN(metricUrn);
-
- Map<String, String> dimensions = new HashMap<>();
- for (Map.Entry<String, String> entry : me.getFilters().entries()) {
- dimensions.put(entry.getKey(), entry.getValue());
- }
-
- MergedAnomalyResultDTO anomaly = DetectionTestUtils.makeAnomaly(-1L, start, end, METRIC2, DATASET, dimensions);
- anomaly.setMetricUrn(metricUrn);
- return anomaly;
- }
-}
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/integration/mergeDimensionThresholdProperties.json b/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/integration/mergeDimensionThresholdProperties.json
deleted file mode 100644
index 870531c..0000000
--- a/thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/integration/mergeDimensionThresholdProperties.json
+++ /dev/null
@@ -1,31 +0,0 @@
-{
- "className": "org.apache.pinot.thirdeye.detection.algorithm.MergeWrapper",
- "maxGap": 100,
- "maxDuration": 10000,
- "nested": [
- {
- "className": "org.apache.pinot.thirdeye.detection.algorithm.DimensionWrapper",
- "metricUrn": "thirdeye:metric:1:a%3D1:a%3D2",
- "nestedMetricUrns": [ "thirdeye:metric:2" ],
- "dimensions": ["a", "b"],
- "minValue": 10,
- "lookback": 0,
- "k": 3,
- "nested": [
- {
- "className": "org.apache.pinot.thirdeye.detection.algorithm.ThresholdAlgorithm",
- "min": 15
- },
- {
- "className": "org.apache.pinot.thirdeye.detection.algorithm.ThresholdAlgorithm",
- "max": 20
- }
- ]
- },
- {
- "className": "org.apache.pinot.thirdeye.detection.algorithm.ThresholdAlgorithm",
- "metricUrn": "thirdeye:metric:2",
- "max": 70
- }
- ]
-}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org