You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ji...@apache.org on 2019/02/12 01:37:11 UTC

[incubator-pinot] branch master updated: [TE] detection - caching & configure time granularity (#3810)

This is an automated email from the ASF dual-hosted git repository.

jihao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 017e9e5  [TE] detection - caching & configure time granularity (#3810)
017e9e5 is described below

commit 017e9e555e2db0ec0334d7c3977daef470504298
Author: Jihao Zhang <ji...@linkedin.com>
AuthorDate: Mon Feb 11 17:37:04 2019 -0800

    [TE] detection - caching & configure time granularity (#3810)
    
    * Add the ability to configure time granularity in YAML
    * Reduce the number of times Pinot is queried during detection by caching time series
---
 .../pinot/thirdeye/dataframe/util/MetricSlice.java | 10 ++-
 .../pinot/thirdeye/detection/DataProvider.java     | 14 +++-
 .../thirdeye/detection/DefaultDataProvider.java    | 83 ++++++++++++++++------
 .../detection/DetectionMigrationResource.java      | 31 ++++----
 .../thirdeye/detection/algorithm/MergeWrapper.java | 19 +++--
 .../detection/components/RuleBaselineProvider.java | 13 ----
 .../detection/spi/components/BaselineProvider.java |  8 ++-
 .../detection/wrapper/AnomalyDetectorWrapper.java  | 52 +++++++++++---
 .../yaml/CompositePipelineConfigTranslator.java    |  6 +-
 .../yaml/YamlDetectionConfigTranslator.java        |  1 -
 .../thirdeye/detection/yaml/YamlResource.java      |  1 -
 .../pinot/thirdeye/detection/DataProviderTest.java | 15 ++++
 .../components/RuleBaselineProviderTest.java       |  2 +-
 .../wrapper/BaselineFillingMergeWrapperTest.java   |  6 +-
 14 files changed, 183 insertions(+), 78 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dataframe/util/MetricSlice.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dataframe/util/MetricSlice.java
index 5a04070..7039ff8 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dataframe/util/MetricSlice.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dataframe/util/MetricSlice.java
@@ -21,11 +21,11 @@ package org.apache.pinot.thirdeye.dataframe.util;
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
-import org.apache.pinot.thirdeye.common.time.TimeGranularity;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang.builder.ToStringBuilder;
 import org.apache.commons.lang.builder.ToStringStyle;
+import org.apache.pinot.thirdeye.common.time.TimeGranularity;
 
 
 /**
@@ -97,6 +97,14 @@ public final class MetricSlice {
     return new MetricSlice(metricId, start, end, filters, granularity);
   }
 
+  /**
+   * check if current metric slice contains another metric slice
+   */
+  public boolean containSlice(MetricSlice slice) {
+    return slice.metricId == this.metricId && slice.granularity.equals(this.granularity) && slice.getFilters().equals(this.getFilters()) &&
+        slice.start >= this.start && slice.end <= this.end;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DataProvider.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DataProvider.java
index 4c714ba..9175cb9 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DataProvider.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DataProvider.java
@@ -20,6 +20,9 @@
 package org.apache.pinot.thirdeye.detection;
 
 import com.google.common.collect.Multimap;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
 import org.apache.pinot.thirdeye.dataframe.DataFrame;
 import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
 import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
@@ -29,9 +32,6 @@ import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
 import org.apache.pinot.thirdeye.detection.spi.model.AnomalySlice;
 import org.apache.pinot.thirdeye.detection.spi.model.EventSlice;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
 
 
 /**
@@ -55,6 +55,14 @@ public interface DataProvider {
   Map<MetricSlice, DataFrame> fetchTimeseries(Collection<MetricSlice> slices);
 
   /**
+   * Caches the time series for the metric slices for later use
+   * @param slices
+   */
+  default void cacheTimeseries(Collection<MetricSlice> slices){
+    // do nothing if not implemented
+  }
+
+  /**
    * Returns a map of aggregation values (keyed by slice) for a given set of slices,
    * grouped by the given dimensions.
    * The format of the DataFrame follows the standard convention of DataFrameUtils.
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultDataProvider.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultDataProvider.java
index f43febe..0b22629 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultDataProvider.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultDataProvider.java
@@ -19,9 +19,24 @@
 
 package org.apache.pinot.thirdeye.detection;
 
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.Multimap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import org.apache.pinot.thirdeye.dataframe.DataFrame;
 import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
 import org.apache.pinot.thirdeye.datalayer.bao.DatasetConfigManager;
@@ -36,23 +51,13 @@ import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
 import org.apache.pinot.thirdeye.datalayer.util.Predicate;
 import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader;
 import org.apache.pinot.thirdeye.datasource.loader.TimeSeriesLoader;
-import org.apache.pinot.thirdeye.detection.alert.StatefulDetectionAlertFilter;
 import org.apache.pinot.thirdeye.detection.spi.model.AnomalySlice;
 import org.apache.pinot.thirdeye.detection.spi.model.EventSlice;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils.*;
+
 
 public class DefaultDataProvider implements DataProvider {
   private static final Logger LOG = LoggerFactory.getLogger(DefaultDataProvider.class);
@@ -67,6 +72,7 @@ public class DefaultDataProvider implements DataProvider {
   private final TimeSeriesLoader timeseriesLoader;
   private final AggregationLoader aggregationLoader;
   private final DetectionPipelineLoader loader;
+  private final LoadingCache<MetricSlice, DataFrame> timeseriesCache;
 
   public DefaultDataProvider(MetricConfigManager metricDAO, DatasetConfigManager datasetDAO, EventManager eventDAO,
       MergedAnomalyResultManager anomalyDAO, TimeSeriesLoader timeseriesLoader, AggregationLoader aggregationLoader,
@@ -78,26 +84,50 @@ public class DefaultDataProvider implements DataProvider {
     this.timeseriesLoader = timeseriesLoader;
     this.aggregationLoader = aggregationLoader;
     this.loader = loader;
+    this.timeseriesCache = CacheBuilder.newBuilder()
+        .maximumSize(100)
+        .expireAfterWrite(30, TimeUnit.MINUTES)
+        .build(new CacheLoader<MetricSlice, DataFrame>() {
+          @Override
+          public DataFrame load(MetricSlice slice) {
+            return fetchTimeseries(Collections.singleton(slice)).get(slice);
+          }
+        });
   }
 
+
   @Override
   public Map<MetricSlice, DataFrame> fetchTimeseries(Collection<MetricSlice> slices) {
     try {
-      Map<MetricSlice, Future<DataFrame>> futures = new HashMap<>();
-      for (final MetricSlice slice : slices) {
-        futures.put(slice, this.executor.submit(new Callable<DataFrame>() {
-          @Override
-          public DataFrame call() throws Exception {
-            return DefaultDataProvider.this.timeseriesLoader.load(slice);
+      long ts = System.currentTimeMillis();
+      Map<MetricSlice, DataFrame> output = new HashMap<>();
+
+      // if already in cache, return directly
+      for (MetricSlice slice : slices){
+        for (Map.Entry<MetricSlice, DataFrame> entry : this.timeseriesCache.asMap().entrySet()) {
+          if (entry.getKey().containSlice(slice)){
+            DataFrame df = entry.getValue().filter(entry.getValue().getLongs(COL_TIME).between(slice.getStart(), slice.getEnd())).dropNull(COL_TIME);
+            output.put(slice, df);
+            break;
           }
-        }));
+        }
       }
 
+      // if not in cache, fetch from data source
+      Map<MetricSlice, Future<DataFrame>> futures = new HashMap<>();
+      for (final MetricSlice slice : slices) {
+        if (!output.containsKey(slice)){
+          futures.put(slice, this.executor.submit(() -> DefaultDataProvider.this.timeseriesLoader.load(slice)));
+        }
+      }
+      LOG.info("Fetching {} slices of timeseries, {} cache hit, {} cache miss", slices.size(), output.size(), futures.size());
       final long deadline = System.currentTimeMillis() + TIMEOUT;
-      Map<MetricSlice, DataFrame> output = new HashMap<>();
       for (MetricSlice slice : slices) {
-        output.put(slice, futures.get(slice).get(makeTimeout(deadline), TimeUnit.MILLISECONDS));
+        if (!output.containsKey(slice)) {
+          output.put(slice, futures.get(slice).get(makeTimeout(deadline), TimeUnit.MILLISECONDS));
+        }
       }
+      LOG.info("Fetching {} slices used {} milliseconds", slices.size(), System.currentTimeMillis() - ts);
       return output;
 
     } catch (Exception e) {
@@ -106,6 +136,17 @@ public class DefaultDataProvider implements DataProvider {
   }
 
   @Override
+  public void cacheTimeseries(Collection<MetricSlice> slices) {
+    for (MetricSlice slice : slices){
+      try {
+        this.timeseriesCache.get(slice);
+      } catch (Exception e){
+        LOG.warn("cache time series for slice {} failed", slice.toString());
+      }
+    }
+  }
+
+  @Override
   public Map<MetricSlice, DataFrame> fetchAggregates(Collection<MetricSlice> slices, final List<String> dimensions) {
     try {
       Map<MetricSlice, Future<DataFrame>> futures = new HashMap<>();
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionMigrationResource.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionMigrationResource.java
index 1e041cc..ebed49c 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionMigrationResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionMigrationResource.java
@@ -21,7 +21,8 @@ package org.apache.pinot.thirdeye.detection;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
-import java.io.IOException;
+import com.wordnik.swagger.annotations.Api;
+import com.wordnik.swagger.annotations.ApiOperation;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -41,6 +42,7 @@ import javax.ws.rs.core.Response;
 import javax.xml.bind.ValidationException;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.thirdeye.anomaly.detection.AnomalyDetectionInputContextBuilder;
+import org.apache.pinot.thirdeye.api.Constants;
 import org.apache.pinot.thirdeye.datalayer.bao.AlertConfigManager;
 import org.apache.pinot.thirdeye.datalayer.bao.AnomalyFunctionManager;
 import org.apache.pinot.thirdeye.datalayer.bao.DatasetConfigManager;
@@ -56,10 +58,10 @@ import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
 import org.apache.pinot.thirdeye.datalayer.util.Predicate;
-import org.apache.pinot.thirdeye.datasource.DAORegistry;
 import org.apache.pinot.thirdeye.detection.yaml.YamlDetectionAlertConfigTranslator;
 import org.apache.pinot.thirdeye.detection.yaml.YamlResource;
 import org.joda.time.Period;
+import org.omg.CORBA.OBJ_ADAPTER;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.yaml.snakeyaml.DumperOptions;
@@ -73,6 +75,7 @@ import static org.apache.pinot.thirdeye.detection.yaml.YamlDetectionAlertConfigT
  * The Detection migration resource.
  */
 @Path("/migrate")
+@Api(tags = {Constants.YAML_TAG})
 public class DetectionMigrationResource {
   private static final Logger LOGGER = LoggerFactory.getLogger(DetectionMigrationResource.class);
   private static final String PROP_WINDOW_DELAY = "windowDelay";
@@ -146,10 +149,19 @@ public class DetectionMigrationResource {
           "params", getMinMaxThresholdRuleDetectorParams(anomalyFunctionDTO))));
     } else{
       // algorithm detector
-      ruleYaml.put("detection", Collections.singletonList(
-          ImmutableMap.of("name", "detection_rule1", "type", "MIGRATED_ALGORITHM", "params", getAlgorithmDetectorParams(anomalyFunctionDTO),
-              PROP_WINDOW_SIZE, anomalyFunctionDTO.getWindowSize(),
-              PROP_WINDOW_UNIT, anomalyFunctionDTO.getWindowUnit().toString())));
+      Map<String, Object> detectionProperties = new HashMap<>();
+      if (anomalyFunctionDTO.getWindowDelay() != 0) {
+        detectionProperties.put(PROP_WINDOW_DELAY, anomalyFunctionDTO.getWindowDelay());
+        detectionProperties.put(PROP_WINDOW_DELAY_UNIT, anomalyFunctionDTO.getWindowDelayUnit().toString());
+      }
+      detectionProperties.put("name", "detection_rule1");
+      detectionProperties.put("type", "MIGRATED_ALGORITHM");
+      detectionProperties.put("params", getAlgorithmDetectorParams(anomalyFunctionDTO));
+      detectionProperties.put(PROP_WINDOW_SIZE, anomalyFunctionDTO.getWindowSize());
+      detectionProperties.put(PROP_WINDOW_UNIT, anomalyFunctionDTO.getWindowUnit().toString());
+      detectionProperties.put("bucketPeriod", getBucketPeriod(anomalyFunctionDTO));
+
+      ruleYaml.put("detection", Collections.singletonList(detectionProperties));
     }
 
     // filters
@@ -288,10 +300,6 @@ public class DetectionMigrationResource {
     }
     params.put("variables.bucketPeriod", getBucketPeriod(functionDTO));
     params.put("variables.timeZone", getTimezone(functionDTO));
-    if (functionDTO.getWindowDelay() != 0) {
-      detectorYaml.put(PROP_WINDOW_DELAY, functionDTO.getWindowDelay());
-      detectorYaml.put(PROP_WINDOW_DELAY_UNIT, functionDTO.getWindowDelayUnit().toString());
-    }
     return detectorYaml;
   }
 
@@ -535,8 +543,7 @@ public class DetectionMigrationResource {
   }
 
   @POST
-  @Produces(MediaType.APPLICATION_JSON)
-  @Consumes(MediaType.APPLICATION_JSON)
+  @ApiOperation("migrate an application")
   @Path("/application/{name}")
   public Response migrateApplication(@PathParam("name") String application) {
     List<AlertConfigDTO> alertConfigDTOList = alertConfigDAO.findByPredicate(Predicate.EQ("application", application));
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java
index bf763b1..1ad6c8e 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java
@@ -20,15 +20,6 @@
 package org.apache.pinot.thirdeye.detection.algorithm;
 
 import com.google.common.base.Preconditions;
-import org.apache.pinot.thirdeye.common.dimension.DimensionMap;
-import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
-import org.apache.pinot.thirdeye.detection.DetectionUtils;
-import org.apache.pinot.thirdeye.detection.spi.model.AnomalySlice;
-import org.apache.pinot.thirdeye.detection.ConfigUtils;
-import org.apache.pinot.thirdeye.detection.DataProvider;
-import org.apache.pinot.thirdeye.detection.DetectionPipeline;
-import org.apache.pinot.thirdeye.detection.DetectionPipelineResult;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -40,6 +31,15 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import org.apache.commons.collections.MapUtils;
+import org.apache.pinot.thirdeye.common.dimension.DimensionMap;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.detection.ConfigUtils;
+import org.apache.pinot.thirdeye.detection.DataProvider;
+import org.apache.pinot.thirdeye.detection.DetectionPipeline;
+import org.apache.pinot.thirdeye.detection.DetectionPipelineResult;
+import org.apache.pinot.thirdeye.detection.DetectionUtils;
+import org.apache.pinot.thirdeye.detection.spi.model.AnomalySlice;
 
 
 /**
@@ -113,7 +113,6 @@ public class MergeWrapper extends DetectionPipeline {
       nestedConfig.setDescription(this.config.getDescription());
       nestedConfig.setProperties(properties);
       nestedConfig.setComponents(this.config.getComponents());
-
       DetectionPipeline pipeline = this.provider.loadPipeline(nestedConfig, this.startTime, this.endTime);
 
       DetectionPipelineResult intermediate = pipeline.run();
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/RuleBaselineProvider.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/RuleBaselineProvider.java
index e2d6fe5..40ca80a 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/RuleBaselineProvider.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/RuleBaselineProvider.java
@@ -49,19 +49,6 @@ public class RuleBaselineProvider implements BaselineProvider<RuleBaselineProvid
   }
 
   @Override
-  public Double computePredictedAggregates(MetricSlice slice, Series.DoubleFunction aggregateFunction) {
-    InputData data = this.dataFetcher.fetchData(new InputDataSpec().withAggregateSlices(this.baseline.scatter(slice)));
-    double value;
-    try {
-      value = data.getAggregates().get(this.baseline.scatter(slice).get(0)).getDouble(COL_VALUE, 0);
-    } catch (Exception e) {
-      value = Double.NaN;
-    }
-    return value;
-  }
-
-
-  @Override
   public void init(RuleBaselineProviderSpec spec, InputDataFetcher dataFetcher) {
     this.offset = spec.getOffset();
     this.timezone = spec.getTimezone();
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/spi/components/BaselineProvider.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/spi/components/BaselineProvider.java
index 5446e32..b3555b8 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/spi/components/BaselineProvider.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/spi/components/BaselineProvider.java
@@ -41,7 +41,11 @@ public interface BaselineProvider<T extends AbstractSpec> extends BaseComponent<
    * @return the predicted value.
    */
   default Double computePredictedAggregates(MetricSlice slice, Series.DoubleFunction aggregateFunction){
-    TimeSeries baselineTimeSeries = this.computePredictedTimeSeries(slice);
-    return baselineTimeSeries.getPredictedBaseline().aggregate(aggregateFunction).getDouble(0);
+    try {
+      TimeSeries baselineTimeSeries = this.computePredictedTimeSeries(slice);
+      return baselineTimeSeries.getPredictedBaseline().aggregate(aggregateFunction).getDouble(0);
+    } catch (Exception e){
+      return Double.NaN;
+    }
   }
 }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
index d770fcc..299f231 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
@@ -20,6 +20,12 @@
 package org.apache.pinot.thirdeye.detection.wrapper;
 
 import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.collections.MapUtils;
 import org.apache.pinot.thirdeye.anomaly.detection.DetectionJobSchedulerUtils;
 import org.apache.pinot.thirdeye.common.time.TimeGranularity;
 import org.apache.pinot.thirdeye.common.time.TimeSpec;
@@ -37,12 +43,6 @@ import org.apache.pinot.thirdeye.detection.DetectionUtils;
 import org.apache.pinot.thirdeye.detection.spi.components.AnomalyDetector;
 import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
 import org.apache.pinot.thirdeye.util.ThirdEyeUtils;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.collections.MapUtils;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.joda.time.Interval;
@@ -71,6 +71,8 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
   private static final String PROP_DETECTOR = "detector";
   private static final String PROP_DETECTOR_COMPONENT_NAME = "detectorComponentName";
   private static final String PROP_TIMEZONE = "timezone";
+  private static final String PROP_BUCKET_PERIOD = "bucketPeriod";
+  private static final long DEFAULT_CACHING_PERIOD_LOOKBACK = TimeUnit.DAYS.toMillis(60);
 
   private static final Logger LOG = LoggerFactory.getLogger(
       AnomalyDetectorWrapper.class);
@@ -91,6 +93,7 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
   private final long windowSizeMillis;
   private final DatasetConfigDTO dataset;
   private final DateTimeZone dateTimeZone;
+  private final Period bucketPeriod;
 
 
   public AnomalyDetectorWrapper(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime) {
@@ -123,18 +126,30 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
     this.dataset = this.provider.fetchDatasets(Collections.singletonList(metricConfigDTO.getDataset()))
         .get(metricConfigDTO.getDataset());
     // date time zone for moving windows. use dataset time zone as default
-    this.dateTimeZone = DateTimeZone.forID(MapUtils.getString(config.getProperties(), PROP_TIMEZONE, this.dataset.getTimezone()));
+    this.dateTimeZone = DateTimeZone.forID(MapUtils.getString(config.getProperties(), PROP_TIMEZONE, "America/Los_Angeles"));
+
+    String bucketStr = MapUtils.getString(config.getProperties(), PROP_BUCKET_PERIOD);
+    this.bucketPeriod = bucketStr == null ? this.getBucketSizePeriodForDataset() : Period.parse(bucketStr);
   }
 
   @Override
   public DetectionPipelineResult run() throws Exception {
+    // pre-cache time series with default granularity. this is used in multiple places:
+    // 1. get the last time stamp for the time series.
+    // 2. to calculate current values and  baseline values for the anomalies detected
+    // 3. anomaly detection current and baseline time series value
+    MetricSlice cacheSlice = MetricSlice.from(this.metricEntity.getId(), startTime - DEFAULT_CACHING_PERIOD_LOOKBACK, endTime, this.metricEntity.getFilters());
+    this.provider.cacheTimeseries(Collections.singleton(cacheSlice));
+
     List<Interval> monitoringWindows = this.getMonitoringWindows();
     List<MergedAnomalyResultDTO> anomalies = new ArrayList<>();
     for (Interval window : monitoringWindows) {
       List<MergedAnomalyResultDTO> anomaliesForOneWindow = new ArrayList<>();
       try {
-        LOG.info("[New Pipeline] running detection for config {} metricUrn {}. start time {}, end time{}", config.getId(), metricUrn, window.getStart(), window.getEnd());
+        LOG.info("[New Pipeline] running detection for config {} metricUrn {}. start time {}, end time {}", config.getId(), metricUrn, window.getStart(), window.getEnd());
+        long ts = System.currentTimeMillis();
         anomaliesForOneWindow = anomalyDetector.runDetection(window, this.metricUrn);
+        LOG.info("[New Pipeline] run anomaly detection for window {} - {} used {} milliseconds", window.getStart(), window.getEnd(), System.currentTimeMillis() - ts);
       } catch (Exception e) {
         LOG.warn("[DetectionConfigID{}] detecting anomalies for window {} to {} failed.", this.config.getId(), window.getStart(), window.getEnd(), e);
       }
@@ -190,8 +205,11 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
           monitoringWindows.add(new Interval(startTime, endTime, dateTimeZone));
         }
         for (Interval window : monitoringWindows){
-          LOG.info("running detections in windows {}", window);
+          LOG.info("Will run detection in window {}", window);
         }
+        // pre cache the time series for the whole detection time period instead of fetching for each window
+        MetricSlice cacheSlice = MetricSlice.from(this.metricEntity.getId(), startTime - DEFAULT_CACHING_PERIOD_LOOKBACK, endTime, this.metricEntity.getFilters(), toTimeGranularity(this.bucketPeriod));
+        this.provider.cacheTimeseries(Collections.singleton(cacheSlice));
         return monitoringWindows;
       } catch (Exception e) {
         LOG.info("can't generate moving monitoring windows, calling with single detection window", e);
@@ -208,9 +226,8 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
     DateTime currentEndTime = new DateTime(getBoundaryAlignedTimeForDataset(new DateTime(endTime, dateTimeZone)), dateTimeZone);
 
     DateTime lastDateTime = new DateTime(getBoundaryAlignedTimeForDataset(new DateTime(startTime, dateTimeZone)), dateTimeZone);
-    Period bucketSizePeriod = getBucketSizePeriodForDataset();
     while (lastDateTime.isBefore(currentEndTime)) {
-      lastDateTime = lastDateTime.plus(bucketSizePeriod);
+      lastDateTime = lastDateTime.plus(this.bucketPeriod);
       endTimes.add(lastDateTime.getMillis());
     }
     return endTimes;
@@ -271,4 +288,17 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
     }
     return bucketSizePeriod;
   }
