You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ji...@apache.org on 2019/02/20 22:01:42 UTC
[incubator-pinot] branch master updated: [TE] detection - time
series provider loading cache (#3842)
This is an automated email from the ASF dual-hosted git repository.
jihao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 0d981ab [TE] detection - time series provider loading cache (#3842)
0d981ab is described below
commit 0d981ab6f4ebaee68d8ff47ac165f875ce7e78f3
Author: Jihao Zhang <ji...@linkedin.com>
AuthorDate: Wed Feb 20 14:01:37 2019 -0800
[TE] detection - time series provider loading cache (#3842)
* Set the maximum weight of the loading cache
* Improve onboarding performance
---
.../pinot/thirdeye/detection/DataProvider.java | 8 --
.../thirdeye/detection/DefaultDataProvider.java | 62 ++++++----
.../detection/wrapper/AnomalyDetectorWrapper.java | 6 +-
.../thirdeye/detection/yaml/YamlResource.java | 3 +-
.../pinot/thirdeye/detection/DataProviderTest.java | 82 ++-----------
.../components/ThresholdRuleDetectorTest.java | 34 +++---
.../finetune/GridSearchTuningAlgorithmTest.java | 134 ---------------------
.../wrapper/AnomalyDetectorWrapperTest.java | 11 +-
8 files changed, 86 insertions(+), 254 deletions(-)
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DataProvider.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DataProvider.java
index 9175cb9..d51187b 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DataProvider.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DataProvider.java
@@ -55,14 +55,6 @@ public interface DataProvider {
Map<MetricSlice, DataFrame> fetchTimeseries(Collection<MetricSlice> slices);
/**
- * Caches the time series for the metric slices for later use
- * @param slices
- */
- default void cacheTimeseries(Collection<MetricSlice> slices){
- // do nothing if not implemented
- }
-
- /**
* Returns a map of aggregation values (keyed by slice) for a given set of slices,
* grouped by the given dimensions.
* The format of the DataFrame follows the standard convention of DataFrameUtils.
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultDataProvider.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultDataProvider.java
index 0b22629..04d7ae9 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultDataProvider.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultDataProvider.java
@@ -22,8 +22,10 @@ package org.apache.pinot.thirdeye.detection;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
+import com.google.common.cache.Weigher;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import java.util.ArrayList;
import java.util.Collection;
@@ -72,7 +74,8 @@ public class DefaultDataProvider implements DataProvider {
private final TimeSeriesLoader timeseriesLoader;
private final AggregationLoader aggregationLoader;
private final DetectionPipelineLoader loader;
- private final LoadingCache<MetricSlice, DataFrame> timeseriesCache;
+ private static LoadingCache<MetricSlice, DataFrame> DETECTION_TIME_SERIES_CACHE;
+
public DefaultDataProvider(MetricConfigManager metricDAO, DatasetConfigManager datasetDAO, EventManager eventDAO,
MergedAnomalyResultManager anomalyDAO, TimeSeriesLoader timeseriesLoader, AggregationLoader aggregationLoader,
@@ -84,27 +87,40 @@ public class DefaultDataProvider implements DataProvider {
this.timeseriesLoader = timeseriesLoader;
this.aggregationLoader = aggregationLoader;
this.loader = loader;
- this.timeseriesCache = CacheBuilder.newBuilder()
- .maximumSize(100)
- .expireAfterWrite(30, TimeUnit.MINUTES)
- .build(new CacheLoader<MetricSlice, DataFrame>() {
- @Override
- public DataFrame load(MetricSlice slice) {
- return fetchTimeseries(Collections.singleton(slice)).get(slice);
- }
- });
- }
+ if (DETECTION_TIME_SERIES_CACHE == null) {
+ // don't use more than one third of the JVM's currently free memory for detection time series
+ long cacheSize = Runtime.getRuntime().freeMemory() / 3;
+ LOG.info("initializing detection timeseries cache with {} bytes", cacheSize);
+ DETECTION_TIME_SERIES_CACHE = CacheBuilder.newBuilder()
+ .maximumWeight(cacheSize)
+ // Estimate that most detection tasks will complete within 15 minutes
+ .expireAfterWrite(15, TimeUnit.MINUTES)
+ .weigher((Weigher<MetricSlice, DataFrame>) (slice, dataFrame) -> dataFrame.size() * (Long.BYTES + Double.BYTES))
+ .build(new CacheLoader<MetricSlice, DataFrame>() {
+ // load single slice
+ @Override
+ public DataFrame load(MetricSlice slice) {
+ return loadTimeseries(Collections.singleton(slice)).get(slice);
+ }
+
+ // bulk loading time series slices in parallel
+ @Override
+ public Map<MetricSlice, DataFrame> loadAll(Iterable<? extends MetricSlice> slices) {
+ return loadTimeseries(Lists.newArrayList(slices));
+ }
+ });
+ }
+ }
- @Override
- public Map<MetricSlice, DataFrame> fetchTimeseries(Collection<MetricSlice> slices) {
+ private Map<MetricSlice, DataFrame> loadTimeseries(Collection<MetricSlice> slices) {
try {
long ts = System.currentTimeMillis();
Map<MetricSlice, DataFrame> output = new HashMap<>();
- // if already in cache, return directly
+ // if the time series slice is already in cache, return directly
for (MetricSlice slice : slices){
- for (Map.Entry<MetricSlice, DataFrame> entry : this.timeseriesCache.asMap().entrySet()) {
+ for (Map.Entry<MetricSlice, DataFrame> entry : DETECTION_TIME_SERIES_CACHE.asMap().entrySet()) {
if (entry.getKey().containSlice(slice)){
DataFrame df = entry.getValue().filter(entry.getValue().getLongs(COL_TIME).between(slice.getStart(), slice.getEnd())).dropNull(COL_TIME);
output.put(slice, df);
@@ -136,13 +152,17 @@ public class DefaultDataProvider implements DataProvider {
}
@Override
- public void cacheTimeseries(Collection<MetricSlice> slices) {
- for (MetricSlice slice : slices){
- try {
- this.timeseriesCache.get(slice);
- } catch (Exception e){
- LOG.warn("cache time series for slice {} failed", slice.toString());
+ public Map<MetricSlice, DataFrame> fetchTimeseries(Collection<MetricSlice> slices) {
+ try {
+ Map<MetricSlice, DataFrame> cacheResult = DETECTION_TIME_SERIES_CACHE.getAll(slices);
+ Map<MetricSlice, DataFrame> timeseriesResult = new HashMap<>();
+ for (Map.Entry<MetricSlice, DataFrame> entry : cacheResult.entrySet()){
+ // make a copy of the result so that cache won't be contaminated by client code
+ timeseriesResult.put(entry.getKey(), entry.getValue().copy());
}
+ return timeseriesResult;
+ } catch (Exception e) {
+ throw new RuntimeException("fetch time series failed", e);
}
}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
index 299f231..5e7227f 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapper.java
@@ -72,7 +72,7 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
private static final String PROP_DETECTOR_COMPONENT_NAME = "detectorComponentName";
private static final String PROP_TIMEZONE = "timezone";
private static final String PROP_BUCKET_PERIOD = "bucketPeriod";
- private static final long DEFAULT_CACHING_PERIOD_LOOKBACK = TimeUnit.DAYS.toMillis(60);
+ private static final long DEFAULT_CACHING_PERIOD_LOOKBACK = TimeUnit.DAYS.toMillis(90);
private static final Logger LOG = LoggerFactory.getLogger(
AnomalyDetectorWrapper.class);
@@ -139,7 +139,7 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
// 2. to calculate current values and baseline values for the anomalies detected
// 3. anomaly detection current and baseline time series value
MetricSlice cacheSlice = MetricSlice.from(this.metricEntity.getId(), startTime - DEFAULT_CACHING_PERIOD_LOOKBACK, endTime, this.metricEntity.getFilters());
- this.provider.cacheTimeseries(Collections.singleton(cacheSlice));
+ this.provider.fetchTimeseries(Collections.singleton(cacheSlice));
List<Interval> monitoringWindows = this.getMonitoringWindows();
List<MergedAnomalyResultDTO> anomalies = new ArrayList<>();
@@ -209,7 +209,7 @@ public class AnomalyDetectorWrapper extends DetectionPipeline {
}
// pre cache the time series for the whole detection time period instead of fetching for each window
MetricSlice cacheSlice = MetricSlice.from(this.metricEntity.getId(), startTime - DEFAULT_CACHING_PERIOD_LOOKBACK, endTime, this.metricEntity.getFilters(), toTimeGranularity(this.bucketPeriod));
- this.provider.cacheTimeseries(Collections.singleton(cacheSlice));
+ this.provider.fetchTimeseries(Collections.singleton(cacheSlice));
return monitoringWindows;
} catch (Exception e) {
LOG.info("can't generate moving monitoring windows, calling with single detection window", e);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlResource.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlResource.java
index 63c741d..28adeb5 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlResource.java
@@ -542,6 +542,7 @@ public class YamlResource {
@ApiParam("jsonPayload") String payload) {
Map<String, String> responseMessage = new HashMap<>();
DetectionPipelineResult result;
+ long ts = System.currentTimeMillis();
try {
Preconditions.checkArgument(StringUtils.isNotBlank(payload), "The Yaml Payload in the request is empty.");
@@ -563,7 +564,7 @@ public class YamlResource {
responseMessage.put("message", "Failed to run the preview due to " + e.getMessage());
return Response.serverError().entity(responseMessage).build();
}
- LOG.info("Preview successful using payload " + payload);
+ LOG.info("Preview successful, used {} milliseconds", System.currentTimeMillis() - ts);
return Response.ok(result).build();
}
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DataProviderTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DataProviderTest.java
index f1f8265..e12c4ba 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DataProviderTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DataProviderTest.java
@@ -18,9 +18,19 @@ package org.apache.pinot.thirdeye.detection;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.SetMultimap;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.apache.pinot.thirdeye.common.dimension.DimensionMap;
import org.apache.pinot.thirdeye.dataframe.DataFrame;
-import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
import org.apache.pinot.thirdeye.datalayer.bao.DAOTestBase;
import org.apache.pinot.thirdeye.datalayer.bao.DatasetConfigManager;
import org.apache.pinot.thirdeye.datalayer.bao.EventManager;
@@ -39,19 +49,8 @@ import org.apache.pinot.thirdeye.datasource.loader.DefaultTimeSeriesLoader;
import org.apache.pinot.thirdeye.datasource.loader.TimeSeriesLoader;
import org.apache.pinot.thirdeye.detection.spi.model.AnomalySlice;
import org.apache.pinot.thirdeye.detection.spi.model.EventSlice;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -145,52 +144,12 @@ public class DataProviderTest {
this.timeseriesLoader, null, null);
}
- @AfterMethod(alwaysRun = true)
+ @AfterClass(alwaysRun = true)
public void afterMethod() {
this.testBase.cleanup();
}
//
- // timeseries
- //
-
- @Test
- public void testTimeseriesSingle() {
- MetricSlice slice = MetricSlice.from(this.metricIds.get(0), 604800000L, 1814400000L);
-
- DataFrame df = this.provider.fetchTimeseries(Collections.singleton(slice)).get(slice);
-
- Assert.assertEquals(df.size(), 336);
-
- double mean = df.getDoubles(COL_VALUE).mean().doubleValue();
- Assert.assertTrue(Math.abs(mean - 1000) < EPSILON_MEAN);
- }
-
- @Test
- public void testTimeseriesMultiple() {
- MetricSlice slice1 = MetricSlice.from(this.metricIds.get(0), 604800000L, 1814400000L);
- MetricSlice slice2 = MetricSlice.from(this.metricIds.get(1), 604800000L, 1209600000L);
-
- Map<MetricSlice, DataFrame> output = this.provider.fetchTimeseries(Arrays.asList(slice1, slice2));
-
- Assert.assertEquals(output.size(), 2);
-
- double mean1 = output.get(slice1).getDoubles(COL_VALUE).mean().doubleValue();
- Assert.assertTrue(Math.abs(mean1 - 1000) < EPSILON_MEAN);
-
- double mean2 = output.get(slice2).getDoubles(COL_VALUE).mean().doubleValue();
- Assert.assertTrue(Math.abs(mean2 - 1000) < EPSILON_MEAN);
-
- Assert.assertNotEquals(mean1, mean2);
- }
-
- //
- // aggregates
- //
-
- // TODO fetch aggregates tests
-
- //
// metric
//
@@ -304,21 +263,6 @@ public class DataProviderTest {
Assert.assertTrue(anomalies.contains(makeAnomaly(this.anomalyIds.get(3), 200L, 14400000L, 18000000L, Arrays.asList("a=1", "c=3"))));
}
- // cache
-
- @Test
- public void testTimeseriesCache(){
- MetricSlice slice = MetricSlice.from(this.metricIds.get(0), 604800000L, 1814400000L);
- this.provider.cacheTimeseries(Collections.singleton(slice));
-
- DataFrame df = this.provider.fetchTimeseries(Collections.singleton(slice)).get(slice);
-
- Assert.assertEquals(df.size(), 336);
-
- double mean = df.getDoubles(COL_VALUE).mean().doubleValue();
- Assert.assertTrue(Math.abs(mean - 1000) < EPSILON_MEAN);
- }
-
//
// utils
//
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleDetectorTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleDetectorTest.java
index 102bfac..1b1a267 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleDetectorTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/ThresholdRuleDetectorTest.java
@@ -16,6 +16,11 @@
package org.apache.pinot.thirdeye.detection.components;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.pinot.thirdeye.dataframe.DataFrame;
import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
@@ -23,15 +28,11 @@ import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
import org.apache.pinot.thirdeye.detection.DataProvider;
-import org.apache.pinot.thirdeye.detection.DetectionPipeline;
-import org.apache.pinot.thirdeye.detection.DetectionPipelineResult;
+import org.apache.pinot.thirdeye.detection.DefaultInputDataFetcher;
import org.apache.pinot.thirdeye.detection.MockDataProvider;
-import org.apache.pinot.thirdeye.detection.wrapper.AnomalyDetectorWrapper;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import org.apache.pinot.thirdeye.detection.spec.ThresholdRuleDetectorSpec;
+import org.apache.pinot.thirdeye.detection.spi.components.AnomalyDetector;
+import org.joda.time.Interval;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -41,10 +42,9 @@ import static org.apache.pinot.thirdeye.dataframe.util.DataFrameUtils.*;
public class ThresholdRuleDetectorTest {
private DataProvider testDataProvider;
- private DetectionPipeline detectionPipeline;
@BeforeMethod
- public void beforeMethod() throws Exception {
+ public void beforeMethod() {
Map<MetricSlice, DataFrame> timeSeries = new HashMap<>();
timeSeries.put(MetricSlice.from(123L, 0, 10),
new DataFrame().addSeries(COL_VALUE, 0, 100, 200, 500, 1000).addSeries(COL_TIME, 0, 2, 4, 6, 8));
@@ -64,8 +64,6 @@ public class ThresholdRuleDetectorTest {
DetectionConfigDTO detectionConfigDTO = new DetectionConfigDTO();
detectionConfigDTO.setId(125L);
Map<String, Object> detectorSpecs = new HashMap<>();
- detectorSpecs.put("min", 100);
- detectorSpecs.put("max", 500);
detectorSpecs.put("className", ThresholdRuleDetector.class.getName());
Map<String, Object> properties = new HashMap<>();
properties.put("metricUrn", "thirdeye:metric:123");
@@ -79,14 +77,16 @@ public class ThresholdRuleDetectorTest {
.setMetrics(Collections.singletonList(metricConfigDTO))
.setDatasets(Collections.singletonList(datasetConfigDTO))
.setTimeseries(timeSeries);
- this.detectionPipeline = new AnomalyDetectorWrapper(this.testDataProvider, detectionConfigDTO, 0, 10);
}
@Test
- public void testThresholdAlgorithmRun() throws Exception {
- DetectionPipelineResult result = this.detectionPipeline.run();
- List<MergedAnomalyResultDTO> anomalies = result.getAnomalies();
- Assert.assertEquals(result.getLastTimestamp(), 10);
+ public void testThresholdAlgorithmRun() {
+ AnomalyDetector thresholdAlgorithm = new ThresholdRuleDetector();
+ ThresholdRuleDetectorSpec spec = new ThresholdRuleDetectorSpec();
+ spec.setMin(100);
+ spec.setMax(500);
+ thresholdAlgorithm.init(spec, new DefaultInputDataFetcher(testDataProvider, -1));
+ List<MergedAnomalyResultDTO> anomalies = thresholdAlgorithm.runDetection(new Interval(0, 10), "thirdeye:metric:123");
Assert.assertEquals(anomalies.size(), 2);
Assert.assertEquals(anomalies.get(0).getStartTime(), 0);
Assert.assertEquals(anomalies.get(0).getEndTime(), 2);
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/finetune/GridSearchTuningAlgorithmTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/finetune/GridSearchTuningAlgorithmTest.java
deleted file mode 100644
index 5e3ccc4..0000000
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/finetune/GridSearchTuningAlgorithmTest.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.pinot.thirdeye.detection.finetune;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.pinot.thirdeye.anomalydetection.context.AnomalyFeedback;
-import org.apache.pinot.thirdeye.constant.AnomalyFeedbackType;
-import org.apache.pinot.thirdeye.dataframe.DataFrame;
-import org.apache.pinot.thirdeye.datalayer.bao.DAOTestBase;
-import org.apache.pinot.thirdeye.datalayer.bao.DatasetConfigManager;
-import org.apache.pinot.thirdeye.datalayer.bao.MergedAnomalyResultManager;
-import org.apache.pinot.thirdeye.datalayer.bao.MetricConfigManager;
-import org.apache.pinot.thirdeye.datalayer.dto.AnomalyFeedbackDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
-import org.apache.pinot.thirdeye.datasource.DAORegistry;
-import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry;
-import org.apache.pinot.thirdeye.datasource.ThirdEyeDataSource;
-import org.apache.pinot.thirdeye.datasource.cache.QueryCache;
-import org.apache.pinot.thirdeye.datasource.csv.CSVThirdEyeDataSource;
-import org.apache.pinot.thirdeye.detection.spi.model.AnomalySlice;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.collections.MapUtils;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-
-public class GridSearchTuningAlgorithmTest {
- private DAOTestBase testDAOProvider;
-
- private TuningAlgorithm gridSearch;
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
- @BeforeMethod
- public void beforeMethod() throws JsonProcessingException {
- testDAOProvider = DAOTestBase.getInstance();
-
- // metric set up
- MetricConfigManager metricDAO = DAORegistry.getInstance().getMetricConfigDAO();
- MetricConfigDTO metric = new MetricConfigDTO();
- metric.setName("test");
- metric.setDataset("testDataSet");
- metric.setAlias("alias");
- metricDAO.save(metric);
-
- // data set set up
- DatasetConfigManager datasetDAO = DAORegistry.getInstance().getDatasetConfigDAO();
- DatasetConfigDTO datasetDTO = new DatasetConfigDTO();
- datasetDTO.setDataset("testDataSet");
- datasetDTO.setDataSource("myDataSource");
- datasetDTO.setNonAdditiveBucketSize(1);
- datasetDTO.setTimeUnit(TimeUnit.MINUTES);
- datasetDAO.save(datasetDTO);
-
- // datasource set up
- DataFrame data = new DataFrame();
- data.addSeries("timestamp", 1526414678000L, 1527019478000L);
- data.addSeries("value", 100, 200);
- Map<String, DataFrame> datasets = new HashMap<>();
- datasets.put("testDataSet", data);
-
- Map<Long, String> id2name = new HashMap<>();
- id2name.put(1L, "value");
-
- Map<String, ThirdEyeDataSource> dataSourceMap = new HashMap<>();
-
- dataSourceMap.put("myDataSource", CSVThirdEyeDataSource.fromDataFrame(datasets, id2name));
- QueryCache cache = new QueryCache(dataSourceMap, Executors.newSingleThreadExecutor());
- ThirdEyeCacheRegistry.getInstance().registerQueryCache(cache);
- ThirdEyeCacheRegistry.initMetaDataCaches();
-
- // existing anomaly set up
- MergedAnomalyResultManager anomalyDAO = DAORegistry.getInstance().getMergedAnomalyResultDAO();
- MergedAnomalyResultDTO anomaly = new MergedAnomalyResultDTO();
- anomaly.setStartTime(1525241940000L);
- anomaly.setEndTime(1525241940001L);
- AnomalyFeedback feedback = new AnomalyFeedbackDTO();
- feedback.setFeedbackType(AnomalyFeedbackType.ANOMALY);
- anomaly.setFeedback(feedback);
- anomalyDAO.save(anomaly);
-
- // properties
- LinkedHashMap<String, Object> properties = new LinkedHashMap<>();
- properties.put("metricUrn", "thirdeye:metric:1");
- properties.put("className", "org.apache.pinot.thirdeye.detection.algorithm.BaselineAlgorithm");
- properties.put("change", 0.1);
-
- // parameters
- LinkedHashMap<String, List<Number>> parameters = new LinkedHashMap<>();
- parameters.put("$.change", Arrays.<Number>asList(0.05, 0.1));
-
- gridSearch = new GridSearchTuningAlgorithm(OBJECT_MAPPER.writeValueAsString(properties), parameters);
- }
-
- @AfterClass(alwaysRun = true)
- void afterClass() {
- testDAOProvider.cleanup();
- }
-
- @Test
- public void testGridSearch() throws Exception {
- AnomalySlice slice = new AnomalySlice().withStart(1525211842000L).withEnd(1527890242000L);
- gridSearch.fit(slice, -1);
- DetectionConfigDTO config = gridSearch.bestDetectionConfig();
- Assert.assertEquals(MapUtils.getDouble(config.getProperties(), "change"), 0.05);
- }
-
- // TODO test dimension separation
-}
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapperTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapperTest.java
index 0703f69..69389f1 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapperTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/wrapper/AnomalyDetectorWrapperTest.java
@@ -16,7 +16,10 @@
package org.apache.pinot.thirdeye.detection.wrapper;
+import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ListMultimap;
+import org.apache.pinot.thirdeye.common.time.TimeGranularity;
import org.apache.pinot.thirdeye.common.time.TimeSpec;
import org.apache.pinot.thirdeye.dataframe.DataFrame;
import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
@@ -76,7 +79,13 @@ public class AnomalyDetectorWrapperTest {
MetricSlice.from(1L, 1546819200000L, 1546905600000L),
DataFrame.builder(COL_TIME, COL_VALUE).build(),
MetricSlice.from(1L, 1546300800000L, 1546560000000L),
- new DataFrame().addSeries(COL_VALUE, 500, 1000).addSeries(COL_TIME, 1546300800000L, 1546387200000L)));
+ new DataFrame().addSeries(COL_VALUE, 500, 1000).addSeries(COL_TIME, 1546300800000L, 1546387200000L),
+ MetricSlice.from(1L, 1540147725000L - TimeUnit.DAYS.toMillis(90), 1540493325000L, HashMultimap.create(),
+ new TimeGranularity(1, TimeUnit.DAYS)),
+ new DataFrame().addSeries(COL_VALUE, 500, 1000).addSeries(COL_TIME, 1546646400000L, 1546732800000L),
+ MetricSlice.from(1L, 1540080000000L - TimeUnit.DAYS.toMillis(90), 1540425600000L, HashMultimap.create(),
+ new TimeGranularity(1, TimeUnit.DAYS)),
+ new DataFrame().addSeries(COL_VALUE, 500, 1000).addSeries(COL_TIME, 1546646400000L, 1546732800000L)));
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org