You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ji...@apache.org on 2019/01/18 00:20:02 UTC

[incubator-pinot] branch master updated: [TE] detection - change last time stamp to handle data incompleteness (#3712)

This is an automated email from the ASF dual-hosted git repository.

jihao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 29a8e54  [TE] detection - change last time stamp to handle data incompleteness (#3712)
29a8e54 is described below

commit 29a8e540805a80e2ede13c8640659824343049e1
Author: Jihao Zhang <ji...@linkedin.com>
AuthorDate: Thu Jan 17 16:19:57 2019 -0800

    [TE] detection - change last time stamp to handle data incompleteness (#3712)
    
    This PR sets the next detection timestamp based on the time series timestamps. If data is incomplete, the next detection starts from the data's latest available timestamp plus one time granularity.
---
 .../pinot/thirdeye/detection/DetectionUtils.java   | 12 ++++++++
 .../detection/algorithm/DimensionWrapper.java      | 11 ++++---
 .../thirdeye/detection/algorithm/MergeWrapper.java |  5 ++-
 .../detection/wrapper/AnomalyDetectorWrapper.java  | 36 ++++++++++++++++++++--
 .../detection/wrapper/AnomalyFilterWrapper.java    |  6 +++-
 .../detection/algorithm/MergeWrapperTest.java      | 10 +++---
 .../components/ThresholdRuleAnomalyFilterTest.java |  4 +--
 .../MergeDimensionThresholdIntegrationTest.java    |  1 -
 .../wrapper/AnomalyDetectorWrapperTest.java        | 31 +++++++++++++++++++
 .../wrapper/ChildKeepingMergeWrapperTest.java      | 10 +++---
 10 files changed, 104 insertions(+), 22 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionUtils.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionUtils.java
index 72deae7..ef0b7f6 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionUtils.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionUtils.java
@@ -163,4 +163,16 @@ public class DetectionUtils {
 
     return anomaly;
   }
+
+  /**
+   * Consolidates the last time stamps reported by all nested detection pipelines.
+   * @param nestedLastTimeStamps last time stamps of all nested pipelines
+   * @return -1 if the collection is empty or contains -1, otherwise the minimum time stamp
+   */
+  public static long consolidateNestedLastTimeStamps(Collection<Long> nestedLastTimeStamps){
+    if(nestedLastTimeStamps.isEmpty() || nestedLastTimeStamps.contains(-1L)){
+      return -1L;
+    }
+    return Collections.min(nestedLastTimeStamps);
+  }
 }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java
index b2c8968..d328574 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java
@@ -22,6 +22,8 @@ package org.apache.pinot.thirdeye.detection.algorithm;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
+import java.util.HashSet;
+import java.util.Set;
 import org.apache.pinot.thirdeye.dataframe.DataFrame;
 import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
 import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
@@ -30,6 +32,7 @@ import org.apache.pinot.thirdeye.detection.ConfigUtils;
 import org.apache.pinot.thirdeye.detection.DataProvider;
 import org.apache.pinot.thirdeye.detection.DetectionPipeline;
 import org.apache.pinot.thirdeye.detection.DetectionPipelineResult;
+import org.apache.pinot.thirdeye.detection.DetectionUtils;
 import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -135,7 +138,7 @@ public class DimensionWrapper extends DetectionPipeline {
       DataFrame aggregates = this.provider.fetchAggregates(Collections.singletonList(slice), this.dimensions).get(slice);
 
       if (aggregates.isEmpty()) {
-        return new DetectionPipelineResult(Collections.<MergedAnomalyResultDTO>emptyList());
+        return new DetectionPipelineResult(Collections.<MergedAnomalyResultDTO>emptyList(), -1);
       }
 
       final double total = aggregates.getDoubles(COL_VALUE).sum().fillNull().doubleValue();
@@ -194,19 +197,19 @@ public class DimensionWrapper extends DetectionPipeline {
 
     List<MergedAnomalyResultDTO> anomalies = new ArrayList<>();
     Map<String, Object> diagnostics = new HashMap<>();
-
+    Set<Long> lastTimeStamps = new HashSet<>();
     LOG.info("exploring {} metrics", nestedMetrics.size());
     for (MetricEntity metric : nestedMetrics) {
       for (Map<String, Object> properties : this.nestedProperties) {
         LOG.info("running detection for {}", metric.toString());
         DetectionPipelineResult intermediate = this.runNested(metric, properties);
-
+        lastTimeStamps.add(intermediate.getLastTimestamp());
         anomalies.addAll(intermediate.getAnomalies());
         diagnostics.put(metric.getUrn(), intermediate.getDiagnostics());
       }
     }
 
-    return new DetectionPipelineResult(anomalies)
+    return new DetectionPipelineResult(anomalies, DetectionUtils.consolidateNestedLastTimeStamps(lastTimeStamps))
         .setDiagnostics(diagnostics);
   }
 
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java
index 1a749d0..21a1874 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
 import org.apache.pinot.thirdeye.api.DimensionMap;
 import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.detection.DetectionUtils;
 import org.apache.pinot.thirdeye.detection.spi.model.AnomalySlice;
 import org.apache.pinot.thirdeye.detection.ConfigUtils;
 import org.apache.pinot.thirdeye.detection.DataProvider;
@@ -101,6 +102,7 @@ public class MergeWrapper extends DetectionPipeline {
     List<MergedAnomalyResultDTO> generated = new ArrayList<>();
 
     int i = 0;
+    Set<Long> lastTimeStamps = new HashSet<>();
     for (Map<String, Object> properties : this.nestedProperties) {
       DetectionConfigDTO nestedConfig = new DetectionConfigDTO();
 
@@ -114,6 +116,7 @@ public class MergeWrapper extends DetectionPipeline {
       DetectionPipeline pipeline = this.provider.loadPipeline(nestedConfig, this.startTime, this.endTime);
 
       DetectionPipelineResult intermediate = pipeline.run();
+      lastTimeStamps.add(intermediate.getLastTimestamp());
 
       generated.addAll(intermediate.getAnomalies());
       diagnostics.put(String.valueOf(i), intermediate.getDiagnostics());
@@ -126,7 +129,7 @@ public class MergeWrapper extends DetectionPipeline {
     all.addAll(retrieveAnomaliesFromDatabase(generated));
     all.addAll(generated);
 
-    return new DetectionPipelineResult(this.merge(all)).setDiagnostics(diagnostics);
+    return new DetectionPipelineResult(this.merge(all), DetectionUtils.consolidateNestedLastTimeStamps(lastTimeStamps)).setDiagnostics(diagnostics);
   }
 
   protected List<MergedAnomalyResultDTO> retrieveAnomaliesFromDatabase(List<MergedAnomalyResultDTO> generated) {
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
index b98d74d..71fa4a5 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
@@ -24,6 +24,9 @@ import org.apache.pinot.thirdeye.anomaly.detection.DetectionJobSchedulerUtils;
 import org.apache.pinot.thirdeye.anomalydetection.context.AnomalyResult;
 import org.apache.pinot.thirdeye.api.TimeGranularity;
 import org.apache.pinot.thirdeye.api.TimeSpec;
+import org.apache.pinot.thirdeye.dataframe.DataFrame;
+import org.apache.pinot.thirdeye.dataframe.DoubleSeries;
+import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
 import org.apache.pinot.thirdeye.datalayer.dto.AnomalyFunctionDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
@@ -49,6 +52,8 @@ import org.joda.time.Period;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils.*;
+
 
 /**
  * Anomaly Detector Wrapper. This wrapper runs a anomaly detector and return the anomalies.
@@ -116,8 +121,7 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
     Map<String, Object> frequency = MapUtils.getMap(config.getProperties(), PROP_FREQUENCY, Collections.emptyMap());
     this.functionFrequency = new TimeGranularity(MapUtils.getIntValue(frequency, "size", 15), TimeUnit.valueOf(MapUtils.getString(frequency, "unit", "MINUTES")));
 
-    MetricEntity me = MetricEntity.fromURN(this.metricUrn);
-    MetricConfigDTO metricConfigDTO = this.provider.fetchMetrics(Collections.singletonList(me.getId())).get(me.getId());
+    MetricConfigDTO metricConfigDTO = this.provider.fetchMetrics(Collections.singletonList(this.metricEntity.getId())).get(this.metricEntity.getId());
     this.dataset = this.provider.fetchDatasets(Collections.singletonList(metricConfigDTO.getDataset()))
         .get(metricConfigDTO.getDataset());
     // date time zone for moving windows. use dataset time zone as default
@@ -147,7 +151,33 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
       anomaly.setDimensions(DetectionUtils.toFilterMap(this.metricEntity.getFilters()));
       anomaly.getProperties().put(PROP_DETECTOR_COMPONENT_NAME, this.detectorName);
     }
-    return new DetectionPipelineResult(anomalies);
+    return new DetectionPipelineResult(anomalies, this.getLastTimeStamp());
+  }
+
+  // Estimates the last time stamp covered by this detection run.
+  // There are two cases. If the data is complete, next detection starts from the end time of this detection.
+  // If data is incomplete, next detection starts from the latest available data's time stamp plus one time granularity.
+  long getLastTimeStamp(){
+    long end = this.endTime;
+    if (this.dataset != null) {
+      MetricSlice metricSlice = MetricSlice.from(this.metricEntity.getId(),
+          this.startTime,
+          this.endTime,
+          this.metricEntity.getFilters());
+      DoubleSeries timestamps = this.provider.fetchTimeseries(Collections.singleton(metricSlice)).get(metricSlice).getDoubles(COL_TIME);
+      if (timestamps.size() == 0) {
+        // no data available: return -1 so the time stamp is not updated
+        return -1;
+      }
+      Period period = dataset.bucketTimeGranularity().toPeriod();
+      DateTimeZone timezone = DateTimeZone.forID(dataset.getTimezone());
+      long lastTimestamp = timestamps.getLong(timestamps.size() - 1);
+
+      end = new DateTime(lastTimestamp, timezone).plus(period).getMillis();
+    }
+
+    // never return a time stamp later than the analysis end time
+    return Math.min(end, this.endTime);
   }
 
   // get a list of the monitoring window, if no sliding window used, use start time and end time as window
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyFilterWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyFilterWrapper.java
index eb55167..7096d48 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyFilterWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyFilterWrapper.java
@@ -21,6 +21,8 @@ package org.apache.pinot.thirdeye.detection.wrapper;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Collections2;
+import java.util.HashSet;
+import java.util.Set;
 import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import org.apache.pinot.thirdeye.detection.ConfigUtils;
@@ -71,6 +73,7 @@ public class AnomalyFilterWrapper extends DetectionPipeline {
   @Override
   public final DetectionPipelineResult run() throws Exception {
     List<MergedAnomalyResultDTO> candidates = new ArrayList<>();
+    Set<Long> lastTimeStamps = new HashSet<>();
     for (Map<String, Object> properties : this.nestedProperties) {
       DetectionConfigDTO nestedConfig = new DetectionConfigDTO();
 
@@ -86,12 +89,13 @@ public class AnomalyFilterWrapper extends DetectionPipeline {
       DetectionPipeline pipeline = this.provider.loadPipeline(nestedConfig, this.startTime, this.endTime);
 
       DetectionPipelineResult intermediate = pipeline.run();
+      lastTimeStamps.add(intermediate.getLastTimestamp());
       candidates.addAll(intermediate.getAnomalies());
     }
 
     Collection<MergedAnomalyResultDTO> anomalies =
         Collections2.filter(candidates, mergedAnomaly -> mergedAnomaly != null && !mergedAnomaly.isChild() && anomalyFilter.isQualified(mergedAnomaly));
 
-    return new DetectionPipelineResult(new ArrayList<>(anomalies));
+    return new DetectionPipelineResult(new ArrayList<>(anomalies), DetectionUtils.consolidateNestedLastTimeStamps(lastTimeStamps));
   }
 }
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapperTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapperTest.java
index 48de02d..bffc3af 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapperTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapperTest.java
@@ -117,7 +117,7 @@ public class MergeWrapperTest {
     DetectionPipelineResult output = this.wrapper.run();
 
     Assert.assertEquals(output.getAnomalies().size(), 6);
-    Assert.assertEquals(output.getLastTimestamp(), 2800);
+    Assert.assertEquals(output.getLastTimestamp(), 2900);
   }
 
   @Test
@@ -128,7 +128,7 @@ public class MergeWrapperTest {
     DetectionPipelineResult output = this.wrapper.run();
 
     Assert.assertEquals(output.getAnomalies().size(), 3);
-    Assert.assertEquals(output.getLastTimestamp(), 2800);
+    Assert.assertEquals(output.getLastTimestamp(), 2900);
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 1250)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1500, 2000)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2200, 2800)));
@@ -143,7 +143,7 @@ public class MergeWrapperTest {
     DetectionPipelineResult output = this.wrapper.run();
 
     Assert.assertEquals(output.getAnomalies().size(), 3);
-    Assert.assertEquals(output.getLastTimestamp(), 2800);
+    Assert.assertEquals(output.getLastTimestamp(), 2900);
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 1250)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1500, 2300)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 2800)));
@@ -169,7 +169,7 @@ public class MergeWrapperTest {
     DetectionPipelineResult output = this.wrapper.run();
 
     Assert.assertEquals(output.getAnomalies().size(), 4);
-    Assert.assertEquals(output.getLastTimestamp(), 3800);
+    Assert.assertEquals(output.getLastTimestamp(), 2900);
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 1250)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1500, 2300)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 3650)));
@@ -196,7 +196,7 @@ public class MergeWrapperTest {
     DetectionPipelineResult output = this.wrapper.run();
 
     Assert.assertEquals(output.getAnomalies().size(), 4);
-    Assert.assertEquals(output.getLastTimestamp(), 3800);
+    Assert.assertEquals(output.getLastTimestamp(), 2900);
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 1250)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1500, 2300)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 3650)));
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilterTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilterTest.java
index a6e44b8..de52d00 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilterTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleAnomalyFilterTest.java
@@ -141,7 +141,7 @@ public class ThresholdRuleAnomalyFilterTest {
 
     DetectionPipelineResult result = this.thresholdRuleFilter.run();
     List<MergedAnomalyResultDTO> anomalies = result.getAnomalies();
-    Assert.assertEquals(result.getLastTimestamp(), 8);
+    Assert.assertEquals(result.getLastTimestamp(), 10);
     Assert.assertEquals(anomalies.size(), 3);
     Assert.assertEquals(anomalies.get(0), this.anomalies.get(0));
     Assert.assertEquals(anomalies.get(1), this.anomalies.get(1));
@@ -156,7 +156,7 @@ public class ThresholdRuleAnomalyFilterTest {
 
     DetectionPipelineResult result = this.thresholdRuleFilter.run();
     List<MergedAnomalyResultDTO> anomalies = result.getAnomalies();
-    Assert.assertEquals(result.getLastTimestamp(), 8);
+    Assert.assertEquals(result.getLastTimestamp(), 10);
     Assert.assertEquals(anomalies.size(), 2);
     Assert.assertEquals(anomalies.get(0), this.anomalies.get(1));
     Assert.assertEquals(anomalies.get(1), this.anomalies.get(2));
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/integration/MergeDimensionThresholdIntegrationTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/integration/MergeDimensionThresholdIntegrationTest.java
index a7a9ea1..ad490fa 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/integration/MergeDimensionThresholdIntegrationTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/integration/MergeDimensionThresholdIntegrationTest.java
@@ -130,7 +130,6 @@ public class MergeDimensionThresholdIntegrationTest {
     DetectionPipelineResult result = pipeline.run();
 
     Assert.assertEquals(result.getAnomalies().size(), 3);
-    Assert.assertEquals(result.getLastTimestamp(), 18000);
 
     Assert.assertTrue(result.getAnomalies().contains(makeAnomaly(9500, 18000, "thirdeye:metric:2")));
     Assert.assertTrue(result.getAnomalies().contains(makeAnomaly(0, 7200, "thirdeye:metric:2:a%3D1:b%3D2")));
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapperTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapperTest.java
index 1b230b8..ba51753 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapperTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapperTest.java
@@ -18,6 +18,8 @@ package org.apache.pinot.thirdeye.detection.wrapper;
 
 import com.google.common.collect.ImmutableMap;
 import org.apache.pinot.thirdeye.api.TimeSpec;
+import org.apache.pinot.thirdeye.dataframe.DataFrame;
+import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
 import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
@@ -35,6 +37,8 @@ import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import static org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils.*;
+
 
 public class AnomalyDetectorWrapperTest {
   private static final String PROP_METRIC_URN = "metricUrn";
@@ -66,6 +70,13 @@ public class AnomalyDetectorWrapperTest {
     dataset.setTimeUnit(TimeUnit.DAYS);
     dataset.setTimeDuration(1);
     this.provider.setDatasets(Collections.singletonList(dataset));
+    this.provider.setTimeseries(ImmutableMap.of(
+        MetricSlice.from(1L, 1546646400000L, 1546732800000L),
+        new DataFrame().addSeries(COL_VALUE, 500, 1000).addSeries(COL_TIME, 1546646400000L, 1546732800000L),
+        MetricSlice.from(1L, 1546819200000L, 1546905600000L),
+        DataFrame.builder(COL_TIME, COL_VALUE).build(),
+        MetricSlice.from(1L, 1546300800000L, 1546560000000L),
+        new DataFrame().addSeries(COL_VALUE, 500, 1000).addSeries(COL_TIME, 1546300800000L, 1546387200000L)));
   }
 
   @Test
@@ -104,4 +115,24 @@ public class AnomalyDetectorWrapperTest {
             new Interval(1540252800000L, 1540339200000L, timeZone), new Interval(1540339200000L, 1540425600000L, timeZone)));
   }
 
+  @Test
+  public void testGetLastTimestampWithEstimate() {
+    AnomalyDetectorWrapper detectionPipeline =
+        new AnomalyDetectorWrapper(this.provider, this.config, 1546300800000L, 1546560000000L);
+    Assert.assertEquals(detectionPipeline.getLastTimeStamp(), 1546473600000L);
+  }
+
+  @Test
+  public void testGetLastTimestampTruncate() {
+    AnomalyDetectorWrapper detectionPipeline =
+        new AnomalyDetectorWrapper(this.provider, this.config, 1546646400000L, 1546732800000L);
+    Assert.assertEquals(detectionPipeline.getLastTimeStamp(), 1546732800000L);
+  }
+
+  @Test
+  public void testGetLastTimestampNoData() {
+    AnomalyDetectorWrapper detectionPipeline =
+        new AnomalyDetectorWrapper(this.provider, this.config, 1546819200000L, 1546905600000L);
+    Assert.assertEquals(detectionPipeline.getLastTimeStamp(), -1);
+  }
 }
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapperTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapperTest.java
index 16fd246..b39b865 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapperTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/ChildKeepingMergeWrapperTest.java
@@ -112,7 +112,7 @@ public class ChildKeepingMergeWrapperTest {
     DetectionPipelineResult output = this.wrapper.run();
 
     Assert.assertEquals(output.getAnomalies().size(), 6);
-    Assert.assertEquals(output.getLastTimestamp(), 2800);
+    Assert.assertEquals(output.getLastTimestamp(), 2900);
   }
 
   @Test
@@ -123,7 +123,7 @@ public class ChildKeepingMergeWrapperTest {
     DetectionPipelineResult output = this.wrapper.run();
 
     Assert.assertEquals(output.getAnomalies().size(), 3);
-    Assert.assertEquals(output.getLastTimestamp(), 2800);
+    Assert.assertEquals(output.getLastTimestamp(), 2900);
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 1250, ImmutableSet.of(makeAnomaly(1150, 1250), makeAnomaly(0, 1000), makeAnomaly(1100, 1200)))));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1500, 2000)));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2200, 2800, ImmutableSet.of(makeAnomaly(2200, 2300), makeAnomaly(2400, 2800)))));
@@ -138,7 +138,7 @@ public class ChildKeepingMergeWrapperTest {
     DetectionPipelineResult output = this.wrapper.run();
 
     Assert.assertEquals(output.getAnomalies().size(), 3);
-    Assert.assertEquals(output.getLastTimestamp(), 2800);
+    Assert.assertEquals(output.getLastTimestamp(), 2900);
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 1250, ImmutableSet.of(makeAnomaly(1150, 1250), makeAnomaly(0, 1000), makeAnomaly(1100, 1200)))));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1500, 2300, ImmutableSet.of(makeAnomaly(2200, 2300), makeAnomaly(1500, 2000)))));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 2800)));
@@ -164,7 +164,7 @@ public class ChildKeepingMergeWrapperTest {
     DetectionPipelineResult output = this.wrapper.run();
 
     Assert.assertEquals(output.getAnomalies().size(), 4);
-    Assert.assertEquals(output.getLastTimestamp(), 3800);
+    Assert.assertEquals(output.getLastTimestamp(), 2900);
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 1250, ImmutableSet.of(makeAnomaly(1150, 1250), makeAnomaly(0, 1000), makeAnomaly(1100, 1200)))));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1500, 2300, ImmutableSet.of(makeAnomaly(2200, 2300), makeAnomaly(1500, 2000)))));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 2800)));
@@ -191,7 +191,7 @@ public class ChildKeepingMergeWrapperTest {
     DetectionPipelineResult output = this.wrapper.run();
 
     Assert.assertEquals(output.getAnomalies().size(), 4);
-    Assert.assertEquals(output.getLastTimestamp(), 3800);
+    Assert.assertEquals(output.getLastTimestamp(), 2900);
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(0, 1250, ImmutableSet.of(makeAnomaly(1150, 1250), makeAnomaly(0, 1000), makeAnomaly(1100, 1200)))));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(1500, 2300, ImmutableSet.of(makeAnomaly(2200, 2300), makeAnomaly(1500, 2000)))));
     Assert.assertTrue(output.getAnomalies().contains(makeAnomaly(2400, 2800)));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org