+
+  public static TimeGranularity toTimeGranularity(Period period) {
+    if (period.getDays() > 0) {
+      return new TimeGranularity(period.getDays(), TimeUnit.DAYS);
+    } else if (period.getHours() > 0) {
+      return new TimeGranularity(period.getHours(), TimeUnit.HOURS);
+    } else if (period.getMinutes() > 0)  {
+      return new TimeGranularity(period.getMinutes(), TimeUnit.MINUTES);
+    } else {
+      return new TimeGranularity(period.getMillis(), TimeUnit.MILLISECONDS);
+    }
+  }
+
 }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
index 00e0e5a..46095b3 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
@@ -161,6 +161,7 @@ public class CompositePipelineConfigTranslator extends YamlDetectionConfigTransl
   private static final String PROP_NAME = "name";
   private static final String DEFAULT_TIMEZONE = "America/Los_Angeles";
   private static final String DEFAULT_BASELINE_PROVIDER_YAML_TYPE = "RULE_BASELINE";
+  private static final String PROP_BUCKET_PERIOD = "bucketPeriod";
 
   private static final DetectionRegistry DETECTION_REGISTRY = DetectionRegistry.getInstance();
   static {
@@ -176,7 +177,7 @@ public class CompositePipelineConfigTranslator extends YamlDetectionConfigTransl
       ImmutableSet.of("MIGRATED_ALGORITHM_FILTER", "MIGRATED_ALGORITHM", "MIGRATED_ALGORITHM_BASELINE");
   private static final Map<String, String> DETECTOR_TO_BASELINE =
       ImmutableMap.of("ALGORITHM", "ALGORITHM_BASELINE", "MIGRATED_ALGORITHM", "MIGRATED_ALGORITHM_BASELINE");
-  private static final Set<String> MOVING_WINDOW_DETECTOR_TYPES = ImmutableSet.of("ALGORITHM");
+  private static final Set<String> MOVING_WINDOW_DETECTOR_TYPES = ImmutableSet.of("ALGORITHM", "MIGRATED_ALGORITHM");
 
   private final Map<String, Object> components = new HashMap<>();
   private MetricConfigDTO metricConfig;
@@ -321,6 +322,9 @@ public class CompositePipelineConfigTranslator extends YamlDetectionConfigTransl
       if (yamlConfig.containsKey(PROP_TIMEZONE)){
         properties.put(PROP_TIMEZONE, MapUtils.getString(yamlConfig, PROP_TIMEZONE));
       }
+      if (yamlConfig.containsKey(PROP_BUCKET_PERIOD)){
+        properties.put(PROP_BUCKET_PERIOD, MapUtils.getString(yamlConfig, PROP_BUCKET_PERIOD));
+      }
     }
   }
 
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionConfigTranslator.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionConfigTranslator.java
index 5d508e9..377355b 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionConfigTranslator.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionConfigTranslator.java
@@ -108,6 +108,5 @@ public abstract class YamlDetectionConfigTranslator {
    */
   protected void validateYAML(Map<String, Object> yamlConfig) {
     Preconditions.checkArgument(yamlConfig.containsKey(PROP_NAME), "Property missing " + PROP_NAME);
-    Preconditions.checkArgument(yamlConfig.containsKey(PROP_DESC_NAME), "Property missing " + PROP_DESC_NAME);
   }
 }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlResource.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlResource.java
