You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xh...@apache.org on 2020/05/21 05:47:57 UTC
[incubator-pinot] 02/02: add top n support for csv and sql data
source
This is an automated email from the ASF dual-hosted git repository.
xhsun pushed a commit to branch pushdown_topk_filter
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 5c802425d8abfa064c4c0557f214d5dce05466a9
Author: Xiaohui Sun <xh...@xhsun-mn1.linkedin.biz>
AuthorDate: Mon May 11 16:10:50 2020 -0700
add top n support for csv and sql data source
---
.../pinot/thirdeye/datasource/sql/SqlUtils.java | 22 ++--
.../pinot/thirdeye/detection/DataProvider.java | 3 +-
.../detection/algorithm/DimensionWrapper.java | 9 +-
.../thirdeye/datasource/sql/TestSqlUtils.java | 121 +++++++++++++++++++++
.../pinot/thirdeye/detection/DataProviderTest.java | 2 +-
.../pinot/thirdeye/detection/MockDataProvider.java | 10 +-
6 files changed, 153 insertions(+), 14 deletions(-)
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datasource/sql/SqlUtils.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datasource/sql/SqlUtils.java
index 28f6272..f8b98a2 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datasource/sql/SqlUtils.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datasource/sql/SqlUtils.java
@@ -258,16 +258,17 @@ public class SqlUtils {
sb.append(" AND ").append(dimensionWhereClause);
}
- if (limit <= 0) {
- limit = DEFAULT_LIMIT;
- }
-
String groupByClause = getDimensionGroupByClause(groupBy, timeGranularity, dataTimeSpec);
if (StringUtils.isNotBlank(groupByClause)) {
sb.append(" ").append(groupByClause);
- sb.append(" LIMIT " + limit);
}
+ if (limit > 0 ){
+ sb.append(" ORDER BY " + getSelectMetricClause(metricConfig, metricFunction) + " DESC");
+ }
+
+ limit = limit > 0 ? limit : DEFAULT_LIMIT;
+ sb.append(" LIMIT " + limit);
return sb.toString();
}
@@ -290,12 +291,20 @@ public class SqlUtils {
} else { //timeFormat case
builder.append(dateTimeSpec.getColumnName()).append(", ");
}
- }
+ }
for (String groupByKey: groupByKeys) {
builder.append(groupByKey).append(", ");
}
+ String selectMetricClause = getSelectMetricClause(metricConfig, metricFunction);
+ builder.append(selectMetricClause);
+
+ return builder.toString();
+ }
+
+ private static String getSelectMetricClause(MetricConfigDTO metricConfig, MetricFunction metricFunction) {
+ StringBuilder builder = new StringBuilder();
String metricName = null;
if (metricFunction.getMetricName().equals("*")) {
metricName = "*";
@@ -303,7 +312,6 @@ public class SqlUtils {
metricName = metricConfig.getName();
}
builder.append(convertAggFunction(metricFunction.getFunctionName())).append("(").append(metricName).append(")");
-
return builder.toString();
}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DataProvider.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DataProvider.java
index 4bd0ab3..3ad473c 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DataProvider.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DataProvider.java
@@ -69,7 +69,8 @@ public interface DataProvider {
*
* @param slices metric slices
* @param dimensions dimensions to group by
- * @param limit max number of records to return. No limitation if it is a non-positive number.
+ * @param limit max number of records to return ordered by metric value
+ * no limitation if it is a non-positive number
* @return map of aggregation values (keyed by slice)
*/
Map<MetricSlice, DataFrame> fetchAggregates(Collection<MetricSlice> slices, List<String> dimensions, int limit);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java
index 757f4f8..cc252e8 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/algorithm/DimensionWrapper.java
@@ -170,8 +170,13 @@ public class DimensionWrapper extends DetectionPipeline {
MetricEntity metric = MetricEntity.fromURN(this.metricUrn);
MetricSlice slice = MetricSlice.from(metric.getId(), this.start.getMillis(), this.end.getMillis(), metric.getFilters());
- // Here we only pull the top k records, this is safe since the result is sorted by default in Pinot
- DataFrame aggregates = this.provider.fetchAggregates(Collections.singletonList(slice), this.dimensions, this.k).get(slice);
+ // We can push down the top k filter if min contribution is not defined.
+ // Otherwise it is not accurate to calculate the contribution.
+ int limit = -1;
+ if (Double.isNaN(this.minContribution) && this.k > 0) {
+ limit = this.k;
+ }
+ DataFrame aggregates = this.provider.fetchAggregates(Collections.singletonList(slice), this.dimensions, limit).get(slice);
if (aggregates.isEmpty()) {
return nestedMetrics;
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/datasource/sql/TestSqlUtils.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/datasource/sql/TestSqlUtils.java
new file mode 100644
index 0000000..2106254
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/datasource/sql/TestSqlUtils.java
@@ -0,0 +1,121 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pinot.thirdeye.datasource.sql;
+
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.HashMultimap;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.thirdeye.common.time.TimeGranularity;
+import org.apache.pinot.thirdeye.common.time.TimeSpec;
+import org.apache.pinot.thirdeye.constant.MetricAggFunction;
+import org.apache.pinot.thirdeye.datalayer.bao.DAOTestBase;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MetricConfigDTO;
+import org.apache.pinot.thirdeye.datasource.DAORegistry;
+import org.apache.pinot.thirdeye.datasource.MetricFunction;
+import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry;
+import org.apache.pinot.thirdeye.datasource.ThirdEyeRequest;
+import org.apache.pinot.thirdeye.datasource.cache.MetricDataset;
+import org.joda.time.DateTime;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+public class TestSqlUtils {
+  // Checks the SQL text produced by SqlUtils.getSql, in particular the ORDER BY / LIMIT clauses.
+  private final String dataset = "mysql.db.table";
+  private final String metric = "metric";
+  // Per-test fixtures, (re)created in beforeMethod().
+  private MetricDataset metricDataset;
+  private MetricFunction metricFunction;
+  private DAOTestBase daoTestBase;
+  // Before each test: register mocked config caches, save a metric config and build a SUM(metric) function.
+  @BeforeMethod
+  public void beforeMethod() throws Exception {
+    this.daoTestBase = DAOTestBase.getInstance();
+    this.metricDataset = new MetricDataset(metric, dataset);
+    // Mocked config caches that return empty DTOs for this dataset/metric; registered globally below.
+    LoadingCache<String, DatasetConfigDTO> mockDatasetConfigCache = Mockito.mock(LoadingCache.class);
+    Mockito.when(mockDatasetConfigCache.get(this.dataset)).thenReturn(new DatasetConfigDTO());
+
+    LoadingCache<MetricDataset, MetricConfigDTO> mockMetricConfigCache = Mockito.mock(LoadingCache.class);
+    Mockito.when(mockMetricConfigCache.get(this.metricDataset)).thenReturn(new MetricConfigDTO());
+
+    ThirdEyeCacheRegistry.getInstance().registerDatasetConfigCache(mockDatasetConfigCache);
+    ThirdEyeCacheRegistry.getInstance().registerMetricConfigCache(mockMetricConfigCache);
+    // Metric config for the test dataset, saved through the DAO registry at the end of this method.
+    MetricConfigDTO metricConfigDTO = new MetricConfigDTO();
+    metricConfigDTO.setDataset(this.dataset);
+    metricConfigDTO.setName(this.metricDataset.getMetricName());
+    metricConfigDTO.setAlias(this.metricDataset.getDataset() + "::" + this.metricDataset.getMetricName());
+    // SUM over the test metric; the expected SQL below aggregates it as SUM(metric).
+    metricFunction = new MetricFunction();
+    metricFunction.setDataset(dataset);
+    metricFunction.setMetricId(1L);
+    metricFunction.setMetricName(metric);
+    metricFunction.setFunctionName(MetricAggFunction.SUM);
+
+    DAORegistry.getInstance().getMetricConfigDAO().save(metricConfigDTO);
+  }
+  // Best-effort cleanup after each test. NOTE(review): the exception is swallowed, so a failed cleanup goes unnoticed.
+  @AfterMethod
+  public void afterMethod() {
+    try { this.daoTestBase.cleanup(); } catch (Exception ignore) {}
+  }
+  // A positive limit is expected to produce "ORDER BY SUM(metric) DESC" followed by that LIMIT.
+  @Test
+  public void testSqlWithExplicitLimit() {
+    TimeGranularity timeGranularity = new TimeGranularity(1, TimeUnit.DAYS);
+    // NOTE(review): start == exclusive end, and DateTime.parse presumably uses the JVM default zone - confirm zone-independence.
+    ThirdEyeRequest request = ThirdEyeRequest.newBuilder()
+        .setDataSource(this.dataset)
+        .setLimit(100)
+        .setGroupBy("country")
+        .setStartTimeInclusive(DateTime.parse("2020-05-01"))
+        .setEndTimeExclusive(DateTime.parse("2020-05-01"))
+        .setGroupByTimeGranularity(timeGranularity)
+        .build("");
+    // NOTE(review): 2020-05-01T00:00Z is epoch day 18383; confirm how SqlUtils derives the expected 18384.
+    String timeFormat = TimeSpec.SINCE_EPOCH_FORMAT;
+    TimeSpec timeSpec = new TimeSpec("date", timeGranularity, timeFormat);
+    String actualSql = SqlUtils.getSql(request, this.metricFunction, HashMultimap.create(), timeSpec, this.dataset);
+    String expected = "SELECT date, country, SUM(metric) FROM table WHERE date = 18384 GROUP BY date, country ORDER BY SUM(metric) DESC LIMIT 100";
+    Assert.assertEquals(actualSql, expected);
+  }
+  // Without an explicit limit, no ORDER BY is expected and the query falls back to LIMIT 100000.
+  @Test
+  public void testSqlWithoutExplicitLimit() {
+    TimeGranularity timeGranularity = new TimeGranularity(1, TimeUnit.DAYS);
+
+    ThirdEyeRequest request = ThirdEyeRequest.newBuilder()
+        .setDataSource(this.dataset)
+        .setGroupBy("country")
+        .setStartTimeInclusive(DateTime.parse("2020-05-01"))
+        .setEndTimeExclusive(DateTime.parse("2020-05-01"))
+        .setGroupByTimeGranularity(timeGranularity)
+        .build("");
+
+    String timeFormat = TimeSpec.SINCE_EPOCH_FORMAT;
+    TimeSpec timeSpec = new TimeSpec("date", timeGranularity, timeFormat);
+    String actual = SqlUtils.getSql(request, this.metricFunction, HashMultimap.create(), timeSpec, this.dataset);
+    String expected = "SELECT date, country, SUM(metric) FROM table WHERE date = 18384 GROUP BY date, country LIMIT 100000";
+    Assert.assertEquals(actual, expected);
+  }
+}
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DataProviderTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DataProviderTest.java
index 26668e1..51533f7 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DataProviderTest.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DataProviderTest.java
@@ -231,7 +231,7 @@ public class DataProviderTest {
@Test
public void testFetchAggregation() {
MetricSlice metricSlice = MetricSlice.from(this.metricIds.get(1), 0L, 32400000L, ArrayListMultimap.create());
- Map<MetricSlice, DataFrame> aggregates = this.provider.fetchAggregates(Collections.singletonList(metricSlice), Collections.emptyList(), -1);
+ Map<MetricSlice, DataFrame> aggregates = this.provider.fetchAggregates(Collections.singletonList(metricSlice), Collections.emptyList(), 1);
Assert.assertEquals(aggregates.keySet().size(), 1);
}
diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/MockDataProvider.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/MockDataProvider.java
index 74fa54d..0b8f625 100644
--- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/MockDataProvider.java
+++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/MockDataProvider.java
@@ -124,9 +124,13 @@ public class MockDataProvider implements DataProvider {
result.put(slice, this.aggregates.get(slice.withFilters(NO_FILTERS)));
} else {
- result.put(slice, this.aggregates.get(slice.withFilters(NO_FILTERS))
- .groupByValue(new ArrayList<>(dimensions)).aggregate(expr)
- .dropSeries(COL_KEY).setIndex(dimensions));
+ DataFrame aggResult = this.aggregates.get(slice.withFilters(NO_FILTERS))
+ .groupByValue(new ArrayList<>(dimensions)).aggregate(expr);
+
+ if (limit > 0) {
+ aggResult = aggResult.sortedBy(COL_VALUE).reverse().head(limit);
+ }
+ result.put(slice, aggResult.dropSeries(COL_KEY).setIndex(dimensions));
}
}
return result;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org