You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ji...@apache.org on 2019/03/25 22:42:40 UTC

[incubator-pinot] branch master updated: [TE] detection - current & baseline provider lazy evaluation (#4014)

This is an automated email from the ASF dual-hosted git repository.

jihao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 1870c36  [TE] detection - current & baseline provider lazy evaluation (#4014)
1870c36 is described below

commit 1870c3655a47a10098cbaf07d72c88c715c5c55d
Author: Jihao Zhang <ji...@linkedin.com>
AuthorDate: Mon Mar 25 15:42:35 2019 -0700

    [TE] detection - current & baseline provider lazy evaluation (#4014)
    
    Update an anomaly's current & baseline values only when the anomaly is merged.
    Clean up the current and baseline loader.
---
 .../api/user/dashboard/UserDashboardResource.java  |  9 ---
 .../bao/jdbc/MergedAnomalyResultManagerImpl.java   |  6 +-
 .../detection/alert/DetectionAlertTaskRunner.java  |  6 --
 .../thirdeye/detection/algorithm/MergeWrapper.java | 33 ++++-----
 .../wrapper/BaselineFillingMergeWrapper.java       | 27 ++++++--
 .../wrapper/ChildKeepingMergeWrapper.java          | 24 ++++---
 .../wrapper/BaselineFillingMergeWrapperTest.java   | 79 ++++++++++++++++------
 .../wrapper/ChildKeepingMergeWrapperTest.java      | 38 +++++++++++
 8 files changed, 155 insertions(+), 67 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/user/dashboard/UserDashboardResource.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/user/dashboard/UserDashboardResource.java
index e2d7fc8..6bf84fc 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/user/dashboard/UserDashboardResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/api/user/dashboard/UserDashboardResource.java
@@ -78,8 +78,6 @@ public class UserDashboardResource {
   private final DatasetConfigManager datasetDAO;
   private final DetectionConfigManager detectionDAO;
   private final DetectionAlertConfigManager detectionAlertDAO;
-  private final AggregationLoader aggregationLoader;
-  private final CurrentAndBaselineLoader currentAndBaselineLoader;
 
 
   public UserDashboardResource(MergedAnomalyResultManager anomalyDAO, MetricConfigManager metricDAO,
@@ -90,11 +88,6 @@ public class UserDashboardResource {
     this.datasetDAO = datasetDAO;
     this.detectionDAO = detectionDAO;
     this.detectionAlertDAO = detectionAlertDAO;
-
-    this.aggregationLoader =
-        new DefaultAggregationLoader(this.metricDAO, this.datasetDAO, ThirdEyeCacheRegistry.getInstance().getQueryCache(),
-            ThirdEyeCacheRegistry.getInstance().getDatasetMaxDataTimeCache());
-    this.currentAndBaselineLoader = new CurrentAndBaselineLoader(this.metricDAO, this.datasetDAO, this.aggregationLoader);
   }
 
   /**
@@ -335,8 +328,6 @@ public class UserDashboardResource {
       }
     });
 
-    this.currentAndBaselineLoader.fillInCurrentAndBaselineValue(anomalies);
-
     return anomalies;
   }
 
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/jdbc/MergedAnomalyResultManagerImpl.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/jdbc/MergedAnomalyResultManagerImpl.java
index 7bbe9e6..3940a2e 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/jdbc/MergedAnomalyResultManagerImpl.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/jdbc/MergedAnomalyResultManagerImpl.java
@@ -414,11 +414,9 @@ public class MergedAnomalyResultManagerImpl extends AbstractManagerImpl<MergedAn
           if (child.getChildren() != null && !child.getChildren().isEmpty()) {
             throw new IllegalArgumentException("Multi-level anomaly nesting not supported");
           }
-
-          child.setChild(true);
-          save(child);
         }
-
+        child.setChild(true);
+        save(child);
         childIds.add(child.getId());
       }
 
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertTaskRunner.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertTaskRunner.java
index 7e6bd71..20d9d2b 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertTaskRunner.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertTaskRunner.java
@@ -52,7 +52,6 @@ public class DetectionAlertTaskRunner implements TaskRunner {
   private static final Logger LOG = LoggerFactory.getLogger(DetectionAlertTaskRunner.class);
 
   private final DetectionAlertTaskFactory detAlertTaskFactory;
-  private CurrentAndBaselineLoader currentAndBaselineLoader;
   private DetectionAlertConfigManager alertConfigDAO;
   private MergedAnomalyResultManager mergedAnomalyDAO;
 
@@ -66,7 +65,6 @@ public class DetectionAlertTaskRunner implements TaskRunner {
     AggregationLoader aggregationLoader =
         new DefaultAggregationLoader(metricDAO, datasetDAO, ThirdEyeCacheRegistry.getInstance().getQueryCache(),
             ThirdEyeCacheRegistry.getInstance().getDatasetMaxDataTimeCache());
-    this.currentAndBaselineLoader = new CurrentAndBaselineLoader(metricDAO, datasetDAO, aggregationLoader);
   }
 
   private DetectionAlertConfigDTO loadDetectionAlertConfig(long detectionAlertConfigId) {
@@ -122,10 +120,6 @@ public class DetectionAlertTaskRunner implements TaskRunner {
         result = alertSuppressor.run(result);
       }
 
-      // TODO: Cleanup currentAndBaselineLoader
-      // In the new design, we have decided to move this function back to the detection pipeline.
-      this.currentAndBaselineLoader.fillInCurrentAndBaselineValue(result.getAllAnomalies());
-
       // Send out alert notifications (email and/or iris)
       Set<DetectionAlertScheme> alertSchemes =
           detAlertTaskFactory.loadAlertSchemes(alertConfig, taskContext.getThirdEyeAnomalyConfiguration(), result);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java
index 34cff3c..d4dabf1 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java
@@ -245,22 +245,23 @@ public class MergeWrapper extends DetectionPipeline {
   }
 
 
-  protected MergedAnomalyResultDTO copyAnomalyInfo(MergedAnomalyResultDTO anomaly, MergedAnomalyResultDTO newAnomaly) {
-    newAnomaly.setStartTime(anomaly.getStartTime());
-    newAnomaly.setEndTime(anomaly.getEndTime());
-    newAnomaly.setMetric(anomaly.getMetric());
-    newAnomaly.setMetricUrn(anomaly.getMetricUrn());
-    newAnomaly.setCollection(anomaly.getCollection());
-    newAnomaly.setDimensions(anomaly.getDimensions());
-    newAnomaly.setDetectionConfigId(anomaly.getDetectionConfigId());
-    newAnomaly.setAnomalyResultSource(anomaly.getAnomalyResultSource());
-    newAnomaly.setAvgBaselineVal(anomaly.getAvgBaselineVal());
-    newAnomaly.setAvgCurrentVal(anomaly.getAvgCurrentVal());
-    newAnomaly.setFeedback(anomaly.getFeedback());
-    newAnomaly.setAnomalyFeedbackId(anomaly.getAnomalyFeedbackId());
-    newAnomaly.setScore(anomaly.getScore());
-    newAnomaly.setWeight(anomaly.getWeight());
-    return newAnomaly;
+  protected MergedAnomalyResultDTO copyAnomalyInfo(MergedAnomalyResultDTO from, MergedAnomalyResultDTO to) {
+    to.setStartTime(from.getStartTime());
+    to.setEndTime(from.getEndTime());
+    to.setMetric(from.getMetric());
+    to.setMetricUrn(from.getMetricUrn());
+    to.setCollection(from.getCollection());
+    to.setDimensions(from.getDimensions());
+    to.setDetectionConfigId(from.getDetectionConfigId());
+    to.setAnomalyResultSource(from.getAnomalyResultSource());
+    to.setAvgBaselineVal(from.getAvgBaselineVal());
+    to.setAvgCurrentVal(from.getAvgCurrentVal());
+    to.setFeedback(from.getFeedback());
+    to.setAnomalyFeedbackId(from.getAnomalyFeedbackId());
+    to.setScore(from.getScore());
+    to.setWeight(from.getWeight());
+    to.setProperties(from.getProperties());
+    return to;
   }
 
   protected static class AnomalyKey {
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
index a1d7289..e07cf83 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapper.java
@@ -21,6 +21,7 @@ package org.apache.pinot.thirdeye.detection.wrapper;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Collections2;
+import java.util.HashMap;
 import org.apache.pinot.thirdeye.dataframe.DoubleSeries;
 import org.apache.pinot.thirdeye.dataframe.Series;
 import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
@@ -63,11 +64,12 @@ public class BaselineFillingMergeWrapper extends MergeWrapper {
   private static final String PROP_DETECTOR_COMPONENT_NAME = "detectorComponentName";
   private static final String DEFAULT_WOW_BASELINE_PROVIDER_NAME = "DEFAULT_WOW";
 
-  private BaselineProvider baselineValueProvider; // optionally configure a baseline value loader
-  private BaselineProvider currentValueProvider;
-  private String baselineProviderComponentName;
+  private final BaselineProvider baselineValueProvider; // optionally configure a baseline value loader
+  private final BaselineProvider currentValueProvider;
+  private final String baselineProviderComponentName;
   private String detectorComponentName;
   private String metricUrn;
+  private final Map<Long, MergedAnomalyResultDTO> existingAnomalies; // from id to anomalies
 
   public BaselineFillingMergeWrapper(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime)
   {
@@ -88,7 +90,7 @@ public class BaselineFillingMergeWrapper extends MergeWrapper {
     }
 
     if (config.getProperties().containsKey(PROP_CURRENT_PROVIDER)) {
-      String detectorReferenceKey = DetectionUtils.getComponentName(MapUtils.getString(config.getProperties(), currentValueProvider));
+      String detectorReferenceKey = DetectionUtils.getComponentName(MapUtils.getString(config.getProperties(), PROP_CURRENT_PROVIDER));
       Preconditions.checkArgument(this.config.getComponents().containsKey(detectorReferenceKey));
       this.currentValueProvider = (BaselineProvider) this.config.getComponents().get(detectorReferenceKey);
     } else {
@@ -117,6 +119,7 @@ public class BaselineFillingMergeWrapper extends MergeWrapper {
         properties.put(PROP_METRIC_URN, nestedUrn);
       }
     }
+    this.existingAnomalies = new HashMap<>();
   }
 
   @Override
@@ -141,6 +144,9 @@ public class BaselineFillingMergeWrapper extends MergeWrapper {
                     // merge if only the anomaly is in the same dimension
                 this.metricUrn.equals(mergedAnomaly.getMetricUrn())
         );
+    for (MergedAnomalyResultDTO anomaly : anomalies) {
+      if(anomaly.getId() != null) this.existingAnomalies.put(anomaly.getId(), copyAnomalyInfo(anomaly, new MergedAnomalyResultDTO()));
+    }
     return new ArrayList<>(anomalies);
   }
 
@@ -151,6 +157,10 @@ public class BaselineFillingMergeWrapper extends MergeWrapper {
    */
   List<MergedAnomalyResultDTO> fillCurrentAndBaselineValue(List<MergedAnomalyResultDTO> mergedAnomalies) {
     for (MergedAnomalyResultDTO anomaly : mergedAnomalies) {
+      // skip current & baseline filling if the anomaly is not merged
+      if (this.isExistingAnomaly(this.existingAnomalies, anomaly)) {
+        continue;
+      }
       try {
         String metricUrn = anomaly.getMetricUrn();
         MetricEntity me = MetricEntity.fromURN(metricUrn);
@@ -181,6 +191,15 @@ public class BaselineFillingMergeWrapper extends MergeWrapper {
     return mergedAnomalies;
   }
 
+  // check whether an anomaly is an existing anomaly whose duration has not been modified
+  protected boolean isExistingAnomaly(Map<Long, MergedAnomalyResultDTO> existingAnomalies, MergedAnomalyResultDTO anomaly) {
+    if (!existingAnomalies.containsKey(anomaly.getId())) {
+      return false;
+    }
+    MergedAnomalyResultDTO previousAnomaly = existingAnomalies.get(anomaly.getId());
+    return anomaly.getStartTime() == previousAnomaly.getStartTime() && anomaly.getEndTime() == previousAnomaly.getEndTime();
+  }
+
   private double calculateWeight(MergedAnomalyResultDTO anomaly) {
     return (anomaly.getAvgCurrentVal() - anomaly.getAvgBaselineVal()) / anomaly.getAvgBaselineVal();
   }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
index a6ab276..f3fa13c 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapper.java
@@ -39,8 +39,7 @@ import org.apache.pinot.thirdeye.detection.algorithm.MergeWrapper;
  * Merge anomalies regardless of anomaly merge key.
  */
 public class ChildKeepingMergeWrapper extends BaselineFillingMergeWrapper {
-  public ChildKeepingMergeWrapper(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime)
-  {
+  public ChildKeepingMergeWrapper(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime) {
     super(provider, config, startTime, endTime);
   }
 
@@ -53,6 +52,13 @@ public class ChildKeepingMergeWrapper extends BaselineFillingMergeWrapper {
   @Override
   protected List<MergedAnomalyResultDTO> merge(Collection<MergedAnomalyResultDTO> anomalies) {
     List<MergedAnomalyResultDTO> input = new ArrayList<>(anomalies);
+    Map<Long, MergedAnomalyResultDTO> existingParentAnomalies = new HashMap<>();
+    for (MergedAnomalyResultDTO anomaly : input) {
+      if (anomaly.getId() != null && !anomaly.getChildren().isEmpty()) {
+        existingParentAnomalies.put(anomaly.getId(), anomaly);
+      }
+    }
+
     Collections.sort(input, MergeWrapper.COMPARATOR);
 
     List<MergedAnomalyResultDTO> output = new ArrayList<>();
@@ -63,8 +69,8 @@ public class ChildKeepingMergeWrapper extends BaselineFillingMergeWrapper {
         continue;
       }
 
-      MergeWrapper.AnomalyKey
-          key = new MergeWrapper.AnomalyKey(anomaly.getMetric(), anomaly.getCollection(), anomaly.getDimensions(), "", "");
+      MergeWrapper.AnomalyKey key =
+          new MergeWrapper.AnomalyKey(anomaly.getMetric(), anomaly.getCollection(), anomaly.getDimensions(), "", "");
       MergedAnomalyResultDTO parent = parents.get(key);
 
       if (parent == null || anomaly.getStartTime() - parent.getEndTime() > this.maxGap) {
@@ -74,7 +80,7 @@ public class ChildKeepingMergeWrapper extends BaselineFillingMergeWrapper {
       } else if (anomaly.getEndTime() <= parent.getEndTime()
           || anomaly.getEndTime() - parent.getStartTime() <= this.maxDuration) {
         // fully merge into existing
-        if (parent.getChildren().isEmpty()){
+        if (parent.getChildren().isEmpty()) {
           parent.getChildren().add(copyAnomalyInfo(parent, new MergedAnomalyResultDTO()));
         }
         parent.setEndTime(Math.max(parent.getEndTime(), anomaly.getEndTime()));
@@ -91,11 +97,11 @@ public class ChildKeepingMergeWrapper extends BaselineFillingMergeWrapper {
       }
     }
 
-    // refill current and baseline values for parent anomalies
-    Collection<MergedAnomalyResultDTO> parentAnomalies =
-        Collections2.filter(output, mergedAnomaly -> mergedAnomaly != null && !mergedAnomaly.getChildren().isEmpty());
+    // refill current and baseline values for qualified parent anomalies
+    Collection<MergedAnomalyResultDTO> parentAnomalies = Collections2.filter(output,
+        mergedAnomaly -> mergedAnomaly != null && !mergedAnomaly.getChildren().isEmpty() && !isExistingAnomaly(
+            existingParentAnomalies, mergedAnomaly));
     super.fillCurrentAndBaselineValue(new ArrayList<>(parentAnomalies));
     return output;
   }
-
 }
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java
index 87f7698..77d1785 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java
@@ -17,6 +17,11 @@
 package org.apache.pinot.thirdeye.detection.wrapper;
 
 import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.pinot.thirdeye.constant.MetricAggFunction;
 import org.apache.pinot.thirdeye.dataframe.DataFrame;
 import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
@@ -35,11 +40,6 @@ import org.apache.pinot.thirdeye.detection.algorithm.MergeWrapper;
 import org.apache.pinot.thirdeye.detection.components.MockBaselineProvider;
 import org.apache.pinot.thirdeye.detection.spec.MockBaselineProviderSpec;
 import org.apache.pinot.thirdeye.detection.spi.components.BaselineProvider;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import org.apache.pinot.thirdeye.detection.spi.model.TimeSeries;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
@@ -60,7 +60,6 @@ public class BaselineFillingMergeWrapperTest {
 
   private static final Long PROP_ID_VALUE = 1000L;
   private static final String PROP_NAME_VALUE = "myName";
-
   private static final String PROP_CLASS_NAME = "className";
   private static final String PROP_METRIC_URN = "metricUrn";
   private static final String PROP_PROPERTIES = "properties";
@@ -79,13 +78,8 @@ public class BaselineFillingMergeWrapperTest {
     nestedPropertiesOne.put(PROP_CLASS_NAME, "none");
     nestedPropertiesOne.put(PROP_METRIC_URN, "thirdeye:metric:1");
 
-    Map<String, Object> nestedPropertiesTwo = new HashMap<>();
-    nestedPropertiesTwo.put(PROP_CLASS_NAME, "none");
-    nestedPropertiesTwo.put(PROP_METRIC_URN, "thirdeye:metric:2");
-
     this.nestedProperties = new ArrayList<>();
     this.nestedProperties.add(nestedPropertiesOne);
-    this.nestedProperties.add(nestedPropertiesTwo);
 
     this.properties.put(PROP_NESTED, this.nestedProperties);
 
@@ -97,34 +91,44 @@ public class BaselineFillingMergeWrapperTest {
 
   @Test
   public void testMergerCurrentAndBaselineLoading() throws Exception {
+    // mock anomaly
     MergedAnomalyResultDTO anomaly = makeAnomaly(3000, 3600);
     Map<String, String> anomalyProperties = new HashMap<>();
     anomalyProperties.put("detectorComponentName", "testDetector");
     anomaly.setProperties(anomalyProperties);
     anomaly.setMetricUrn("thirdeye:metric:1");
 
-    Map<MetricSlice, DataFrame> aggregates = new HashMap<>();
-    aggregates.put(MetricSlice.from(1, 3000, 3600), DataFrame.builder(COL_TIME + ":LONG", COL_VALUE + ":DOUBLE").append(-1, 100).build());
+    // mock time series
     Map<MetricSlice, DataFrame> timeseries = new HashMap<>();
-    timeseries.put(MetricSlice.from(1, 3000, 3600), DataFrame.builder(COL_TIME + ":LONG", COL_VALUE + ":DOUBLE").append(3000, 100).build());
+    timeseries.put(MetricSlice.from(1, 3000, 3600),
+        DataFrame.builder(COL_TIME + ":LONG", COL_VALUE + ":DOUBLE").append(3000, 100).build());
 
+    // mock metric
     MetricConfigDTO metric = new MetricConfigDTO();
     metric.setId(1L);
     metric.setDefaultAggFunction(MetricAggFunction.SUM);
-    DataProvider
-        provider = new MockDataProvider().setLoader(new MockPipelineLoader(this.runs, Collections.<MockPipelineOutput>emptyList())).setAnomalies(Collections.singletonList(anomaly)).setAggregates(aggregates)
-        .setMetrics(Collections.singletonList(metric)).setTimeseries(timeseries);
+    DataProvider provider = new MockDataProvider().setLoader(new MockPipelineLoader(this.runs,
+        Collections.singletonList(new MockPipelineOutput(Collections.singletonList(anomaly), -1L))))
+        .setAnomalies(Collections.emptyList())
+        .setMetrics(Collections.singletonList(metric))
+        .setTimeseries(timeseries);
 
+    // set up detection config properties
     this.config.getProperties().put(PROP_MAX_GAP, 100);
     this.config.getProperties().put(PROP_BASELINE_PROVIDER, "$baseline");
     this.config.getProperties().put("detector", "$testDetector");
+
+    // initialize the baseline provider
     BaselineProvider baselineProvider = new MockBaselineProvider();
     MockBaselineProviderSpec spec = new MockBaselineProviderSpec();
     spec.setAggregates(ImmutableMap.of(MetricSlice.from(1, 3000, 3600), 100.0));
-    spec.setBaselineTimeseries(ImmutableMap.of(MetricSlice.from(1, 3000, 3600), TimeSeries.fromDataFrame(DataFrame.builder(COL_TIME + ":LONG", COL_VALUE + ":DOUBLE").append(3000, 100).build())));
+    spec.setBaselineTimeseries(ImmutableMap.of(MetricSlice.from(1, 3000, 3600), TimeSeries.fromDataFrame(
+        DataFrame.builder(COL_TIME + ":LONG", COL_VALUE + ":DOUBLE").append(3000, 100).build())));
     InputDataFetcher dataFetcher = new DefaultInputDataFetcher(provider, this.config.getId());
     baselineProvider.init(spec, dataFetcher);
     this.config.setComponents(ImmutableMap.of("baseline", baselineProvider));
+
+    // run baseline filling merge wrapper
     this.wrapper = new BaselineFillingMergeWrapper(provider, this.config, 2900, 3600);
     DetectionPipelineResult output = this.wrapper.run();
 
@@ -137,4 +141,41 @@ public class BaselineFillingMergeWrapperTest {
     Assert.assertEquals(anomalyResults.get(0).getProperties().get("baselineProviderComponentName"), "baseline");
   }
 
-}
\ No newline at end of file
+  @Test
+  public void testMergerCurrentAndBaselineLoadingSkipExisting() throws Exception {
+    MergedAnomalyResultDTO anomaly = makeAnomaly(3000, 3600);
+    Map<String, String> anomalyProperties = new HashMap<>();
+    anomalyProperties.put("detectorComponentName", "testDetector");
+    anomalyProperties.put("baselineProviderComponentName", "someBaseline");
+    anomaly.setProperties(anomalyProperties);
+    anomaly.setMetricUrn("thirdeye:metric:1");
+    anomaly.setId(1000L);
+    anomaly.setAvgCurrentVal(999.0);
+    anomaly.setAvgBaselineVal(998.0);
+
+    DataProvider provider = new MockDataProvider().setLoader(new MockPipelineLoader(this.runs, Collections.emptyList()))
+        .setAnomalies(Collections.singletonList(anomaly));
+
+    this.config.getProperties().put(PROP_MAX_GAP, 100);
+    this.config.getProperties().put(PROP_BASELINE_PROVIDER, "$baseline");
+    this.config.getProperties().put("detector", "$testDetector");
+    BaselineProvider baselineProvider = new MockBaselineProvider();
+    MockBaselineProviderSpec spec = new MockBaselineProviderSpec();
+    spec.setAggregates(ImmutableMap.of(MetricSlice.from(1, 3000, 3600), 100.0));
+    spec.setBaselineTimeseries(ImmutableMap.of(MetricSlice.from(1, 3000, 3600), TimeSeries.fromDataFrame(
+        DataFrame.builder(COL_TIME + ":LONG", COL_VALUE + ":DOUBLE").append(3000, 100).build())));
+    InputDataFetcher dataFetcher = new DefaultInputDataFetcher(provider, this.config.getId());
+    baselineProvider.init(spec, dataFetcher);
+    this.config.setComponents(ImmutableMap.of("baseline", baselineProvider));
+    this.wrapper = new BaselineFillingMergeWrapper(provider, this.config, 2900, 3600);
+    DetectionPipelineResult output = this.wrapper.run();
+
+    List<MergedAnomalyResultDTO> anomalyResults = output.getAnomalies();
+    Assert.assertEquals(anomalyResults.size(), 1);
+    Assert.assertTrue(anomalyResults.contains(anomaly));
+    Assert.assertEquals(anomalyResults.get(0).getAvgBaselineVal(), 998.0);
+    Assert.assertEquals(anomalyResults.get(0).getAvgCurrentVal(), 999.0);
+    Assert.assertEquals(anomalyResults.get(0).getProperties().get("detectorComponentName"), "testDetector");
+    Assert.assertEquals(anomalyResults.get(0).getProperties().get("baselineProviderComponentName"), "someBaseline");
+  }
+}
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapperTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapperTest.java
index 92a6918..a5a5bda 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapperTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapperTest.java
@@ -19,6 +19,7 @@ package org.apache.pinot.thirdeye.detection.wrapper;
 import com.google.common.collect.ImmutableSet;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import org.apache.pinot.thirdeye.detection.DataProvider;
 import org.apache.pinot.thirdeye.detection.DetectionPipelineResult;
 import org.apache.pinot.thirdeye.detection.MockDataProvider;
@@ -263,4 +264,41 @@ public class ChildKeepingMergeWrapperTest {
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 2800, Collections.singletonMap("otherKey", "value"))));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2700, 2900, Collections.singletonMap("otherKey", "otherValue"))));
   }
+
+  @Test
+  public void testMergerCurrentAndBaselineFillingSkip() throws Exception {
+    this.config.getProperties().put(PROP_MAX_GAP, 0);
+    MergedAnomalyResultDTO anomaly = makeAnomaly(1100, 1200);
+    anomaly.setId(100L);
+    anomaly.setAvgBaselineVal(999.0);
+    anomaly.setAvgCurrentVal(998.0);
+    anomaly.setChildren(ImmutableSet.of(makeAnomaly(1000, 1050), makeAnomaly(1050, 1100)));
+
+    Map<String, Object> nestedPropertiesOne = new HashMap<>();
+    nestedPropertiesOne.put(PROP_CLASS_NAME, "none");
+    nestedPropertiesOne.put(PROP_METRIC_URN, "thirdeye:metric:1");
+
+    List<Map<String, Object>> nestedProperties = new ArrayList<>();
+    nestedProperties.add(nestedPropertiesOne);
+
+    Map<String, Object> properties = new HashMap<>();
+    properties.put(PROP_NESTED, nestedProperties);
+
+    DetectionConfigDTO config = new DetectionConfigDTO();
+    config.setId(PROP_ID_VALUE);
+    config.setName(PROP_NAME_VALUE);
+    config.setProperties(properties);
+
+    MockPipelineLoader mockLoader = new MockPipelineLoader(this.runs, Collections.singletonList(new MockPipelineOutput(Collections.singletonList(anomaly), -1L)));
+
+    DataProvider provider = new MockDataProvider()
+        .setLoader(mockLoader);
+
+    DetectionPipelineResult output = new ChildKeepingMergeWrapper(provider, config, 1000, 3000).run();
+    List<MergedAnomalyResultDTO> anomalyResults = output.getAnomalies();
+    Assert.assertEquals(anomalyResults.size(), 1);
+    Assert.assertEquals(anomalyResults.get(0).getAvgBaselineVal(), 999.0);
+    Assert.assertEquals(anomalyResults.get(0).getAvgCurrentVal(), 998.0);
+
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org