You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ji...@apache.org on 2019/01/30 20:27:15 UTC
[incubator-pinot] branch master updated: [TE] detection - endpoint
to show predicted baseline (#3764)
This is an automated email from the ASF dual-hosted git repository.
jihao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4a52a9c [TE] detection - endpoint to show predicted baseline (#3764)
4a52a9c is described below
commit 4a52a9c48c1b17b07d28f923d5fd97addc975e14
Author: Jihao Zhang <ji...@linkedin.com>
AuthorDate: Wed Jan 30 12:27:10 2019 -0800
[TE] detection - endpoint to show predicted baseline (#3764)
The endpoint to show predicted baseline for anomalies.
---
.../dashboard/resources/v2/AnomaliesResource.java | 47 +++++++++-------------
.../thirdeye/detection/DetectionResource.java | 17 ++++++++
.../pinot/thirdeye/detection/DetectionUtils.java | 42 +++++++++++++++++++
.../thirdeye/detection/spi/model/TimeSeries.java | 7 +++-
4 files changed, 83 insertions(+), 30 deletions(-)
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/v2/AnomaliesResource.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/v2/AnomaliesResource.java
index 88b9dac..ba20a3b 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/v2/AnomaliesResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dashboard/resources/v2/AnomaliesResource.java
@@ -39,6 +39,7 @@ import org.apache.pinot.thirdeye.api.TimeSpec;
import org.apache.pinot.thirdeye.constant.AnomalyFeedbackType;
import org.apache.pinot.thirdeye.constant.AnomalyResultSource;
import org.apache.pinot.thirdeye.dashboard.Utils;
+import org.apache.pinot.thirdeye.datalayer.bao.EventManager;
import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader;
import org.apache.pinot.thirdeye.datasource.loader.DefaultAggregationLoader;
import org.apache.pinot.thirdeye.dashboard.resources.v2.pojo.AnomaliesWrapper;
@@ -70,23 +71,22 @@ import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry;
import org.apache.pinot.thirdeye.datasource.cache.QueryCache;
import org.apache.pinot.thirdeye.datasource.loader.DefaultTimeSeriesLoader;
import org.apache.pinot.thirdeye.datasource.loader.TimeSeriesLoader;
+import org.apache.pinot.thirdeye.detection.DataProvider;
+import org.apache.pinot.thirdeye.detection.DefaultDataProvider;
+import org.apache.pinot.thirdeye.detection.DetectionPipelineLoader;
+import org.apache.pinot.thirdeye.detection.DetectionUtils;
import org.apache.pinot.thirdeye.detector.email.filter.AlertFilterFactory;
import org.apache.pinot.thirdeye.detector.function.AnomalyFunctionFactory;
import org.apache.pinot.thirdeye.detector.function.BaseAnomalyFunction;
import org.apache.pinot.thirdeye.detector.metric.transfer.MetricTransfer;
import org.apache.pinot.thirdeye.detector.metric.transfer.ScalingFactor;
-import org.apache.pinot.thirdeye.rootcause.timeseries.Baseline;
-import org.apache.pinot.thirdeye.rootcause.timeseries.BaselineAggregate;
-import org.apache.pinot.thirdeye.rootcause.timeseries.BaselineAggregateType;
import org.apache.pinot.thirdeye.util.AnomalyOffset;
import org.apache.pinot.thirdeye.util.ThirdEyeUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -153,12 +153,17 @@ public class AnomaliesResource {
private final AnomalyFunctionManager anomalyFunctionDAO;
private final DatasetConfigManager datasetConfigDAO;
private final DetectionConfigManager detectionDAO;
+ private final EventManager eventDAO;
+ private final MergedAnomalyResultManager anomalyDAO;
+
private final ExecutorService threadPool;
private final AlertFilterFactory alertFilterFactory;
private final AnomalyFunctionFactory anomalyFunctionFactory;
private final TimeSeriesLoader timeSeriesLoader;
private final AggregationLoader aggregationLoader;
+ private final DetectionPipelineLoader loader;
+ private final DataProvider provider;
public AnomaliesResource(AnomalyFunctionFactory anomalyFunctionFactory, AlertFilterFactory alertFilterFactory) {
this.metricConfigDAO = DAO_REGISTRY.getMetricConfigDAO();
@@ -170,12 +175,17 @@ public class AnomaliesResource {
this.threadPool = Executors.newFixedThreadPool(NUM_EXECS);
this.alertFilterFactory = alertFilterFactory;
this.anomalyFunctionFactory = anomalyFunctionFactory;
+ this.eventDAO = DAORegistry.getInstance().getEventDAO();
+ this.anomalyDAO = DAORegistry.getInstance().getMergedAnomalyResultDAO();
QueryCache queryCache = ThirdEyeCacheRegistry.getInstance().getQueryCache();
LoadingCache<String, Long> maxTimeCache = ThirdEyeCacheRegistry.getInstance().getDatasetMaxDataTimeCache();
this.timeSeriesLoader = new DefaultTimeSeriesLoader(this.metricConfigDAO, this.datasetConfigDAO, queryCache);
this.aggregationLoader = new DefaultAggregationLoader(this.metricConfigDAO, this.datasetConfigDAO, queryCache, maxTimeCache);
+ this.loader = new DetectionPipelineLoader();
+
+ this.provider = new DefaultDataProvider(this.metricConfigDAO, this.datasetConfigDAO, this.eventDAO, this.anomalyDAO, this.timeSeriesLoader, this.aggregationLoader, this.loader);
}
@GET
@@ -899,16 +909,12 @@ public class AnomaliesResource {
String dataset = datasetConfig.getDataset();
String metricName = mergedAnomaly.getMetric();
- if (mergedAnomaly.getFunctionId() == null) {
- if (mergedAnomaly.getDetectionConfigId() == null) {
- return null;
- }
+ if (mergedAnomaly.getDetectionConfigId() != null) {
DetectionConfigDTO detectionConfig = this.detectionDAO.findById(mergedAnomaly.getDetectionConfigId());
if (detectionConfig == null) {
return null;
}
-
return constructAnomalyDetails(mergedAnomaly, detectionConfig);
}
@@ -1113,18 +1119,10 @@ public class AnomaliesResource {
// TODO parallel load
// TODO type from agg function
- Baseline baseline = BaselineAggregate.fromWeekOverWeek(BaselineAggregateType.MEDIAN, 1, 1, dataTimeZone);
-
MetricSlice sliceAnomalyCurrent = MetricSlice.from(metric.getId(), anomaly.getStartTime(), anomaly.getEndTime(), filters);
- Collection<MetricSlice> slicesAnomalyBaseline = baseline.scatter(sliceAnomalyCurrent);
- Map<MetricSlice, DataFrame> anomalyBaselineMap = new HashMap<>();
- for (MetricSlice slice : slicesAnomalyBaseline) {
- anomalyBaselineMap.put(slice, fixTimestamp(this.aggregationLoader.loadAggregate(slice, Collections.<String>emptyList(), -1), slice.getStart()));
- }
-
- details.setCurrent(makeStringValue(this.aggregationLoader.loadAggregate(sliceAnomalyCurrent, Collections.<String>emptyList(), -1)));
- details.setBaseline(makeStringValue(baseline.gather(sliceAnomalyCurrent, anomalyBaselineMap)));
+ details.setCurrent(makeStringValue(new DataFrame().addSeries(COL_VALUE, anomaly.getAvgCurrentVal())));
+ details.setBaseline(makeStringValue(new DataFrame().addSeries(COL_VALUE, anomaly.getAvgBaselineVal())));
AnomalyOffset offsets = BaseAnomalyFunction.getDefaultOffsets(dataset);
@@ -1132,15 +1130,8 @@ public class AnomaliesResource {
.withStart(new DateTime(sliceAnomalyCurrent.getStart(), dataTimeZone).minus(offsets.getPreOffsetPeriod()).getMillis())
.withEnd(new DateTime(sliceAnomalyCurrent.getEnd(), dataTimeZone).plus(offsets.getPostOffsetPeriod()).getMillis());
- Collection<MetricSlice> slicesViewBaseline = baseline.scatter(sliceViewCurrent);
- Map<MetricSlice, DataFrame> viewBaselineMap = new HashMap<>();
- for (MetricSlice slice : slicesViewBaseline) {
- viewBaselineMap.put(slice, this.timeSeriesLoader.load(slice));
- }
-
DataFrame dfCurrent = this.timeSeriesLoader.load(sliceViewCurrent);
- DataFrame dfBaseline = baseline.gather(sliceViewCurrent, viewBaselineMap);
-
+ DataFrame dfBaseline = DetectionUtils.getBaselineTimeseries(anomaly, config, sliceViewCurrent.getStart(), sliceViewCurrent.getEnd(), this.loader, this.provider).getDataFrame();
DataFrame dfAligned = dfCurrent.renameSeries(COL_VALUE, COL_CURRENT).joinOuter(
dfBaseline.renameSeries(COL_VALUE, COL_BASELINE));
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionResource.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionResource.java
index bcd46cb..f151d8d 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionResource.java
@@ -67,6 +67,7 @@ import org.apache.pinot.thirdeye.datasource.loader.TimeSeriesLoader;
import org.apache.pinot.thirdeye.detection.finetune.GridSearchTuningAlgorithm;
import org.apache.pinot.thirdeye.detection.finetune.TuningAlgorithm;
import org.apache.pinot.thirdeye.detection.spi.model.AnomalySlice;
+import org.apache.pinot.thirdeye.detection.spi.model.TimeSeries;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.quartz.CronExpression;
@@ -475,4 +476,20 @@ public class DetectionResource {
return Response.ok(anomaly.getId()).build();
}
+ /**
+  * Endpoint returning the predicted baseline time series for an anomaly within a time range.
+  *
+  * @param anomalyId id of the anomaly to compute the baseline for
+  * @param start start of the time range, passed through to the baseline computation
+  * @param end end of the time range, passed through to the baseline computation
+  * @return response wrapping the data frame of the baseline time series
+  * @throws IllegalArgumentException if the anomaly cannot be found or has no detection config id
+  * @throws Exception if the baseline computation fails
+  */
+ @GET
+ @ApiOperation("get the predicted baseline for an anomaly within a time range")
+ @Path(value = "/predicted-baseline/{anomalyId}")
+ public Response getPredictedBaseline(
+     @PathParam("anomalyId") @ApiParam("anomalyId") long anomalyId,
+     @QueryParam("start") long start,
+     @QueryParam("end") long end
+ ) throws Exception {
+   MergedAnomalyResultDTO anomaly = anomalyDAO.findById(anomalyId);
+   if (anomaly == null) {
+     throw new IllegalArgumentException(String.format("Could not resolve anomaly id %d", anomalyId));
+   }
+   // The detection config must be looked up by the anomaly's detection config id. The anomaly's
+   // own id lives in a different id space and would resolve to no config or an unrelated one.
+   Long detectionConfigId = anomaly.getDetectionConfigId();
+   if (detectionConfigId == null) {
+     throw new IllegalArgumentException(String.format("Anomaly id %d has no detection config id", anomalyId));
+   }
+   TimeSeries ts = DetectionUtils.getBaselineTimeseries(anomaly, configDAO.findById(detectionConfigId), start, end, loader, provider);
+   return Response.ok(ts.getDataFrame()).build();
+ }
+
}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionUtils.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionUtils.java
index ef0b7f6..2ba785c 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionUtils.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionUtils.java
@@ -25,13 +25,19 @@ import org.apache.pinot.thirdeye.dataframe.BooleanSeries;
import org.apache.pinot.thirdeye.dataframe.DataFrame;
import org.apache.pinot.thirdeye.dataframe.LongSeries;
import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
+import org.apache.pinot.thirdeye.datalayer.bao.DetectionConfigManager;
+import org.apache.pinot.thirdeye.datalayer.bao.MergedAnomalyResultManager;
import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
import org.apache.pinot.thirdeye.datalayer.dto.EventDTO;
import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
import org.apache.pinot.thirdeye.datalayer.pojo.MetricConfigBean;
import org.apache.pinot.thirdeye.detection.DataProvider;
+import org.apache.pinot.thirdeye.detection.components.RuleBaselineProvider;
+import org.apache.pinot.thirdeye.detection.spec.RuleBaselineProviderSpec;
import org.apache.pinot.thirdeye.detection.spi.components.BaseComponent;
+import org.apache.pinot.thirdeye.detection.spi.components.BaselineProvider;
import org.apache.pinot.thirdeye.detection.spi.model.AnomalySlice;
import org.apache.pinot.thirdeye.detection.spi.model.EventSlice;
import org.apache.pinot.thirdeye.detection.spi.model.InputData;
@@ -45,6 +51,8 @@ import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
+import org.apache.pinot.thirdeye.detection.spi.model.TimeSeries;
+import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Period;
@@ -53,6 +61,8 @@ import static org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils.*;
public class DetectionUtils {
+ private static final String PROP_BASELINE_PROVIDER_COMPONENT_NAME = "baselineProviderComponentName";
+
// TODO anomaly should support multimap
public static DimensionMap toFilterMap(Multimap<String, String> filters) {
DimensionMap map = new DimensionMap();
@@ -175,4 +185,36 @@ public class DetectionUtils {
}
return Collections.min(nestedLastTimeStamps);
}
+
+ /**
+  * Get the predicted baseline for an anomaly in a time range. Falls back to a "wo1w" rule
+  * baseline when the anomaly names no baseline provider component, when the config is null,
+  * or when the config has no component spec under that name.
+  *
+  * @param anomaly the anomaly; its properties and metric URN are read
+  * @param config the detection config; may be null, in which case the wo1w fallback is used
+  * @param start start time
+  * @param end end time
+  * @param loader detection pipeline loader
+  * @param provider data provider
+  * @return baseline time series
+  * @throws IllegalArgumentException if config is null and the anomaly has no detection config id
+  * @throws Exception if loading the pipeline or computing the baseline fails
+  */
+ public static TimeSeries getBaselineTimeseries(MergedAnomalyResultDTO anomaly, DetectionConfigDTO config,
+     long start, long end, DetectionPipelineLoader loader, DataProvider provider) throws Exception {
+   String baselineProviderComponentName = anomaly.getProperties().get(PROP_BASELINE_PROVIDER_COMPONENT_NAME);
+   BaselineProvider baselineProvider = new RuleBaselineProvider();
+   MetricEntity me = MetricEntity.fromURN(anomaly.getMetricUrn());
+
+   if (baselineProviderComponentName != null && config != null &&
+       config.getComponentSpecs().containsKey(baselineProviderComponentName)) {
+     // load pipeline and init components
+     loader.from(provider, config, start, end);
+     baselineProvider = (BaselineProvider) config.getComponents().get(baselineProviderComponentName);
+   } else {
+     // use wow instead
+     RuleBaselineProviderSpec spec = new RuleBaselineProviderSpec();
+     spec.setOffset("wo1w");
+     // config may legitimately be null on this path (the condition above allows it), so it must
+     // not be dereferenced unconditionally; take the id from the anomaly in that case.
+     Long configId;
+     if (config != null) {
+       configId = config.getId();
+     } else {
+       configId = anomaly.getDetectionConfigId();
+       if (configId == null) {
+         throw new IllegalArgumentException(
+             "Cannot compute wo1w baseline: no detection config and anomaly has no detection config id");
+       }
+     }
+     InputDataFetcher dataFetcher = new DefaultInputDataFetcher(provider, configId);
+     baselineProvider.init(spec, dataFetcher);
+   }
+   return baselineProvider.computePredictedTimeSeries(MetricSlice.from(me.getId(), start, end, me.getFilters()));
+ }
}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/spi/model/TimeSeries.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/spi/model/TimeSeries.java
index aa17f4c..a32c693 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/spi/model/TimeSeries.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/spi/model/TimeSeries.java
@@ -24,6 +24,9 @@ import org.apache.pinot.thirdeye.dataframe.DoubleSeries;
import org.apache.pinot.thirdeye.dataframe.LongSeries;
import org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils;
+import static org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils.*;
+
+
/**
* Time series. wrapper object of data frame. Used by baselineProvider to return the predicted time series
*/
@@ -39,7 +42,7 @@ public class TimeSeries {
* @param timestamps
*/
public void addTimeStamps(LongSeries timestamps) {
- this.df.addSeries(DataFrameUtils.COL_TIME, timestamps);
+ this.df.addSeries(COL_TIME, timestamps).setIndex(COL_TIME);
}
/**
@@ -52,7 +55,7 @@ public class TimeSeries {
public static TimeSeries fromDataFrame(DataFrame df) {
TimeSeries ts = new TimeSeries();
- ts.df.addSeries(DataFrameUtils.COL_TIME, df.get(DataFrameUtils.COL_TIME));
+ ts.df.addSeries(COL_TIME, df.get(COL_TIME)).setIndex(COL_TIME);
ts.df.addSeries(DataFrameUtils.COL_VALUE, df.get(DataFrameUtils.COL_VALUE));
return ts;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org