You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xh...@apache.org on 2020/05/06 21:04:18 UTC
[incubator-pinot] 01/01: [TE] Push down top k filter to data
provider
This is an automated email from the ASF dual-hosted git repository.
xhsun pushed a commit to branch pushdown_topk_filter
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 91205ed8e341907b89a63a98f52d2d025b1d77c6
Author: Xiaohui Sun <xh...@xhsun-mn1.linkedin.biz>
AuthorDate: Wed May 6 14:03:55 2020 -0700
[TE] Push down top k filter to data provider
---
.../pinot/thirdeye/detection/DataProvider.java | 3 +-
.../thirdeye/detection/DefaultDataProvider.java | 10 +---
.../detection/DefaultInputDataFetcher.java | 2 +-
.../detection/StaticDetectionPipeline.java | 2 +-
.../algorithm/BaselineRuleFilterWrapper.java | 4 +-
.../detection/algorithm/DimensionWrapper.java | 3 +-
.../detection/algorithm/LegacyMergeWrapper.java | 2 +-
.../algorithm/ThresholdRuleFilterWrapper.java | 2 +-
.../pinot/thirdeye/detection/DataProviderTest.java | 64 ++++++++++++++++++++--
.../pinot/thirdeye/detection/MockDataProvider.java | 2 +-
10 files changed, 74 insertions(+), 20 deletions(-)
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DataProvider.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DataProvider.java
index 5d1361d..4bd0ab3 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DataProvider.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DataProvider.java
@@ -69,9 +69,10 @@ public interface DataProvider {
*
* @param slices metric slices
* @param dimensions dimensions to group by
+ * @param limit max number of records to return. No limit is applied if it is a non-positive number.
* @return map of aggregation values (keyed by slice)
*/
- Map<MetricSlice, DataFrame> fetchAggregates(Collection<MetricSlice> slices, List<String> dimensions);
+ Map<MetricSlice, DataFrame> fetchAggregates(Collection<MetricSlice> slices, List<String> dimensions, int limit);
/**
* Returns a multimap of anomalies (keyed by slice) for a given set of slices.
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultDataProvider.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultDataProvider.java
index d24fbe3..4d82342 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultDataProvider.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultDataProvider.java
@@ -118,16 +118,12 @@ public class DefaultDataProvider implements DataProvider {
}
@Override
- public Map<MetricSlice, DataFrame> fetchAggregates(Collection<MetricSlice> slices, final List<String> dimensions) {
+ public Map<MetricSlice, DataFrame> fetchAggregates(Collection<MetricSlice> slices, final List<String> dimensions, int limit) {
try {
Map<MetricSlice, Future<DataFrame>> futures = new HashMap<>();
for (final MetricSlice slice : slices) {
- futures.put(slice, this.executor.submit(new Callable<DataFrame>() {
- @Override
- public DataFrame call() throws Exception {
- return DefaultDataProvider.this.aggregationLoader.loadAggregate(slice, dimensions, -1);
- }
- }));
+ futures.put(slice, this.executor.submit(
+ () -> DefaultDataProvider.this.aggregationLoader.loadAggregate(slice, dimensions, limit)));
}
final long deadline = System.currentTimeMillis() + TIMEOUT;
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultInputDataFetcher.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultInputDataFetcher.java
index 2dd1efc..c5072a9 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultInputDataFetcher.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DefaultInputDataFetcher.java
@@ -57,7 +57,7 @@ public class DefaultInputDataFetcher implements InputDataFetcher {
*/
public InputData fetchData(InputDataSpec inputDataSpec) {
Map<MetricSlice, DataFrame> timeseries = provider.fetchTimeseries(inputDataSpec.getTimeseriesSlices());
- Map<MetricSlice, DataFrame> aggregates = provider.fetchAggregates(inputDataSpec.getAggregateSlices(), Collections.<String>emptyList());
+ Map<MetricSlice, DataFrame> aggregates = provider.fetchAggregates(inputDataSpec.getAggregateSlices(), Collections.<String>emptyList(), -1);
Collection<AnomalySlice> slicesWithConfigId = new HashSet<>();
for (AnomalySlice slice : inputDataSpec.getAnomalySlices()) {
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/StaticDetectionPipeline.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/StaticDetectionPipeline.java
index d068685..42dea97 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/StaticDetectionPipeline.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/StaticDetectionPipeline.java
@@ -81,7 +81,7 @@ public abstract class StaticDetectionPipeline extends DetectionPipeline {
public final DetectionPipelineResult run() throws Exception {
InputDataSpec dataSpec = this.getInputDataSpec();
Map<MetricSlice, DataFrame> timeseries = this.provider.fetchTimeseries(dataSpec.getTimeseriesSlices());
- Map<MetricSlice, DataFrame> aggregates = this.provider.fetchAggregates(dataSpec.getAggregateSlices(), Collections.<String>emptyList());
+ Map<MetricSlice, DataFrame> aggregates = this.provider.fetchAggregates(dataSpec.getAggregateSlices(), Collections.<String>emptyList(), -1);
Collection<AnomalySlice> updatedSlices = new HashSet<>();
for (AnomalySlice slice : dataSpec.getAnomalySlices()) {
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/BaselineRuleFilterWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/BaselineRuleFilterWrapper.java
index 686bd30..ea43d58 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/BaselineRuleFilterWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/BaselineRuleFilterWrapper.java
@@ -87,7 +87,7 @@ public class BaselineRuleFilterWrapper extends RuleBasedFilterWrapper {
MetricSlice.from(me.getId(), anomaly.getStartTime(), anomaly.getEndTime(), me.getFilters());
MetricSlice baselineSlice = this.baseline.scatter(currentSlice).get(0);
- Map<MetricSlice, DataFrame> aggregates = this.provider.fetchAggregates(Arrays.asList(currentSlice, baselineSlice), Collections.<String>emptyList());
+ Map<MetricSlice, DataFrame> aggregates = this.provider.fetchAggregates(Arrays.asList(currentSlice, baselineSlice), Collections.<String>emptyList(), -1);
double currentValue = getValueFromAggregates(currentSlice, aggregates);
double baselineValue = getValueFromAggregates(baselineSlice, aggregates);
if (!Double.isNaN(this.difference) && Math.abs(currentValue - baselineValue) < this.difference) {
@@ -102,7 +102,7 @@ public class BaselineRuleFilterWrapper extends RuleBasedFilterWrapper {
MetricSlice siteWideSlice = this.baseline.scatter(
MetricSlice.from(siteWideEntity.getId(), anomaly.getStartTime(), anomaly.getEndTime(), me.getFilters())).get(0);
double siteWideBaselineValue = getValueFromAggregates(siteWideSlice,
- this.provider.fetchAggregates(Collections.singleton(siteWideSlice), Collections.<String>emptyList()));
+ this.provider.fetchAggregates(Collections.singleton(siteWideSlice), Collections.<String>emptyList(), -1));
if (siteWideBaselineValue != 0 && (Math.abs(currentValue - baselineValue) / siteWideBaselineValue) < this.siteWideImpactThreshold) {
return false;
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java
index 8eeb7fb..757f4f8 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java
@@ -170,7 +170,8 @@ public class DimensionWrapper extends DetectionPipeline {
MetricEntity metric = MetricEntity.fromURN(this.metricUrn);
MetricSlice slice = MetricSlice.from(metric.getId(), this.start.getMillis(), this.end.getMillis(), metric.getFilters());
- DataFrame aggregates = this.provider.fetchAggregates(Collections.singletonList(slice), this.dimensions).get(slice);
+ // Here we only pull the top k records; this is safe since the result is sorted by default in Pinot
+ DataFrame aggregates = this.provider.fetchAggregates(Collections.singletonList(slice), this.dimensions, this.k).get(slice);
if (aggregates.isEmpty()) {
return nestedMetrics;
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/LegacyMergeWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/LegacyMergeWrapper.java
index 48473b7..7d3435e 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/LegacyMergeWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/LegacyMergeWrapper.java
@@ -256,7 +256,7 @@ public class LegacyMergeWrapper extends DetectionPipeline {
if (!StringUtils.isBlank(anomalyFunctionSpec.getGlobalMetric())) {
MetricSlice slice = makeGlobalSlice(anomalyFunctionSpec, mergedAnomalyResult);
- double valGlobal = this.provider.fetchAggregates(Collections.singleton(slice), Collections.<String>emptyList()).get(slice).getDouble(COL_VALUE, 0);
+ double valGlobal = this.provider.fetchAggregates(Collections.singleton(slice), Collections.<String>emptyList(), -1).get(slice).getDouble(COL_VALUE, 0);
double diffLocal = mergedAnomalyResult.getAvgCurrentVal() - mergedAnomalyResult.getAvgBaselineVal();
mergedAnomalyResult.setImpactToGlobal(diffLocal / valGlobal);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/ThresholdRuleFilterWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/ThresholdRuleFilterWrapper.java
index dbc907a..71676de 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/ThresholdRuleFilterWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/ThresholdRuleFilterWrapper.java
@@ -56,7 +56,7 @@ public class ThresholdRuleFilterWrapper extends RuleBasedFilterWrapper {
MetricEntity me = MetricEntity.fromURN(anomaly.getMetricUrn());
MetricSlice currentSlice = MetricSlice.from(me.getId(), anomaly.getStartTime(), anomaly.getEndTime(), me.getFilters());
- Map<MetricSlice, DataFrame> aggregates = this.provider.fetchAggregates(Collections.singleton(currentSlice), Collections.<String>emptyList());
+ Map<MetricSlice, DataFrame> aggregates = this.provider.fetchAggregates(Collections.singleton(currentSlice), Collections.<String>emptyList(), -1);
double currentValue = getValueFromAggregates(currentSlice, aggregates);
if (!Double.isNaN(this.min) && currentValue < this.min) {
return false;
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DataProviderTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DataProviderTest.java
index b261c95..26668e1 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DataProviderTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DataProviderTest.java
@@ -17,6 +17,8 @@
package org.apache.pinot.thirdeye.detection;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.SetMultimap;
import java.io.InputStreamReader;
@@ -29,9 +31,11 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.thirdeye.anomaly.AnomalyType;
import org.apache.pinot.thirdeye.dataframe.DataFrame;
+import org.apache.pinot.thirdeye.dataframe.util.MetricSlice;
import org.apache.pinot.thirdeye.datalayer.bao.DAOTestBase;
import org.apache.pinot.thirdeye.datalayer.bao.DatasetConfigManager;
import org.apache.pinot.thirdeye.datalayer.bao.DetectionConfigManager;
@@ -45,13 +49,20 @@ import org.apache.pinot.thirdeye.datalayer.dto.EventDTO;
import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
import org.apache.pinot.thirdeye.datasource.DAORegistry;
+import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry;
+import org.apache.pinot.thirdeye.datasource.ThirdEyeDataSource;
+import org.apache.pinot.thirdeye.datasource.cache.MetricDataset;
import org.apache.pinot.thirdeye.datasource.cache.QueryCache;
+import org.apache.pinot.thirdeye.datasource.csv.CSVThirdEyeDataSource;
+import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultAggregationLoader;
import org.apache.pinot.thirdeye.datasource.loader.DefaultTimeSeriesLoader;
import org.apache.pinot.thirdeye.datasource.loader.TimeSeriesLoader;
import org.apache.pinot.thirdeye.detection.cache.builder.AnomaliesCacheBuilder;
import org.apache.pinot.thirdeye.detection.cache.builder.TimeSeriesCacheBuilder;
import org.apache.pinot.thirdeye.detection.spi.model.AnomalySlice;
import org.apache.pinot.thirdeye.detection.spi.model.EventSlice;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeMethod;
@@ -69,8 +80,9 @@ public class DataProviderTest {
private DatasetConfigManager datasetDAO;
private EvaluationManager evaluationDAO;
private DetectionConfigManager detectionDAO;
- private QueryCache cache;
+ private QueryCache queryCache;
private TimeSeriesLoader timeseriesLoader;
+ private AggregationLoader aggregationLoader;
private DataFrame data;
@@ -82,6 +94,8 @@ public class DataProviderTest {
private List<Long> datasetIds;
private List<Long> detectionIds;
+ private static final MetricDataset METRIC = new MetricDataset("metric", "collection1");
+
@BeforeMethod
public void beforeMethod() throws Exception {
this.testBase = DAOTestBase.getInstance();
@@ -139,12 +153,47 @@ public class DataProviderTest {
this.data.addSeries(COL_TIME, this.data.getLongs(COL_TIME).multiply(1000));
}
- // loaders
- this.timeseriesLoader = new DefaultTimeSeriesLoader(this.metricDAO, this.datasetDAO, this.cache, null);
+ // register caches
+
+ LoadingCache<String, DatasetConfigDTO> mockDatasetConfigCache = Mockito.mock(LoadingCache.class);
+ DatasetConfigDTO datasetConfig = this.datasetDAO.findByDataset("myDataset2");
+ Mockito.when(mockDatasetConfigCache.get("myDataset2")).thenReturn(datasetConfig);
+
+
+ LoadingCache<String, Long> mockDatasetMaxDataTimeCache = Mockito.mock(LoadingCache.class);
+ Mockito.when(mockDatasetMaxDataTimeCache.get("myDataset2"))
+ .thenReturn(Long.MAX_VALUE);
+
+ MetricDataset metricDataset = new MetricDataset("myMetric2", "myDataset2");
+ LoadingCache<MetricDataset, MetricConfigDTO> mockMetricConfigCache = Mockito.mock(LoadingCache.class);
+ MetricConfigDTO metricConfig = this.metricDAO.findByMetricAndDataset("myMetric2", "myDataset2");
+ Mockito.when(mockMetricConfigCache.get(metricDataset)).thenReturn(metricConfig);
+
+ Map<String, DataFrame> datasets = new HashMap<>();
+ datasets.put("myDataset1", data);
+ datasets.put("myDataset2", data);
+
+ Map<Long, String> id2name = new HashMap<>();
+ id2name.put(this.metricIds.get(1), "value");
+ Map<String, ThirdEyeDataSource> dataSourceMap = new HashMap<>();
+ dataSourceMap.put("myDataSource", CSVThirdEyeDataSource.fromDataFrame(datasets, id2name));
+ this.queryCache = new QueryCache(dataSourceMap, Executors.newSingleThreadExecutor());
+
+ ThirdEyeCacheRegistry cacheRegistry = ThirdEyeCacheRegistry.getInstance();
+ cacheRegistry.registerMetricConfigCache(mockMetricConfigCache);
+ cacheRegistry.registerDatasetConfigCache(mockDatasetConfigCache);
+ cacheRegistry.registerQueryCache(this.queryCache);
+ cacheRegistry.registerDatasetMaxDataTimeCache(mockDatasetMaxDataTimeCache);
+
+ // time series loader
+ this.timeseriesLoader = new DefaultTimeSeriesLoader(this.metricDAO, this.datasetDAO, this.queryCache, null);
+
+ // aggregation loader
+ this.aggregationLoader = new DefaultAggregationLoader(this.metricDAO, this.datasetDAO, this.queryCache, mockDatasetMaxDataTimeCache);
// provider
this.provider = new DefaultDataProvider(this.metricDAO, this.datasetDAO, this.eventDAO, this.anomalyDAO,
- this.evaluationDAO, this.timeseriesLoader, null, null,
+ this.evaluationDAO, this.timeseriesLoader, aggregationLoader, null,
TimeSeriesCacheBuilder.getInstance(), AnomaliesCacheBuilder.getInstance());
}
@@ -179,6 +228,13 @@ public class DataProviderTest {
Assert.assertTrue(metrics.contains(makeMetric(this.metricIds.get(2), "myMetric3", "myDataset1")));
}
+ @Test
+ public void testFetchAggregation() {
+ MetricSlice metricSlice = MetricSlice.from(this.metricIds.get(1), 0L, 32400000L, ArrayListMultimap.create());
+ Map<MetricSlice, DataFrame> aggregates = this.provider.fetchAggregates(Collections.singletonList(metricSlice), Collections.emptyList(), -1);
+ Assert.assertEquals(aggregates.keySet().size(), 1);
+ }
+
//
// datasets
//
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/MockDataProvider.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/MockDataProvider.java
index 364c80c..74fa54d 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/MockDataProvider.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/MockDataProvider.java
@@ -111,7 +111,7 @@ public class MockDataProvider implements DataProvider {
}
@Override
- public Map<MetricSlice, DataFrame> fetchAggregates(Collection<MetricSlice> slices, final List<String> dimensions) {
+ public Map<MetricSlice, DataFrame> fetchAggregates(Collection<MetricSlice> slices, final List<String> dimensions, int limit) {
Map<MetricSlice, DataFrame> result = new HashMap<>();
for (MetricSlice slice : slices) {
List<String> expr = new ArrayList<>();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org