You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ji...@apache.org on 2019/02/12 01:37:11 UTC
[incubator-pinot] branch master updated: [TE] detection - caching &
configure time granularity (#3810)
This is an automated email from the ASF dual-hosted git repository.
jihao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 017e9e5 [TE] detection - caching & configure time granularity (#3810)
017e9e5 is described below
commit 017e9e555e2db0ec0334d7c3977daef470504298
Author: Jihao Zhang <ji...@linkedin.com>
AuthorDate: Mon Feb 11 17:37:04 2019 -0800
[TE] detection - caching & configure time granularity (#3810)
* Add the ability to configure time granularity in YAML
* Reduce the number of times Pinot is queried during detection by caching time series
---
.../pinot/thirdeye/dataframe/util/MetricSlice.java | 10 ++-
.../pinot/thirdeye/detection/DataProvider.java | 14 +++-
.../thirdeye/detection/DefaultDataProvider.java | 83 ++++++++++++++++------
.../detection/DetectionMigrationResource.java | 31 ++++----
.../thirdeye/detection/algorithm/MergeWrapper.java | 19 +++--
.../detection/components/RuleBaselineProvider.java | 13 ----
.../detection/spi/components/BaselineProvider.java | 8 ++-
.../detection/wrapper/AnomalyDetectorWrapper.java | 52 +++++++++++---
.../yaml/CompositePipelineConfigTranslator.java | 6 +-
.../yaml/YamlDetectionConfigTranslator.java | 1 -
.../thirdeye/detection/yaml/YamlResource.java | 1 -
.../pinot/thirdeye/detection/DataProviderTest.java | 15 ++++
.../components/RuleBaselineProviderTest.java | 2 +-
.../wrapper/BaselineFillingMergeWrapperTest.java | 6 +-
14 files changed, 183 insertions(+), 78 deletions(-)
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dataframe/util/MetricSlice.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dataframe/util/MetricSlice.java
index 5a04070..7039ff8 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dataframe/util/MetricSlice.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/dataframe/util/MetricSlice.java
@@ -21,11 +21,11 @@ package org.apache.pinot.thirdeye.dataframe.util;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
-import org.apache.pinot.thirdeye.common.time.TimeGranularity;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
+import org.apache.pinot.thirdeye.common.time.TimeGranularity;
/**
@@ -97,6 +97,14 @@ public final class MetricSlice {
return new MetricSlice(metricId, start, end, filters, granularity);
}
+ /**
+ * check if current metric slice contains another metric slice
+ */
+ public boolean containSlice(MetricSlice slice) {
+ return slice.metricId == this.metricId && slice.granularity.equals(this.granularity) && slice.getFilters().equals(this.getFilters()) &&
+ slice.start >= this.start && slice.end <= this.end;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DataProvider.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DataProvider.java
index 4c714ba..9175cb9 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DataProvider.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DataProvider.java
@@ -20,6 +20,9 @@
package org.apache.pinot.thirdeye.detection;
import com.google.common.collect.Multimap;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
import org.apache.pinot.thirdeye.dataframe.DataFrame;
import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
@@ -29,9 +32,6 @@ import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
import org.apache.pinot.thirdeye.detection.spi.model.AnomalySlice;
import org.apache.pinot.thirdeye.detection.spi.model.EventSlice;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
/**
@@ -55,6 +55,14 @@ public interface DataProvider {
Map<MetricSlice, DataFrame> fetchTimeseries(Collection<MetricSlice> slices);
/**
+ * Caches the time series for the metric slices for later use
+ * @param slices
+ */
+ default void cacheTimeseries(Collection<MetricSlice> slices){
+ // do nothing if not implemented
+ }
+
+ /**
* Returns a map of aggregation values (keyed by slice) for a given set of slices,
* grouped by the given dimensions.
* The format of the DataFrame follows the standard convention of DataFrameUtils.
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultDataProvider.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultDataProvider.java
index f43febe..0b22629 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultDataProvider.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultDataProvider.java
@@ -19,9 +19,24 @@
package org.apache.pinot.thirdeye.detection;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Collections2;
import com.google.common.collect.Multimap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import org.apache.pinot.thirdeye.dataframe.DataFrame;
import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
import org.apache.pinot.thirdeye.datalayer.bao.DatasetConfigManager;
@@ -36,23 +51,13 @@ import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
import org.apache.pinot.thirdeye.datalayer.util.Predicate;
import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader;
import org.apache.pinot.thirdeye.datasource.loader.TimeSeriesLoader;
-import org.apache.pinot.thirdeye.detection.alert.StatefulDetectionAlertFilter;
import org.apache.pinot.thirdeye.detection.spi.model.AnomalySlice;
import org.apache.pinot.thirdeye.detection.spi.model.EventSlice;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils.*;
+
public class DefaultDataProvider implements DataProvider {
private static final Logger LOG = LoggerFactory.getLogger(DefaultDataProvider.class);
@@ -67,6 +72,7 @@ public class DefaultDataProvider implements DataProvider {
private final TimeSeriesLoader timeseriesLoader;
private final AggregationLoader aggregationLoader;
private final DetectionPipelineLoader loader;
+ private final LoadingCache<MetricSlice, DataFrame> timeseriesCache;
public DefaultDataProvider(MetricConfigManager metricDAO, DatasetConfigManager datasetDAO, EventManager eventDAO,
MergedAnomalyResultManager anomalyDAO, TimeSeriesLoader timeseriesLoader, AggregationLoader aggregationLoader,
@@ -78,26 +84,50 @@ public class DefaultDataProvider implements DataProvider {
this.timeseriesLoader = timeseriesLoader;
this.aggregationLoader = aggregationLoader;
this.loader = loader;
+ this.timeseriesCache = CacheBuilder.newBuilder()
+ .maximumSize(100)
+ .expireAfterWrite(30, TimeUnit.MINUTES)
+ .build(new CacheLoader<MetricSlice, DataFrame>() {
+ @Override
+ public DataFrame load(MetricSlice slice) {
+ return fetchTimeseries(Collections.singleton(slice)).get(slice);
+ }
+ });
}
+
@Override
public Map<MetricSlice, DataFrame> fetchTimeseries(Collection<MetricSlice> slices) {
try {
- Map<MetricSlice, Future<DataFrame>> futures = new HashMap<>();
- for (final MetricSlice slice : slices) {
- futures.put(slice, this.executor.submit(new Callable<DataFrame>() {
- @Override
- public DataFrame call() throws Exception {
- return DefaultDataProvider.this.timeseriesLoader.load(slice);
+ long ts = System.currentTimeMillis();
+ Map<MetricSlice, DataFrame> output = new HashMap<>();
+
+ // if already in cache, return directly
+ for (MetricSlice slice : slices){
+ for (Map.Entry<MetricSlice, DataFrame> entry : this.timeseriesCache.asMap().entrySet()) {
+ if (entry.getKey().containSlice(slice)){
+ DataFrame df = entry.getValue().filter(entry.getValue().getLongs(COL_TIME).between(slice.getStart(), slice.getEnd())).dropNull(COL_TIME);
+ output.put(slice, df);
+ break;
}
- }));
+ }
}
+ // if not in cache, fetch from data source
+ Map<MetricSlice, Future<DataFrame>> futures = new HashMap<>();
+ for (final MetricSlice slice : slices) {
+ if (!output.containsKey(slice)){
+ futures.put(slice, this.executor.submit(() -> DefaultDataProvider.this.timeseriesLoader.load(slice)));
+ }
+ }
+ LOG.info("Fetching {} slices of timeseries, {} cache hit, {} cache miss", slices.size(), output.size(), futures.size());
final long deadline = System.currentTimeMillis() + TIMEOUT;
- Map<MetricSlice, DataFrame> output = new HashMap<>();
for (MetricSlice slice : slices) {
- output.put(slice, futures.get(slice).get(makeTimeout(deadline), TimeUnit.MILLISECONDS));
+ if (!output.containsKey(slice)) {
+ output.put(slice, futures.get(slice).get(makeTimeout(deadline), TimeUnit.MILLISECONDS));
+ }
}
+ LOG.info("Fetching {} slices used {} milliseconds", slices.size(), System.currentTimeMillis() - ts);
return output;
} catch (Exception e) {
@@ -106,6 +136,17 @@ public class DefaultDataProvider implements DataProvider {
}
@Override
+ public void cacheTimeseries(Collection<MetricSlice> slices) {
+ for (MetricSlice slice : slices){
+ try {
+ this.timeseriesCache.get(slice);
+ } catch (Exception e){
+ LOG.warn("cache time series for slice {} failed", slice.toString());
+ }
+ }
+ }
+
+ @Override
public Map<MetricSlice, DataFrame> fetchAggregates(Collection<MetricSlice> slices, final List<String> dimensions) {
try {
Map<MetricSlice, Future<DataFrame>> futures = new HashMap<>();
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionMigrationResource.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionMigrationResource.java
index 1e041cc..ebed49c 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionMigrationResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionMigrationResource.java
@@ -21,7 +21,8 @@ package org.apache.pinot.thirdeye.detection;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
-import java.io.IOException;
+import com.wordnik.swagger.annotations.Api;
+import com.wordnik.swagger.annotations.ApiOperation;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -41,6 +42,7 @@ import javax.ws.rs.core.Response;
import javax.xml.bind.ValidationException;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.thirdeye.anomaly.detection.AnomalyDetectionInputContextBuilder;
+import org.apache.pinot.thirdeye.api.Constants;
import org.apache.pinot.thirdeye.datalayer.bao.AlertConfigManager;
import org.apache.pinot.thirdeye.datalayer.bao.AnomalyFunctionManager;
import org.apache.pinot.thirdeye.datalayer.bao.DatasetConfigManager;
@@ -56,10 +58,10 @@ import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
import org.apache.pinot.thirdeye.datalayer.util.Predicate;
-import org.apache.pinot.thirdeye.datasource.DAORegistry;
import org.apache.pinot.thirdeye.detection.yaml.YamlDetectionAlertConfigTranslator;
import org.apache.pinot.thirdeye.detection.yaml.YamlResource;
import org.joda.time.Period;
+import org.omg.CORBA.OBJ_ADAPTER;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.DumperOptions;
@@ -73,6 +75,7 @@ import static org.apache.pinot.thirdeye.detection.yaml.YamlDetectionAlertConfigT
* The Detection migration resource.
*/
@Path("/migrate")
+@Api(tags = {Constants.YAML_TAG})
public class DetectionMigrationResource {
private static final Logger LOGGER = LoggerFactory.getLogger(DetectionMigrationResource.class);
private static final String PROP_WINDOW_DELAY = "windowDelay";
@@ -146,10 +149,19 @@ public class DetectionMigrationResource {
"params", getMinMaxThresholdRuleDetectorParams(anomalyFunctionDTO))));
} else{
// algorithm detector
- ruleYaml.put("detection", Collections.singletonList(
- ImmutableMap.of("name", "detection_rule1", "type", "MIGRATED_ALGORITHM", "params", getAlgorithmDetectorParams(anomalyFunctionDTO),
- PROP_WINDOW_SIZE, anomalyFunctionDTO.getWindowSize(),
- PROP_WINDOW_UNIT, anomalyFunctionDTO.getWindowUnit().toString())));
+ Map<String, Object> detectionProperties = new HashMap<>();
+ if (anomalyFunctionDTO.getWindowDelay() != 0) {
+ detectionProperties.put(PROP_WINDOW_DELAY, anomalyFunctionDTO.getWindowDelay());
+ detectionProperties.put(PROP_WINDOW_DELAY_UNIT, anomalyFunctionDTO.getWindowDelayUnit().toString());
+ }
+ detectionProperties.put("name", "detection_rule1");
+ detectionProperties.put("type", "MIGRATED_ALGORITHM");
+ detectionProperties.put("params", getAlgorithmDetectorParams(anomalyFunctionDTO));
+ detectionProperties.put(PROP_WINDOW_SIZE, anomalyFunctionDTO.getWindowSize());
+ detectionProperties.put(PROP_WINDOW_UNIT, anomalyFunctionDTO.getWindowUnit().toString());
+ detectionProperties.put("bucketPeriod", getBucketPeriod(anomalyFunctionDTO));
+
+ ruleYaml.put("detection", Collections.singletonList(detectionProperties));
}
// filters
@@ -288,10 +300,6 @@ public class DetectionMigrationResource {
}
params.put("variables.bucketPeriod", getBucketPeriod(functionDTO));
params.put("variables.timeZone", getTimezone(functionDTO));
- if (functionDTO.getWindowDelay() != 0) {
- detectorYaml.put(PROP_WINDOW_DELAY, functionDTO.getWindowDelay());
- detectorYaml.put(PROP_WINDOW_DELAY_UNIT, functionDTO.getWindowDelayUnit().toString());
- }
return detectorYaml;
}
@@ -535,8 +543,7 @@ public class DetectionMigrationResource {
}
@POST
- @Produces(MediaType.APPLICATION_JSON)
- @Consumes(MediaType.APPLICATION_JSON)
+ @ApiOperation("migrate an application")
@Path("/application/{name}")
public Response migrateApplication(@PathParam("name") String application) {
List<AlertConfigDTO> alertConfigDTOList = alertConfigDAO.findByPredicate(Predicate.EQ("application", application));
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java
index bf763b1..1ad6c8e 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/MergeWrapper.java
@@ -20,15 +20,6 @@
package org.apache.pinot.thirdeye.detection.algorithm;
import com.google.common.base.Preconditions;
-import org.apache.pinot.thirdeye.common.dimension.DimensionMap;
-import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
-import org.apache.pinot.thirdeye.detection.DetectionUtils;
-import org.apache.pinot.thirdeye.detection.spi.model.AnomalySlice;
-import org.apache.pinot.thirdeye.detection.ConfigUtils;
-import org.apache.pinot.thirdeye.detection.DataProvider;
-import org.apache.pinot.thirdeye.detection.DetectionPipeline;
-import org.apache.pinot.thirdeye.detection.DetectionPipelineResult;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -40,6 +31,15 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.commons.collections.MapUtils;
+import org.apache.pinot.thirdeye.common.dimension.DimensionMap;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.detection.ConfigUtils;
+import org.apache.pinot.thirdeye.detection.DataProvider;
+import org.apache.pinot.thirdeye.detection.DetectionPipeline;
+import org.apache.pinot.thirdeye.detection.DetectionPipelineResult;
+import org.apache.pinot.thirdeye.detection.DetectionUtils;
+import org.apache.pinot.thirdeye.detection.spi.model.AnomalySlice;
/**
@@ -113,7 +113,6 @@ public class MergeWrapper extends DetectionPipeline {
nestedConfig.setDescription(this.config.getDescription());
nestedConfig.setProperties(properties);
nestedConfig.setComponents(this.config.getComponents());
-
DetectionPipeline pipeline = this.provider.loadPipeline(nestedConfig, this.startTime, this.endTime);
DetectionPipelineResult intermediate = pipeline.run();
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/RuleBaselineProvider.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/RuleBaselineProvider.java
index e2d6fe5..40ca80a 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/RuleBaselineProvider.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/components/RuleBaselineProvider.java
@@ -49,19 +49,6 @@ public class RuleBaselineProvider implements BaselineProvider<RuleBaselineProvid
}
@Override
- public Double computePredictedAggregates(MetricSlice slice, Series.DoubleFunction aggregateFunction) {
- InputData data = this.dataFetcher.fetchData(new InputDataSpec().withAggregateSlices(this.baseline.scatter(slice)));
- double value;
- try {
- value = data.getAggregates().get(this.baseline.scatter(slice).get(0)).getDouble(COL_VALUE, 0);
- } catch (Exception e) {
- value = Double.NaN;
- }
- return value;
- }
-
-
- @Override
public void init(RuleBaselineProviderSpec spec, InputDataFetcher dataFetcher) {
this.offset = spec.getOffset();
this.timezone = spec.getTimezone();
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/spi/components/BaselineProvider.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/spi/components/BaselineProvider.java
index 5446e32..b3555b8 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/spi/components/BaselineProvider.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/spi/components/BaselineProvider.java
@@ -41,7 +41,11 @@ public interface BaselineProvider<T extends AbstractSpec> extends BaseComponent<
* @return the predicted value.
*/
default Double computePredictedAggregates(MetricSlice slice, Series.DoubleFunction aggregateFunction){
- TimeSeries baselineTimeSeries = this.computePredictedTimeSeries(slice);
- return baselineTimeSeries.getPredictedBaseline().aggregate(aggregateFunction).getDouble(0);
+ try {
+ TimeSeries baselineTimeSeries = this.computePredictedTimeSeries(slice);
+ return baselineTimeSeries.getPredictedBaseline().aggregate(aggregateFunction).getDouble(0);
+ } catch (Exception e){
+ return Double.NaN;
+ }
}
}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
index d770fcc..299f231 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
@@ -20,6 +20,12 @@
package org.apache.pinot.thirdeye.detection.wrapper;
import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.collections.MapUtils;
import org.apache.pinot.thirdeye.anomaly.detection.DetectionJobSchedulerUtils;
import org.apache.pinot.thirdeye.common.time.TimeGranularity;
import org.apache.pinot.thirdeye.common.time.TimeSpec;
@@ -37,12 +43,6 @@ import org.apache.pinot.thirdeye.detection.DetectionUtils;
import org.apache.pinot.thirdeye.detection.spi.components.AnomalyDetector;
import org.apache.pinot.thirdeye.rootcause.impl.MetricEntity;
import org.apache.pinot.thirdeye.util.ThirdEyeUtils;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.collections.MapUtils;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Interval;
@@ -71,6 +71,8 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
private static final String PROP_DETECTOR = "detector";
private static final String PROP_DETECTOR_COMPONENT_NAME = "detectorComponentName";
private static final String PROP_TIMEZONE = "timezone";
+ private static final String PROP_BUCKET_PERIOD = "bucketPeriod";
+ private static final long DEFAULT_CACHING_PERIOD_LOOKBACK = TimeUnit.DAYS.toMillis(60);
private static final Logger LOG = LoggerFactory.getLogger(
AnomalyDetectorWrapper.class);
@@ -91,6 +93,7 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
private final long windowSizeMillis;
private final DatasetConfigDTO dataset;
private final DateTimeZone dateTimeZone;
+ private final Period bucketPeriod;
public AnomalyDetectorWrapper(DataProvider provider, DetectionConfigDTO config, long startTime, long endTime) {
@@ -123,18 +126,30 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
this.dataset = this.provider.fetchDatasets(Collections.singletonList(metricConfigDTO.getDataset()))
.get(metricConfigDTO.getDataset());
// date time zone for moving windows. use dataset time zone as default
- this.dateTimeZone = DateTimeZone.forID(MapUtils.getString(config.getProperties(), PROP_TIMEZONE, this.dataset.getTimezone()));
+ this.dateTimeZone = DateTimeZone.forID(MapUtils.getString(config.getProperties(), PROP_TIMEZONE, "America/Los_Angeles"));
+
+ String bucketStr = MapUtils.getString(config.getProperties(), PROP_BUCKET_PERIOD);
+ this.bucketPeriod = bucketStr == null ? this.getBucketSizePeriodForDataset() : Period.parse(bucketStr);
}
@Override
public DetectionPipelineResult run() throws Exception {
+ // pre-cache time series with default granularity. this is used in multiple places:
+ // 1. get the last time stamp for the time series.
+ // 2. to calculate current values and baseline values for the anomalies detected
+ // 3. anomaly detection current and baseline time series value
+ MetricSlice cacheSlice = MetricSlice.from(this.metricEntity.getId(), startTime - DEFAULT_CACHING_PERIOD_LOOKBACK, endTime, this.metricEntity.getFilters());
+ this.provider.cacheTimeseries(Collections.singleton(cacheSlice));
+
List<Interval> monitoringWindows = this.getMonitoringWindows();
List<MergedAnomalyResultDTO> anomalies = new ArrayList<>();
for (Interval window : monitoringWindows) {
List<MergedAnomalyResultDTO> anomaliesForOneWindow = new ArrayList<>();
try {
- LOG.info("[New Pipeline] running detection for config {} metricUrn {}. start time {}, end time{}", config.getId(), metricUrn, window.getStart(), window.getEnd());
+ LOG.info("[New Pipeline] running detection for config {} metricUrn {}. start time {}, end time {}", config.getId(), metricUrn, window.getStart(), window.getEnd());
+ long ts = System.currentTimeMillis();
anomaliesForOneWindow = anomalyDetector.runDetection(window, this.metricUrn);
+ LOG.info("[New Pipeline] run anomaly detection for window {} - {} used {} milliseconds", window.getStart(), window.getEnd(), System.currentTimeMillis() - ts);
} catch (Exception e) {
LOG.warn("[DetectionConfigID{}] detecting anomalies for window {} to {} failed.", this.config.getId(), window.getStart(), window.getEnd(), e);
}
@@ -190,8 +205,11 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
monitoringWindows.add(new Interval(startTime, endTime, dateTimeZone));
}
for (Interval window : monitoringWindows){
- LOG.info("running detections in windows {}", window);
+ LOG.info("Will run detection in window {}", window);
}
+ // pre cache the time series for the whole detection time period instead of fetching for each window
+ MetricSlice cacheSlice = MetricSlice.from(this.metricEntity.getId(), startTime - DEFAULT_CACHING_PERIOD_LOOKBACK, endTime, this.metricEntity.getFilters(), toTimeGranularity(this.bucketPeriod));
+ this.provider.cacheTimeseries(Collections.singleton(cacheSlice));
return monitoringWindows;
} catch (Exception e) {
LOG.info("can't generate moving monitoring windows, calling with single detection window", e);
@@ -208,9 +226,8 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
DateTime currentEndTime = new DateTime(getBoundaryAlignedTimeForDataset(new DateTime(endTime, dateTimeZone)), dateTimeZone);
DateTime lastDateTime = new DateTime(getBoundaryAlignedTimeForDataset(new DateTime(startTime, dateTimeZone)), dateTimeZone);
- Period bucketSizePeriod = getBucketSizePeriodForDataset();
while (lastDateTime.isBefore(currentEndTime)) {
- lastDateTime = lastDateTime.plus(bucketSizePeriod);
+ lastDateTime = lastDateTime.plus(this.bucketPeriod);
endTimes.add(lastDateTime.getMillis());
}
return endTimes;
@@ -271,4 +288,17 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
}
return bucketSizePeriod;
}
+
+ public static TimeGranularity toTimeGranularity(Period period) {
+ if (period.getDays() > 0) {
+ return new TimeGranularity(period.getDays(), TimeUnit.DAYS);
+ } else if (period.getHours() > 0) {
+ return new TimeGranularity(period.getHours(), TimeUnit.HOURS);
+ } else if (period.getMinutes() > 0) {
+ return new TimeGranularity(period.getMinutes(), TimeUnit.MINUTES);
+ } else {
+ return new TimeGranularity(period.getMillis(), TimeUnit.MILLISECONDS);
+ }
+ }
+
}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
index 00e0e5a..46095b3 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
@@ -161,6 +161,7 @@ public class CompositePipelineConfigTranslator extends YamlDetectionConfigTransl
private static final String PROP_NAME = "name";
private static final String DEFAULT_TIMEZONE = "America/Los_Angeles";
private static final String DEFAULT_BASELINE_PROVIDER_YAML_TYPE = "RULE_BASELINE";
+ private static final String PROP_BUCKET_PERIOD = "bucketPeriod";
private static final DetectionRegistry DETECTION_REGISTRY = DetectionRegistry.getInstance();
static {
@@ -176,7 +177,7 @@ public class CompositePipelineConfigTranslator extends YamlDetectionConfigTransl
ImmutableSet.of("MIGRATED_ALGORITHM_FILTER", "MIGRATED_ALGORITHM", "MIGRATED_ALGORITHM_BASELINE");
private static final Map<String, String> DETECTOR_TO_BASELINE =
ImmutableMap.of("ALGORITHM", "ALGORITHM_BASELINE", "MIGRATED_ALGORITHM", "MIGRATED_ALGORITHM_BASELINE");
- private static final Set<String> MOVING_WINDOW_DETECTOR_TYPES = ImmutableSet.of("ALGORITHM");
+ private static final Set<String> MOVING_WINDOW_DETECTOR_TYPES = ImmutableSet.of("ALGORITHM", "MIGRATED_ALGORITHM");
private final Map<String, Object> components = new HashMap<>();
private MetricConfigDTO metricConfig;
@@ -321,6 +322,9 @@ public class CompositePipelineConfigTranslator extends YamlDetectionConfigTransl
if (yamlConfig.containsKey(PROP_TIMEZONE)){
properties.put(PROP_TIMEZONE, MapUtils.getString(yamlConfig, PROP_TIMEZONE));
}
+ if (yamlConfig.containsKey(PROP_BUCKET_PERIOD)){
+ properties.put(PROP_BUCKET_PERIOD, MapUtils.getString(yamlConfig, PROP_BUCKET_PERIOD));
+ }
}
}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionConfigTranslator.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionConfigTranslator.java
index 5d508e9..377355b 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionConfigTranslator.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionConfigTranslator.java
@@ -108,6 +108,5 @@ public abstract class YamlDetectionConfigTranslator {
*/
protected void validateYAML(Map<String, Object> yamlConfig) {
Preconditions.checkArgument(yamlConfig.containsKey(PROP_NAME), "Property missing " + PROP_NAME);
- Preconditions.checkArgument(yamlConfig.containsKey(PROP_DESC_NAME), "Property missing " + PROP_DESC_NAME);
}
}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlResource.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlResource.java
index 0a8aba0..4c5f277 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlResource.java
@@ -527,7 +527,6 @@ public class YamlResource {
responseMessage.put("message", "Failed to run the preview due to " + e.getMessage());
return Response.serverError().entity(responseMessage).build();
}
-
LOG.info("Preview successful using payload " + payload);
return Response.ok(result).build();
}
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DataProviderTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DataProviderTest.java
index f3794f8..f1f8265 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DataProviderTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DataProviderTest.java
@@ -304,6 +304,21 @@ public class DataProviderTest {
Assert.assertTrue(anomalies.contains(makeAnomaly(this.anomalyIds.get(3), 200L, 14400000L, 18000000L, Arrays.asList("a=1", "c=3"))));
}
+ // cache
+
+ @Test
+ public void testTimeseriesCache(){
+ MetricSlice slice = MetricSlice.from(this.metricIds.get(0), 604800000L, 1814400000L);
+ this.provider.cacheTimeseries(Collections.singleton(slice));
+
+ DataFrame df = this.provider.fetchTimeseries(Collections.singleton(slice)).get(slice);
+
+ Assert.assertEquals(df.size(), 336);
+
+ double mean = df.getDoubles(COL_VALUE).mean().doubleValue();
+ Assert.assertTrue(Math.abs(mean - 1000) < EPSILON_MEAN);
+ }
+
//
// utils
//
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/RuleBaselineProviderTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/RuleBaselineProviderTest.java
index fcea804..523b861 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/RuleBaselineProviderTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/RuleBaselineProviderTest.java
@@ -78,7 +78,7 @@ public class RuleBaselineProviderTest {
@Test
public void testFetchBaselineAggregates() {
Assert.assertEquals(
- this.baselineProvider.computePredictedAggregates(slice1, DoubleSeries.MEAN), 100.0);
+ this.baselineProvider.computePredictedAggregates(slice1, DoubleSeries.MEAN), 150.0);
}
@Test
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java
index ad036fc..87f7698 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/BaselineFillingMergeWrapperTest.java
@@ -40,6 +40,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.pinot.thirdeye.detection.spi.model.TimeSeries;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -104,13 +105,15 @@ public class BaselineFillingMergeWrapperTest {
Map<MetricSlice, DataFrame> aggregates = new HashMap<>();
aggregates.put(MetricSlice.from(1, 3000, 3600), DataFrame.builder(COL_TIME + ":LONG", COL_VALUE + ":DOUBLE").append(-1, 100).build());
+ Map<MetricSlice, DataFrame> timeseries = new HashMap<>();
+ timeseries.put(MetricSlice.from(1, 3000, 3600), DataFrame.builder(COL_TIME + ":LONG", COL_VALUE + ":DOUBLE").append(3000, 100).build());
MetricConfigDTO metric = new MetricConfigDTO();
metric.setId(1L);
metric.setDefaultAggFunction(MetricAggFunction.SUM);
DataProvider
provider = new MockDataProvider().setLoader(new MockPipelineLoader(this.runs, Collections.<MockPipelineOutput>emptyList())).setAnomalies(Collections.singletonList(anomaly)).setAggregates(aggregates)
- .setMetrics(Collections.singletonList(metric));
+ .setMetrics(Collections.singletonList(metric)).setTimeseries(timeseries);
this.config.getProperties().put(PROP_MAX_GAP, 100);
this.config.getProperties().put(PROP_BASELINE_PROVIDER, "$baseline");
@@ -118,6 +121,7 @@ public class BaselineFillingMergeWrapperTest {
BaselineProvider baselineProvider = new MockBaselineProvider();
MockBaselineProviderSpec spec = new MockBaselineProviderSpec();
spec.setAggregates(ImmutableMap.of(MetricSlice.from(1, 3000, 3600), 100.0));
+ spec.setBaselineTimeseries(ImmutableMap.of(MetricSlice.from(1, 3000, 3600), TimeSeries.fromDataFrame(DataFrame.builder(COL_TIME + ":LONG", COL_VALUE + ":DOUBLE").append(3000, 100).build())));
InputDataFetcher dataFetcher = new DefaultInputDataFetcher(provider, this.config.getId());
baselineProvider.init(spec, dataFetcher);
this.config.setComponents(ImmutableMap.of("baseline", baselineProvider));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org