You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ji...@apache.org on 2019/03/25 22:42:40 UTC
[incubator-pinot] branch master updated: [TE] detection - current & baseline provider lazy evaluation (#4014)
This is an automated email from the ASF dual-hosted git repository.
jihao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 1870c36 [TE] detection - current & baseline provider lazy evaluation (#4014)
1870c36 is described below
commit 1870c3655a47a10098cbaf07d72c88c715c5c55d
Author: Jihao Zhang <ji...@linkedin.com>
AuthorDate: Mon Mar 25 15:42:35 2019 -0700
[TE] detection - current & baseline provider lazy evaluation (#4014)
Update the anomaly current & baseline value only when the anomaly is merged.
Clean up current and baseline loader
---
.../api/user/dashboard/UserDashboardResource.java | 9 ---
.../bao/jdbc/MergedAnomalyResultManagerImpl.java | 6 +-
.../detection/alert/DetectionAlertTaskRunner.java | 6 --
.../thirdeye/detection/algorithm/MergeWrapper.java | 33 ++++-----
.../wrapper/BaselineFillingMergeWrapper.java | 27 ++++++--
.../wrapper/ChildKeepingMergeWrapper.java | 24 ++++---
.../wrapper/BaselineFillingMergeWrapperTest.java | 79 ++++++++++++++++------
.../wrapper/ChildKeepingMergeWrapperTest.java | 38 +++++++++++
8 files changed, 155 insertions(+), 67 deletions(-)
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/user/dashboard/UserDashboardResource.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/user/dashboard/UserDashboardResource.java
index e2d7fc8..6bf84fc 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/user/dashboard/UserDashboardResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/user/dashboard/UserDashboardResource.java
@@ -78,8 +78,6 @@ public class UserDashboardResource {
private final DatasetConfigManager datasetDAO;
private final DetectionConfigManager detectionDAO;
private final DetectionAlertConfigManager detectionAlertDAO;
- private final AggregationLoader aggregationLoader;
- private final CurrentAndBaselineLoader currentAndBaselineLoader;
public UserDashboardResource(MergedAnomalyResultManager anomalyDAO, MetricConfigManager metricDAO,
@@ -90,11 +88,6 @@ public class UserDashboardResource {
this.datasetDAO = datasetDAO;
this.detectionDAO = detectionDAO;
this.detectionAlertDAO = detectionAlertDAO;
-
- this.aggregationLoader =
- new DefaultAggregationLoader(this.metricDAO, this.datasetDAO, ThirdEyeCacheRegistry.getInstance().getQueryCache(),
- ThirdEyeCacheRegistry.getInstance().getDatasetMaxDataTimeCache());
- this.currentAndBaselineLoader = new CurrentAndBaselineLoader(this.metricDAO, this.datasetDAO, this.aggregationLoader);
}
/**
@@ -335,8 +328,6 @@ public class UserDashboardResource {
}
});
- this.currentAndBaselineLoader.fillInCurrentAndBaselineValue(anomalies);
-
return anomalies;
}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/jdbc/MergedAnomalyResultManagerImpl.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/jdbc/MergedAnomalyResultManagerImpl.java
index 7bbe9e6..3940a2e 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/jdbc/MergedAnomalyResultManagerImpl.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/jdbc/MergedAnomalyResultManagerImpl.java
@@ -414,11 +414,9 @@ public class MergedAnomalyResultManagerImpl extends AbstractManagerImpl<MergedAn
if (child.getChildren() != null && !child.getChildren().isEmpty()) {
throw new IllegalArgumentException("Multi-level anomaly nesting not supported");
}
-
- child.setChild(true);
- save(child);
}
-
+ child.setChild(true);
+ save(child);
childIds.add(child.getId());
}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertTaskRunner.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertTaskRunner.java
index 7e6bd71..20d9d2b 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertTaskRunner.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertTaskRunner.java
@@ -52,7 +52,6 @@ public class DetectionAlertTaskRunner implements TaskRunner {
private static final Logger LOG = LoggerFactory.getLogger(DetectionAlertTaskRunner.class);
private final DetectionAlertTaskFactory detAlertTaskFactory;
- private CurrentAndBaselineLoader currentAndBaselineLoader;
private DetectionAlertConfigManager alertConfigDAO;
private MergedAnomalyResultManager mergedAnomalyDAO;
@@ -66,7 +65,6 @@ public class DetectionAlertTaskRunner implements TaskRunner {
AggregationLoader aggregationLoader =
new DefaultAggregationLoader(metricDAO, datasetDAO, ThirdEyeCacheRegistry.getInstance().getQueryCache(),
ThirdEyeCacheRegistry.getInstance().getDatasetMaxDataTimeCache());
- this.currentAndBaselineLoader = new CurrentAndBaselineLoader(metricDAO, datasetDAO, aggregationLoader);
}
private DetectionAlertConfigDTO loadDetectionAlertConfig(long detectionAlertConfigId) {
@@ -122,10 +120,6 @@ public class DetectionAlertTaskRunner implements TaskRunner {
result = alertSuppressor.run(result);
}
- // TODO: Cleanup currentAndBaselineLoader
- // In the new design, we have decided to move this function back to the detection pipeline.
- this.currentAndBaselineLoader.fillInCurrentAndBaselineValue(result.getAllAnomalies());
-
// Send out alert notifications (email and/or iris)
Set<DetectionAlertScheme> alertSchemes =
detAlertTaskFactory.loadAlertSchemes(alertConfig, taskContext.getThirdEyeAnomalyConfiguration(), result);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java
index 34cff3c..d4dabf1 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java
@@ -245,22 +245,23 @@ public class MergeWrapper extends DetectionPipeline {
}
- protected MergedAnomalyResultDTO copyAnomalyInfo(MergedAnomalyResultDTO anomaly, MergedAnomalyResultDTO newAnomaly) {
- newAnomaly.setStartTime(anomaly.getStartTime());
- newAnomaly.setEndTime(anomaly.getEndTime());
- newAnomaly.setMetric(anomaly.getMetric());
- newAnomaly.setMetricUrn(anomaly.getMetricUrn());
- newAnomaly.setCollection(anomaly.getCollection());
- newAnomaly.setDimensions(anomaly.getDimensions());
- newAnomaly.setDetectionConfigId(anomaly.getDetectionConfigId());
- newAnomaly.setAnomalyResultSource(anomaly.getAnomalyResultSource());
- newAnomaly.setAvgBaselineVal(anomaly.getAvgBaselineVal());
- newAnomaly.setAvgCurrentVal(anomaly.getAvgCurrentVal());
- newAnomaly.setFeedback(anomaly.getFeedback());
- newAnomaly.setAnomalyFeedbackId(anomaly.getAnomalyFeedbackId());
- newAnomaly.setScore(anomaly.getScore());
- newAnomaly.setWeight(anomaly.getWeight());
- return newAnomaly;
+ protected MergedAnomalyResultDTO copyAnomalyInfo(MergedAnomalyResultDTO from, MergedAnomalyResultDTO to) {
+ to.setStartTime(from.getStartTime());
+ to.setEndTime(from.getEndTime());
+ to.setMetric(from.getMetric());
+ to.setMetricUrn(from.getMetricUrn());
+ to.setCollection(from.getCollection());
+ to.setDimensions(from.getDimensions());
+ to.setDetectionConfigId(from.getDetectionConfigId());
+ to.setAnomalyResultSource(from.getAnomalyResultSource());
+ to.setAvgBaselineVal(from.getAvgBaselineVal());
+ to.setAvgCurrentVal(from.getAvgCurrentVal());
+ to.setFeedback(from.getFeedback());
+ to.setAnomalyFeedbackId(from.getAnomalyFeedbackId());
+ to.setScore(from.getScore());
+ to.setWeight(from.getWeight());
+ to.setProperties(from.getProperties());
+ return to;
}
protected static class AnomalyKey {
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
index a1d7289..e07cf83 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
@@ -21,6 +21,7 @@ package org.apache.pinot.thirdeye.detection.wrapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.Collections2;
+import java.util.HashMap;
import org.apache.pinot.thirdeye.dataframe.DoubleSeries;
import org.apache.pinot.thirdeye.dataframe.Series;
import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
@@ -63,11 +64,12 @@ public class BaselineFillingMergeWrapper extends MergeWrapper {
private static final String PROP_DETECTOR_COMPONENT_NAME = "detectorComponentName";
private static final String DEFAULT_WOW_BASELINE_PROVIDER_NAME = "DEFAULT_WOW";
- private BaselineProvider baselineValueProvider; // optionally configure a baseline value loader
- private BaselineProvider currentValueProvider;
- private String baselineProviderComponentName;
+ private final BaselineProvider baselineValueProvider; // optionally configure a baseline value loader
+ private final BaselineProvider currentValueProvider;
+ private final String baselineProviderComponentName;
private String detectorComponentName;
private String metricUrn;
+ private final Map<Long, MergedAnomalyResultDTO> existingAnomalies; // from id to anomalies
public BaselineFillingMergeWrapper(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime)
{
@@ -88,7 +90,7 @@ public class BaselineFillingMergeWrapper extends MergeWrapper {
}
if (config.getProperties().containsKey(PROP_CURRENT_PROVIDER)) {
- String detectorReferenceKey = DetectionUtils.getComponentName(MapUtils.getString(config.getProperties(), currentValueProvider));
+ String detectorReferenceKey = DetectionUtils.getComponentName(MapUtils.getString(config.getProperties(), PROP_CURRENT_PROVIDER));
Preconditions.checkArgument(this.config.getComponents().containsKey(detectorReferenceKey));
this.currentValueProvider = (BaselineProvider) this.config.getComponents().get(detectorReferenceKey);
} else {
@@ -117,6 +119,7 @@ public class BaselineFillingMergeWrapper extends MergeWrapper {
properties.put(PROP_METRIC_URN, nestedUrn);
}
}
+ this.existingAnomalies = new HashMap<>();
}
@Override
@@ -141,6 +144,9 @@ public class BaselineFillingMergeWrapper extends MergeWrapper {
// merge if only the anomaly is in the same dimension
this.metricUrn.equals(mergedAnomaly.getMetricUrn())
);
+ for (MergedAnomalyResultDTO anomaly : anomalies) {
+ if(anomaly.getId() != null) this.existingAnomalies.put(anomaly.getId(), copyAnomalyInfo(anomaly, new MergedAnomalyResultDTO()));
+ }
return new ArrayList<>(anomalies);
}
@@ -151,6 +157,10 @@ public class BaselineFillingMergeWrapper extends MergeWrapper {
*/
List<MergedAnomalyResultDTO> fillCurrentAndBaselineValue(List<MergedAnomalyResultDTO> mergedAnomalies) {
for (MergedAnomalyResultDTO anomaly : mergedAnomalies) {
+ // skip current & baseline filling if the anomaly is not merged
+ if (this.isExistingAnomaly(this.existingAnomalies, anomaly)) {
+ continue;
+ }
try {
String metricUrn = anomaly.getMetricUrn();
MetricEntity me = MetricEntity.fromURN(metricUrn);
@@ -181,6 +191,15 @@ public class BaselineFillingMergeWrapper extends MergeWrapper {
return mergedAnomalies;
}
+ // check if an anomaly is a existing anomaly and the duration is not modified
+ protected boolean isExistingAnomaly(Map<Long, MergedAnomalyResultDTO> existingAnomalies, MergedAnomalyResultDTO anomaly) {
+ if (!existingAnomalies.containsKey(anomaly.getId())) {
+ return false;
+ }
+ MergedAnomalyResultDTO previousAnomaly = existingAnomalies.get(anomaly.getId());
+ return anomaly.getStartTime() == previousAnomaly.getStartTime() && anomaly.getEndTime() == previousAnomaly.getEndTime();
+ }
+
private double calculateWeight(MergedAnomalyResultDTO anomaly) {
return (anomaly.getAvgCurrentVal() - anomaly.getAvgBaselineVal()) / anomaly.getAvgBaselineVal();
}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
index a6ab276..f3fa13c 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
@@ -39,8 +39,7 @@ import org.apache.pinot.thirdeye.detection.algorithm.MergeWrapper;
* Merge anomalies regardless of anomaly merge key.
*/
public class ChildKeepingMergeWrapper extends BaselineFillingMergeWrapper {
- public ChildKeepingMergeWrapper(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime)
- {
+ public ChildKeepingMergeWrapper(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime) {
super(provider, config, startTime, endTime);
}
@@ -53,6 +52,13 @@ public class ChildKeepingMergeWrapper extends BaselineFillingMergeWrapper {
@Override
protected List<MergedAnomalyResultDTO> merge(Collection<MergedAnomalyResultDTO> anomalies) {
List<MergedAnomalyResultDTO> input = new ArrayList<>(anomalies);
+ Map<Long, MergedAnomalyResultDTO> existingParentAnomalies = new HashMap<>();
+ for (MergedAnomalyResultDTO anomaly : input) {
+ if (anomaly.getId() != null && !anomaly.getChildren().isEmpty()) {
+ existingParentAnomalies.put(anomaly.getId(), anomaly);
+ }
+ }
+
Collections.sort(input, MergeWrapper.COMPARATOR);
List<MergedAnomalyResultDTO> output = new ArrayList<>();
@@ -63,8 +69,8 @@ public class ChildKeepingMergeWrapper extends BaselineFillingMergeWrapper {
continue;
}
- MergeWrapper.AnomalyKey
- key = new MergeWrapper.AnomalyKey(anomaly.getMetric(), anomaly.getCollection(), anomaly.getDimensions(), "", "");
+ MergeWrapper.AnomalyKey key =
+ new MergeWrapper.AnomalyKey(anomaly.getMetric(), anomaly.getCollection(), anomaly.getDimensions(), "", "");
MergedAnomalyResultDTO parent = parents.get(key);
if (parent == null || anomaly.getStartTime() - parent.getEndTime() > this.maxGap) {
@@ -74,7 +80,7 @@ public class ChildKeepingMergeWrapper extends BaselineFillingMergeWrapper {
} else if (anomaly.getEndTime() <= parent.getEndTime()
|| anomaly.getEndTime() - parent.getStartTime() <= this.maxDuration) {
// fully merge into existing
- if (parent.getChildren().isEmpty()){
+ if (parent.getChildren().isEmpty()) {
parent.getChildren().add(copyAnomalyInfo(parent, new MergedAnomalyResultDTO()));
}
parent.setEndTime(Math.max(parent.getEndTime(), anomaly.getEndTime()));
@@ -91,11 +97,11 @@ public class ChildKeepingMergeWrapper extends BaselineFillingMergeWrapper {
}
}
- // refill current and baseline values for parent anomalies
- Collection<MergedAnomalyResultDTO> parentAnomalies =
- Collections2.filter(output, mergedAnomaly -> mergedAnomaly != null && !mergedAnomaly.getChildren().isEmpty());
+ // refill current and baseline values for qualified parent anomalies
+ Collection<MergedAnomalyResultDTO> parentAnomalies = Collections2.filter(output,
+ mergedAnomaly -> mergedAnomaly != null && !mergedAnomaly.getChildren().isEmpty() && !isExistingAnomaly(
+ existingParentAnomalies, mergedAnomaly));
super.fillCurrentAndBaselineValue(new ArrayList<>(parentAnomalies));
return output;
}
-
}
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java
index 87f7698..77d1785 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java
@@ -17,6 +17,11 @@
package org.apache.pinot.thirdeye.detection.wrapper;
import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.pinot.thirdeye.constant.MetricAggFunction;
import org.apache.pinot.thirdeye.dataframe.DataFrame;
import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
@@ -35,11 +40,6 @@ import org.apache.pinot.thirdeye.detection.algorithm.MergeWrapper;
import org.apache.pinot.thirdeye.detection.components.MockBaselineProvider;
import org.apache.pinot.thirdeye.detection.spec.MockBaselineProviderSpec;
import org.apache.pinot.thirdeye.detection.spi.components.BaselineProvider;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
import org.apache.pinot.thirdeye.detection.spi.model.TimeSeries;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
@@ -60,7 +60,6 @@ public class BaselineFillingMergeWrapperTest {
private static final Long PROP_ID_VALUE = 1000L;
private static final String PROP_NAME_VALUE = "myName";
-
private static final String PROP_CLASS_NAME = "className";
private static final String PROP_METRIC_URN = "metricUrn";
private static final String PROP_PROPERTIES = "properties";
@@ -79,13 +78,8 @@ public class BaselineFillingMergeWrapperTest {
nestedPropertiesOne.put(PROP_CLASS_NAME, "none");
nestedPropertiesOne.put(PROP_METRIC_URN, "thirdeye:metric:1");
- Map<String, Object> nestedPropertiesTwo = new HashMap<>();
- nestedPropertiesTwo.put(PROP_CLASS_NAME, "none");
- nestedPropertiesTwo.put(PROP_METRIC_URN, "thirdeye:metric:2");
-
this.nestedProperties = new ArrayList<>();
this.nestedProperties.add(nestedPropertiesOne);
- this.nestedProperties.add(nestedPropertiesTwo);
this.properties.put(PROP_NESTED, this.nestedProperties);
@@ -97,34 +91,44 @@ public class BaselineFillingMergeWrapperTest {
@Test
public void testMergerCurrentAndBaselineLoading() throws Exception {
+ // mock anomaly
MergedAnomalyResultDTO anomaly = makeAnomaly(3000, 3600);
Map<String, String> anomalyProperties = new HashMap<>();
anomalyProperties.put("detectorComponentName", "testDetector");
anomaly.setProperties(anomalyProperties);
anomaly.setMetricUrn("thirdeye:metric:1");
- Map<MetricSlice, DataFrame> aggregates = new HashMap<>();
- aggregates.put(MetricSlice.from(1, 3000, 3600), DataFrame.builder(COL_TIME + ":LONG", COL_VALUE + ":DOUBLE").append(-1, 100).build());
+ // mock time series
Map<MetricSlice, DataFrame> timeseries = new HashMap<>();
- timeseries.put(MetricSlice.from(1, 3000, 3600), DataFrame.builder(COL_TIME + ":LONG", COL_VALUE + ":DOUBLE").append(3000, 100).build());
+ timeseries.put(MetricSlice.from(1, 3000, 3600),
+ DataFrame.builder(COL_TIME + ":LONG", COL_VALUE + ":DOUBLE").append(3000, 100).build());
+ // mock metric
MetricConfigDTO metric = new MetricConfigDTO();
metric.setId(1L);
metric.setDefaultAggFunction(MetricAggFunction.SUM);
- DataProvider
- provider = new MockDataProvider().setLoader(new MockPipelineLoader(this.runs, Collections.<MockPipelineOutput>emptyList())).setAnomalies(Collections.singletonList(anomaly)).setAggregates(aggregates)
- .setMetrics(Collections.singletonList(metric)).setTimeseries(timeseries);
+ DataProvider provider = new MockDataProvider().setLoader(new MockPipelineLoader(this.runs,
+ Collections.singletonList(new MockPipelineOutput(Collections.singletonList(anomaly), -1L))))
+ .setAnomalies(Collections.emptyList())
+ .setMetrics(Collections.singletonList(metric))
+ .setTimeseries(timeseries);
+ // set up detection config properties
this.config.getProperties().put(PROP_MAX_GAP, 100);
this.config.getProperties().put(PROP_BASELINE_PROVIDER, "$baseline");
this.config.getProperties().put("detector", "$testDetector");
+
+ // initialize the baseline provider
BaselineProvider baselineProvider = new MockBaselineProvider();
MockBaselineProviderSpec spec = new MockBaselineProviderSpec();
spec.setAggregates(ImmutableMap.of(MetricSlice.from(1, 3000, 3600), 100.0));
- spec.setBaselineTimeseries(ImmutableMap.of(MetricSlice.from(1, 3000, 3600), TimeSeries.fromDataFrame(DataFrame.builder(COL_TIME + ":LONG", COL_VALUE + ":DOUBLE").append(3000, 100).build())));
+ spec.setBaselineTimeseries(ImmutableMap.of(MetricSlice.from(1, 3000, 3600), TimeSeries.fromDataFrame(
+ DataFrame.builder(COL_TIME + ":LONG", COL_VALUE + ":DOUBLE").append(3000, 100).build())));
InputDataFetcher dataFetcher = new DefaultInputDataFetcher(provider, this.config.getId());
baselineProvider.init(spec, dataFetcher);
this.config.setComponents(ImmutableMap.of("baseline", baselineProvider));
+
+ // run baseline filling merge wrapper
this.wrapper = new BaselineFillingMergeWrapper(provider, this.config, 2900, 3600);
DetectionPipelineResult output = this.wrapper.run();
@@ -137,4 +141,41 @@ public class BaselineFillingMergeWrapperTest {
Assert.assertEquals(anomalyResults.get(0).getProperties().get("baselineProviderComponentName"), "baseline");
}
-}
\ No newline at end of file
+ @Test
+ public void testMergerCurrentAndBaselineLoadingSkipExisting() throws Exception {
+ MergedAnomalyResultDTO anomaly = makeAnomaly(3000, 3600);
+ Map<String, String> anomalyProperties = new HashMap<>();
+ anomalyProperties.put("detectorComponentName", "testDetector");
+ anomalyProperties.put("baselineProviderComponentName", "someBaseline");
+ anomaly.setProperties(anomalyProperties);
+ anomaly.setMetricUrn("thirdeye:metric:1");
+ anomaly.setId(1000L);
+ anomaly.setAvgCurrentVal(999.0);
+ anomaly.setAvgBaselineVal(998.0);
+
+ DataProvider provider = new MockDataProvider().setLoader(new MockPipelineLoader(this.runs, Collections.emptyList()))
+ .setAnomalies(Collections.singletonList(anomaly));
+
+ this.config.getProperties().put(PROP_MAX_GAP, 100);
+ this.config.getProperties().put(PROP_BASELINE_PROVIDER, "$baseline");
+ this.config.getProperties().put("detector", "$testDetector");
+ BaselineProvider baselineProvider = new MockBaselineProvider();
+ MockBaselineProviderSpec spec = new MockBaselineProviderSpec();
+ spec.setAggregates(ImmutableMap.of(MetricSlice.from(1, 3000, 3600), 100.0));
+ spec.setBaselineTimeseries(ImmutableMap.of(MetricSlice.from(1, 3000, 3600), TimeSeries.fromDataFrame(
+ DataFrame.builder(COL_TIME + ":LONG", COL_VALUE + ":DOUBLE").append(3000, 100).build())));
+ InputDataFetcher dataFetcher = new DefaultInputDataFetcher(provider, this.config.getId());
+ baselineProvider.init(spec, dataFetcher);
+ this.config.setComponents(ImmutableMap.of("baseline", baselineProvider));
+ this.wrapper = new BaselineFillingMergeWrapper(provider, this.config, 2900, 3600);
+ DetectionPipelineResult output = this.wrapper.run();
+
+ List<MergedAnomalyResultDTO> anomalyResults = output.getAnomalies();
+ Assert.assertEquals(anomalyResults.size(), 1);
+ Assert.assertTrue(anomalyResults.contains(anomaly));
+ Assert.assertEquals(anomalyResults.get(0).getAvgBaselineVal(), 998.0);
+ Assert.assertEquals(anomalyResults.get(0).getAvgCurrentVal(), 999.0);
+ Assert.assertEquals(anomalyResults.get(0).getProperties().get("detectorComponentName"), "testDetector");
+ Assert.assertEquals(anomalyResults.get(0).getProperties().get("baselineProviderComponentName"), "someBaseline");
+ }
+}
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapperTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapperTest.java
index 92a6918..a5a5bda 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapperTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapperTest.java
@@ -19,6 +19,7 @@ package org.apache.pinot.thirdeye.detection.wrapper;
import com.google.common.collect.ImmutableSet;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
import org.apache.pinot.thirdeye.detection.DataProvider;
import org.apache.pinot.thirdeye.detection.DetectionPipelineResult;
import org.apache.pinot.thirdeye.detection.MockDataProvider;
@@ -263,4 +264,41 @@ public class ChildKeepingMergeWrapperTest {
Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 2800, Collections.singletonMap("otherKey", "value"))));
Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2700, 2900, Collections.singletonMap("otherKey", "otherValue"))));
}
+
+ @Test
+ public void testMergerCurrentAndBaselineFillingSkip() throws Exception {
+ this.config.getProperties().put(PROP_MAX_GAP, 0);
+ MergedAnomalyResultDTO anomaly = makeAnomaly(1100, 1200);
+ anomaly.setId(100L);
+ anomaly.setAvgBaselineVal(999.0);
+ anomaly.setAvgCurrentVal(998.0);
+ anomaly.setChildren(ImmutableSet.of(makeAnomaly(1000, 1050), makeAnomaly(1050, 1100)));
+
+ Map<String, Object> nestedPropertiesOne = new HashMap<>();
+ nestedPropertiesOne.put(PROP_CLASS_NAME, "none");
+ nestedPropertiesOne.put(PROP_METRIC_URN, "thirdeye:metric:1");
+
+ List<Map<String, Object>> nestedProperties = new ArrayList<>();
+ nestedProperties.add(nestedPropertiesOne);
+
+ Map<String, Object> properties = new HashMap<>();
+ properties.put(PROP_NESTED, nestedProperties);
+
+ DetectionConfigDTO config = new DetectionConfigDTO();
+ config.setId(PROP_ID_VALUE);
+ config.setName(PROP_NAME_VALUE);
+ config.setProperties(properties);
+
+ MockPipelineLoader mockLoader = new MockPipelineLoader(this.runs, Collections.singletonList(new MockPipelineOutput(Collections.singletonList(anomaly), -1L)));
+
+ DataProvider provider = new MockDataProvider()
+ .setLoader(mockLoader);
+
+ DetectionPipelineResult output = new ChildKeepingMergeWrapper(provider, config, 1000, 3000).run();
+ List<MergedAnomalyResultDTO> anomalyResults = output.getAnomalies();
+ Assert.assertEquals(anomalyResults.size(), 1);
+ Assert.assertEquals(anomalyResults.get(0).getAvgBaselineVal(), 999.0);
+ Assert.assertEquals(anomalyResults.get(0).getAvgCurrentVal(), 998.0);
+
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org