You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ji...@apache.org on 2019/01/30 20:27:15 UTC

[incubator-pinot] branch master updated: [TE] detection - endpoint to show predicted baseline (#3764)

This is an automated email from the ASF dual-hosted git repository.

jihao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a52a9c  [TE] detection - endpoint to show predicted baseline (#3764)
4a52a9c is described below

commit 4a52a9c48c1b17b07d28f923d5fd97addc975e14
Author: Jihao Zhang <ji...@linkedin.com>
AuthorDate: Wed Jan 30 12:27:10 2019 -0800

    [TE] detection - endpoint to show predicted baseline (#3764)
    
    The endpoint to show predicted baseline for anomalies.
---
 .../dashboard/resources/v2/AnomaliesResource.java  | 47 +++++++++-------------
 .../thirdeye/detection/DetectionResource.java      | 17 ++++++++
 .../pinot/thirdeye/detection/DetectionUtils.java   | 42 +++++++++++++++++++
 .../thirdeye/detection/spi/model/TimeSeries.java   |  7 +++-
 4 files changed, 83 insertions(+), 30 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/v2/AnomaliesResource.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/v2/AnomaliesResource.java
index 88b9dac..ba20a3b 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/v2/AnomaliesResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/v2/AnomaliesResource.java
@@ -39,6 +39,7 @@ import org.apache.pinot.thirdeye.api.TimeSpec;
 import org.apache.pinot.thirdeye.constant.AnomalyFeedbackType;
 import org.apache.pinot.thirdeye.constant.AnomalyResultSource;
 import org.apache.pinot.thirdeye.dashboard.Utils;
+import org.apache.pinot.thirdeye.datalayer.bao.EventManager;
 import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader;
 import org.apache.pinot.thirdeye.datasource.loader.DefaultAggregationLoader;
 import org.apache.pinot.thirdeye.dashboard.resources.v2.pojo.AnomaliesWrapper;
@@ -70,23 +71,22 @@ import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry;
 import org.apache.pinot.thirdeye.datasource.cache.QueryCache;
 import org.apache.pinot.thirdeye.datasource.loader.DefaultTimeSeriesLoader;
 import org.apache.pinot.thirdeye.datasource.loader.TimeSeriesLoader;
+import org.apache.pinot.thirdeye.detection.DataProvider;
+import org.apache.pinot.thirdeye.detection.DefaultDataProvider;
+import org.apache.pinot.thirdeye.detection.DetectionPipelineLoader;
+import org.apache.pinot.thirdeye.detection.DetectionUtils;
 import org.apache.pinot.thirdeye.detector.email.filter.AlertFilterFactory;
 import org.apache.pinot.thirdeye.detector.function.AnomalyFunctionFactory;
 import org.apache.pinot.thirdeye.detector.function.BaseAnomalyFunction;
 import org.apache.pinot.thirdeye.detector.metric.transfer.MetricTransfer;
 import org.apache.pinot.thirdeye.detector.metric.transfer.ScalingFactor;
-import org.apache.pinot.thirdeye.rootcause.timeseries.Baseline;
-import org.apache.pinot.thirdeye.rootcause.timeseries.BaselineAggregate;
-import org.apache.pinot.thirdeye.rootcause.timeseries.BaselineAggregateType;
 import org.apache.pinot.thirdeye.util.AnomalyOffset;
 import org.apache.pinot.thirdeye.util.ThirdEyeUtils;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -153,12 +153,17 @@ public class AnomaliesResource {
   private final AnomalyFunctionManager anomalyFunctionDAO;
   private final DatasetConfigManager datasetConfigDAO;
   private final DetectionConfigManager detectionDAO;
+  private final EventManager eventDAO;
+  private final MergedAnomalyResultManager anomalyDAO;
+
   private final ExecutorService threadPool;
   private final AlertFilterFactory alertFilterFactory;
   private final AnomalyFunctionFactory anomalyFunctionFactory;
 
   private final TimeSeriesLoader timeSeriesLoader;
   private final AggregationLoader aggregationLoader;
+  private final DetectionPipelineLoader loader;
+  private final DataProvider provider;
 
   public AnomaliesResource(AnomalyFunctionFactory anomalyFunctionFactory, AlertFilterFactory alertFilterFactory) {
     this.metricConfigDAO = DAO_REGISTRY.getMetricConfigDAO();
@@ -170,12 +175,17 @@ public class AnomaliesResource {
     this.threadPool = Executors.newFixedThreadPool(NUM_EXECS);
     this.alertFilterFactory = alertFilterFactory;
     this.anomalyFunctionFactory = anomalyFunctionFactory;
+    this.eventDAO = DAORegistry.getInstance().getEventDAO();
+    this.anomalyDAO = DAORegistry.getInstance().getMergedAnomalyResultDAO();
 
     QueryCache queryCache = ThirdEyeCacheRegistry.getInstance().getQueryCache();
     LoadingCache<String, Long> maxTimeCache = ThirdEyeCacheRegistry.getInstance().getDatasetMaxDataTimeCache();
 
     this.timeSeriesLoader = new DefaultTimeSeriesLoader(this.metricConfigDAO, this.datasetConfigDAO, queryCache);
     this.aggregationLoader = new DefaultAggregationLoader(this.metricConfigDAO, this.datasetConfigDAO, queryCache, maxTimeCache);
+    this.loader = new DetectionPipelineLoader();
+
+    this.provider = new DefaultDataProvider(this.metricConfigDAO, this.datasetConfigDAO, this.eventDAO, this.anomalyDAO, this.timeSeriesLoader, this.aggregationLoader, this.loader);
   }
 
   @GET
@@ -899,16 +909,12 @@ public class AnomaliesResource {
     String dataset = datasetConfig.getDataset();
     String metricName = mergedAnomaly.getMetric();
 
-    if (mergedAnomaly.getFunctionId() == null) {
-      if (mergedAnomaly.getDetectionConfigId() == null) {
-        return null;
-      }
+    if (mergedAnomaly.getDetectionConfigId() != null) {
 
       DetectionConfigDTO detectionConfig = this.detectionDAO.findById(mergedAnomaly.getDetectionConfigId());
       if (detectionConfig == null) {
         return null;
       }
-
       return constructAnomalyDetails(mergedAnomaly, detectionConfig);
     }
 
@@ -1113,18 +1119,10 @@ public class AnomaliesResource {
 
     // TODO parallel load
     // TODO type from agg function
-    Baseline baseline = BaselineAggregate.fromWeekOverWeek(BaselineAggregateType.MEDIAN, 1, 1, dataTimeZone);
-
     MetricSlice sliceAnomalyCurrent = MetricSlice.from(metric.getId(), anomaly.getStartTime(), anomaly.getEndTime(), filters);
 
-    Collection<MetricSlice> slicesAnomalyBaseline = baseline.scatter(sliceAnomalyCurrent);
-    Map<MetricSlice, DataFrame> anomalyBaselineMap = new HashMap<>();
-    for (MetricSlice slice : slicesAnomalyBaseline) {
-      anomalyBaselineMap.put(slice, fixTimestamp(this.aggregationLoader.loadAggregate(slice, Collections.<String>emptyList(), -1), slice.getStart()));
-    }
-
-    details.setCurrent(makeStringValue(this.aggregationLoader.loadAggregate(sliceAnomalyCurrent, Collections.<String>emptyList(), -1)));
-    details.setBaseline(makeStringValue(baseline.gather(sliceAnomalyCurrent, anomalyBaselineMap)));
+    details.setCurrent(makeStringValue(new DataFrame().addSeries(COL_VALUE, anomaly.getAvgCurrentVal())));
+    details.setBaseline(makeStringValue(new DataFrame().addSeries(COL_VALUE, anomaly.getAvgBaselineVal())));
 
     AnomalyOffset offsets = BaseAnomalyFunction.getDefaultOffsets(dataset);
 
@@ -1132,15 +1130,8 @@ public class AnomaliesResource {
         .withStart(new DateTime(sliceAnomalyCurrent.getStart(), dataTimeZone).minus(offsets.getPreOffsetPeriod()).getMillis())
         .withEnd(new DateTime(sliceAnomalyCurrent.getEnd(), dataTimeZone).plus(offsets.getPostOffsetPeriod()).getMillis());
 
-    Collection<MetricSlice> slicesViewBaseline = baseline.scatter(sliceViewCurrent);
-    Map<MetricSlice, DataFrame> viewBaselineMap = new HashMap<>();
-    for (MetricSlice slice : slicesViewBaseline) {
-      viewBaselineMap.put(slice, this.timeSeriesLoader.load(slice));
-    }
-
     DataFrame dfCurrent = this.timeSeriesLoader.load(sliceViewCurrent);
-    DataFrame dfBaseline = baseline.gather(sliceViewCurrent, viewBaselineMap);
-
+    DataFrame dfBaseline = DetectionUtils.getBaselineTimeseries(anomaly, config, sliceViewCurrent.getStart(), sliceViewCurrent.getEnd(), this.loader, this.provider).getDataFrame();
     DataFrame dfAligned = dfCurrent.renameSeries(COL_VALUE, COL_CURRENT).joinOuter(
         dfBaseline.renameSeries(COL_VALUE, COL_BASELINE));
 
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionResource.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionResource.java
index bcd46cb..f151d8d 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionResource.java
@@ -67,6 +67,7 @@ import org.apache.pinot.thirdeye.datasource.loader.TimeSeriesLoader;
 import org.apache.pinot.thirdeye.detection.finetune.GridSearchTuningAlgorithm;
 import org.apache.pinot.thirdeye.detection.finetune.TuningAlgorithm;
 import org.apache.pinot.thirdeye.detection.spi.model.AnomalySlice;
+import org.apache.pinot.thirdeye.detection.spi.model.TimeSeries;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.quartz.CronExpression;
@@ -475,4 +476,20 @@ public class DetectionResource {
     return Response.ok(anomaly.getId()).build();
   }
 
+  /** Returns the predicted baseline of an anomaly for the requested time range as a data frame. */
+  @GET
+  @ApiOperation("get the predicted baseline for an anomaly within a time range")
+  @Path(value = "/predicted-baseline/{anomalyId}")
+  public Response getPredictedBaseline(@PathParam("anomalyId") @ApiParam("anomalyId") long anomalyId,
+      @QueryParam("start") long start, @QueryParam("end") long end) throws Exception {
+    MergedAnomalyResultDTO anomaly = anomalyDAO.findById(anomalyId);
+    if (anomaly == null) {
+      throw new IllegalArgumentException(String.format("Could not resolve anomaly id %d", anomalyId));
+    }
+    // the detection config is keyed by the anomaly's detectionConfigId, not by the anomaly's own id
+    TimeSeries ts = DetectionUtils.getBaselineTimeseries(anomaly,
+        configDAO.findById(anomaly.getDetectionConfigId()), start, end, loader, provider);
+    return Response.ok(ts.getDataFrame()).build();
+  }
+
 }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionUtils.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionUtils.java
index ef0b7f6..2ba785c 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionUtils.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionUtils.java
@@ -25,13 +25,19 @@ import org.apache.pinot.thirdeye.dataframe.BooleanSeries;
 import org.apache.pinot.thirdeye.dataframe.DataFrame;
 import org.apache.pinot.thirdeye.dataframe.LongSeries;
 import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
+import org.apache.pinot.thirdeye.datalayer.bao.DetectionConfigManager;
+import org.apache.pinot.thirdeye.datalayer.bao.MergedAnomalyResultManager;
 import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.EventDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
 import org.apache.pinot.thirdeye.datalayer.pojo.MetricConfigBean;
 import org.apache.pinot.thirdeye.detection.DataProvider;
+import org.apache.pinot.thirdeye.detection.components.RuleBaselineProvider;
+import org.apache.pinot.thirdeye.detection.spec.RuleBaselineProviderSpec;
 import org.apache.pinot.thirdeye.detection.spi.components.BaseComponent;
+import org.apache.pinot.thirdeye.detection.spi.components.BaselineProvider;
 import org.apache.pinot.thirdeye.detection.spi.model.AnomalySlice;
 import org.apache.pinot.thirdeye.detection.spi.model.EventSlice;
 import org.apache.pinot.thirdeye.detection.spi.model.InputData;
@@ -45,6 +51,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
+import org.apache.pinot.thirdeye.detection.spi.model.TimeSeries;
+import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.joda.time.Period;
@@ -53,6 +61,8 @@ import static org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils.*;
 
 
 public class DetectionUtils {
+  private static final String PROP_BASELINE_PROVIDER_COMPONENT_NAME = "baselineProviderComponentName";
+
   // TODO anomaly should support multimap
   public static DimensionMap toFilterMap(Multimap<String, String> filters) {
     DimensionMap map = new DimensionMap();
@@ -175,4 +185,36 @@ public class DetectionUtils {
     }
     return Collections.min(nestedLastTimeStamps);
   }
+
+  /**
+   * Get the predicted baseline for an anomaly in a time range. Falls back to the "wo1w" rule baseline
+   * if the anomaly does not name a baseline provider component that exists in the config.
+   * @param anomaly the anomaly
+   * @param config the detection config; must not be null because both code paths read from it
+   * @param start start time
+   * @param end end time
+   * @param loader detection pipeline loader
+   * @param provider data provider
+   * @return baseline time series
+   * @throws Exception if the baseline cannot be computed (NullPointerException if config is null)
+   */
+  public static TimeSeries getBaselineTimeseries(MergedAnomalyResultDTO anomaly, DetectionConfigDTO config,
+      long start, long end, DetectionPipelineLoader loader, DataProvider provider) throws Exception {
+    java.util.Objects.requireNonNull(config, "detection config is required to compute the baseline time series");
+    String baselineProviderComponentName = anomaly.getProperties().get(PROP_BASELINE_PROVIDER_COMPONENT_NAME);
+    BaselineProvider baselineProvider = new RuleBaselineProvider();
+    MetricEntity me = MetricEntity.fromURN(anomaly.getMetricUrn());
+    if (baselineProviderComponentName != null && config.getComponentSpecs().containsKey(baselineProviderComponentName)) {
+      // load pipeline and init components
+      loader.from(provider, config, start, end);
+      baselineProvider = (BaselineProvider) config.getComponents().get(baselineProviderComponentName);
+    } else {
+      // use wow instead
+      RuleBaselineProviderSpec spec = new RuleBaselineProviderSpec();
+      spec.setOffset("wo1w");
+      InputDataFetcher dataFetcher = new DefaultInputDataFetcher(provider, config.getId());
+      baselineProvider.init(spec, dataFetcher);
+    }
+    return baselineProvider.computePredictedTimeSeries(MetricSlice.from(me.getId(), start, end, me.getFilters()));
+  }
 }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/spi/model/TimeSeries.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/spi/model/TimeSeries.java
index aa17f4c..a32c693 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/spi/model/TimeSeries.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/spi/model/TimeSeries.java
@@ -24,6 +24,9 @@ import org.apache.pinot.thirdeye.dataframe.DoubleSeries;
 import org.apache.pinot.thirdeye.dataframe.LongSeries;
 import org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils;
 
+import static org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils.*;
+
+
 /**
  * Time series. wrapper object of data frame. Used by baselineProvider to return the predicted time series
  */
@@ -39,7 +42,7 @@ public class TimeSeries {
    * @param timestamps
    */
   public void addTimeStamps(LongSeries timestamps) {
-    this.df.addSeries(DataFrameUtils.COL_TIME, timestamps);
+    this.df.addSeries(COL_TIME, timestamps).setIndex(COL_TIME);
   }
 
   /**
@@ -52,7 +55,7 @@ public class TimeSeries {
 
   public static TimeSeries fromDataFrame(DataFrame df) {
     TimeSeries ts = new TimeSeries();
-    ts.df.addSeries(DataFrameUtils.COL_TIME, df.get(DataFrameUtils.COL_TIME));
+    ts.df.addSeries(COL_TIME, df.get(COL_TIME)).setIndex(COL_TIME);
     ts.df.addSeries(DataFrameUtils.COL_VALUE, df.get(DataFrameUtils.COL_VALUE));
     return ts;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org