index 0a8aba0..4c5f277 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlResource.java
@@ -527,7 +527,6 @@ public class YamlResource {
       responseMessage.put("message", "Failed to run the preview due to " + e.getMessage());
       return Response.serverError().entity(responseMessage).build();
     }
-
     LOG.info("Preview successful using payload " + payload);
     return Response.ok(result).build();
   }
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DataProviderTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DataProviderTest.java
index f3794f8..f1f8265 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DataProviderTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DataProviderTest.java
@@ -304,6 +304,21 @@ public class DataProviderTest {
     Assert.assertTrue(anomalies.contains(makeAnomaly(this.anomalyIds.get(3), 200L, 14400000L, 18000000L, Arrays.asList("a=1", "c=3"))));
   }
 
+  // cache
+
+  @Test
+  public void testTimeseriesCache(){
+    MetricSlice slice = MetricSlice.from(this.metricIds.get(0), 604800000L, 1814400000L);
+    this.provider.cacheTimeseries(Collections.singleton(slice));
+
+    DataFrame df = this.provider.fetchTimeseries(Collections.singleton(slice)).get(slice);
+
+    Assert.assertEquals(df.size(), 336);
+
+    double mean = df.getDoubles(COL_VALUE).mean().doubleValue();
+    Assert.assertTrue(Math.abs(mean - 1000) < EPSILON_MEAN);
+  }
+
   //
   // utils
   //
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/RuleBaselineProviderTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/RuleBaselineProviderTest.java
index fcea804..523b861 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/RuleBaselineProviderTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/RuleBaselineProviderTest.java
@@ -78,7 +78,7 @@ public class RuleBaselineProviderTest {
   @Test
   public void testFetchBaselineAggregates() {
     Assert.assertEquals(
-        this.baselineProvider.computePredictedAggregates(slice1, DoubleSeries.MEAN), 100.0);
+        this.baselineProvider.computePredictedAggregates(slice1, DoubleSeries.MEAN), 150.0);
   }
 
   @Test
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java
index ad036fc..87f7698 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java
@@ -40,6 +40,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.pinot.thirdeye.detection.spi.model.TimeSeries;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -104,13 +105,15 @@ public class BaselineFillingMergeWrapperTest {
 
     Map<MetricSlice, DataFrame> aggregates = new HashMap<>();
     aggregates.put(MetricSlice.from(1, 3000, 3600), DataFrame.builder(COL_TIME + ":LONG", COL_VALUE + ":DOUBLE").append(-1, 100).build());
+    Map<MetricSlice, DataFrame> timeseries = new HashMap<>();
+    timeseries.put(MetricSlice.from(1, 3000, 3600), DataFrame.builder(COL_TIME + ":LONG", COL_VALUE + ":DOUBLE").append(3000, 100).build());
 
     MetricConfigDTO metric = new MetricConfigDTO();
     metric.setId(1L);
     metric.setDefaultAggFunction(MetricAggFunction.SUM);
     DataProvider
         provider = new MockDataProvider().setLoader(new MockPipelineLoader(this.runs, Collections.<MockPipelineOutput>emptyList())).setAnomalies(Collections.singletonList(anomaly)).setAggregates(aggregates)
-        .setMetrics(Collections.singletonList(metric));
+        .setMetrics(Collections.singletonList(metric)).setTimeseries(timeseries);
 
     this.config.getProperties().put(PROP_MAX_GAP, 100);
     this.config.getProperties().put(PROP_BASELINE_PROVIDER, "$baseline");
@@ -118,6 +121,7 @@ public class BaselineFillingMergeWrapperTest {
     BaselineProvider baselineProvider = new MockBaselineProvider();
     MockBaselineProviderSpec spec = new MockBaselineProviderSpec();
     spec.setAggregates(ImmutableMap.of(MetricSlice.from(1, 3000, 3600), 100.0));
+    spec.setBaselineTimeseries(ImmutableMap.of(MetricSlice.from(1, 3000, 3600), TimeSeries.fromDataFrame(DataFrame.builder(COL_TIME + ":LONG", COL_VALUE + ":DOUBLE").append(3000, 100).build())));
     InputDataFetcher dataFetcher = new DefaultInputDataFetcher(provider, this.config.getId());
     baselineProvider.init(spec, dataFetcher);
     this.config.setComponents(ImmutableMap.of("baseline", baselineProvider));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org