You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ji...@apache.org on 2021/01/29 18:17:29 UTC

[incubator-pinot] branch master updated: [TE] migrate PQL queries to standard SQL (#6486)

This is an automated email from the ASF dual-hosted git repository.

jihao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c30c76  [TE] migrate PQL queries to standard SQL (#6486)
6c30c76 is described below

commit 6c30c76b9779eca556d9813bf188cf23d42ae24b
Author: Jihao Zhang <ji...@linkedin.com>
AuthorDate: Fri Jan 29 10:17:08 2021 -0800

    [TE] migrate PQL queries to standard SQL (#6486)
    
    This PR migrates the queries ThirdEye sends to Pinot from PQL to standard SQL, because Pinot is adopting standard SQL syntax and semantics for querying.
---
 .../thirdeye/datasource/pinot/PqlUtilsTest.java    | 26 ++++-----
 .../datasource/pinot/PinotDataSourceTimeQuery.java | 12 ++---
 .../datasource/pinot/PinotThirdEyeDataSource.java  | 12 ++---
 .../pinot/{PqlUtils.java => SqlUtils.java}         | 63 +++++++++++++---------
 .../pinot/resources/PinotDataSourceResource.java   |  2 +-
 5 files changed, 64 insertions(+), 51 deletions(-)

diff --git a/thirdeye/thirdeye-dashboard/src/test/java/org/apache/pinot/thirdeye/datasource/pinot/PqlUtilsTest.java b/thirdeye/thirdeye-dashboard/src/test/java/org/apache/pinot/thirdeye/datasource/pinot/PqlUtilsTest.java
index d7272b2..382bb4c 100644
--- a/thirdeye/thirdeye-dashboard/src/test/java/org/apache/pinot/thirdeye/datasource/pinot/PqlUtilsTest.java
+++ b/thirdeye/thirdeye-dashboard/src/test/java/org/apache/pinot/thirdeye/datasource/pinot/PqlUtilsTest.java
@@ -77,7 +77,7 @@ public class PqlUtilsTest {
 
   @Test(dataProvider = "betweenClauseArgs")
   public void getBetweenClause(DateTime start, DateTime end, TimeSpec timeSpec, String expected) throws ExecutionException {
-    String betweenClause = PqlUtils.getBetweenClause(start, end, timeSpec, "collection");
+    String betweenClause = SqlUtils.getBetweenClause(start, end, timeSpec, "collection");
     Assert.assertEquals(betweenClause, expected);
   }
 
@@ -138,7 +138,7 @@ public class PqlUtilsTest {
     dimensions.put("key7", "value71\'");
     dimensions.put("key7", "value72\"");
 
-    String output = PqlUtils.getDimensionWhereClause(dimensions);
+    String output = SqlUtils.getDimensionWhereClause(dimensions);
 
     Assert.assertEquals(output, ""
         + "key < \"value\" AND "
@@ -158,22 +158,22 @@ public class PqlUtilsTest {
 
   @Test
   public  void testQuote() {
-    Assert.assertEquals(PqlUtils.quote("123"), "123");
-    Assert.assertEquals(PqlUtils.quote("abc"), "\"abc\"");
-    Assert.assertEquals(PqlUtils.quote("123\'"), "\"123\'\"");
-    Assert.assertEquals(PqlUtils.quote("abc\""), "\'abc\"\'");
+    Assert.assertEquals(SqlUtils.quote("123"), "123");
+    Assert.assertEquals(SqlUtils.quote("abc"), "\"abc\"");
+    Assert.assertEquals(SqlUtils.quote("123\'"), "\"123\'\"");
+    Assert.assertEquals(SqlUtils.quote("abc\""), "\'abc\"\'");
   }
 
   @Test(expectedExceptions = IllegalArgumentException.class)
   public  void testQuoteFail() {
-    PqlUtils.quote("123\"\'");
+    SqlUtils.quote("123\"\'");
   }
 
   @Test
   public void testLimit() throws Exception {
     MetricFunction metricFunction = new MetricFunction(MetricAggFunction.AVG, METRIC.getMetricName(), this.metricId, COLLECTION, null, null);
 
-    TimeSpec timeSpec = new TimeSpec(METRIC.getMetricName(), TimeGranularity.fromString("1_SECONDS"), TimeSpec.SINCE_EPOCH_FORMAT);
+    TimeSpec timeSpec = new TimeSpec("Date", TimeGranularity.fromString("1_SECONDS"), TimeSpec.SINCE_EPOCH_FORMAT);
 
     ThirdEyeRequest request = ThirdEyeRequest.newBuilder()
         .setMetricFunctions(Collections.singletonList(metricFunction))
@@ -183,16 +183,16 @@ public class PqlUtilsTest {
         .setLimit(12345)
         .build("ref");
 
-    String pql = PqlUtils.getPql(request, metricFunction, ArrayListMultimap.<String, String>create(), timeSpec);
+    String pql = SqlUtils.getSql(request, metricFunction, ArrayListMultimap.<String, String>create(), timeSpec);
 
-    Assert.assertEquals(pql, "SELECT AVG(metric) FROM collection WHERE  metric >= 1 AND metric < 2 GROUP BY dimension TOP 12345");
+    Assert.assertEquals(pql, "SELECT dimension, AVG(metric) FROM collection WHERE  Date >= 1 AND Date < 2 GROUP BY dimension LIMIT 12345");
   }
 
   @Test
   public void testLimitDefault() throws Exception {
     MetricFunction metricFunction = new MetricFunction(MetricAggFunction.AVG, METRIC.getMetricName(), this.metricId, COLLECTION, null, null);
 
-    TimeSpec timeSpec = new TimeSpec(METRIC.getMetricName(), TimeGranularity.fromString("1_SECONDS"), TimeSpec.SINCE_EPOCH_FORMAT);
+    TimeSpec timeSpec = new TimeSpec("Date", TimeGranularity.fromString("1_SECONDS"), TimeSpec.SINCE_EPOCH_FORMAT);
 
     ThirdEyeRequest request = ThirdEyeRequest.newBuilder()
         .setMetricFunctions(Collections.singletonList(metricFunction))
@@ -201,8 +201,8 @@ public class PqlUtilsTest {
         .setGroupBy("dimension")
         .build("ref");
 
-    String pql = PqlUtils.getPql(request, metricFunction, ArrayListMultimap.<String, String>create(), timeSpec);
+    String pql = SqlUtils.getSql(request, metricFunction, ArrayListMultimap.<String, String>create(), timeSpec);
 
-    Assert.assertEquals(pql, "SELECT AVG(metric) FROM collection WHERE  metric >= 1 AND metric < 2 GROUP BY dimension TOP 100000");
+    Assert.assertEquals(pql, "SELECT dimension, AVG(metric) FROM collection WHERE  Date >= 1 AND Date < 2 GROUP BY dimension LIMIT 100000");
   }
 }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datasource/pinot/PinotDataSourceTimeQuery.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datasource/pinot/PinotDataSourceTimeQuery.java
index 845f199..b4abdb0 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datasource/pinot/PinotDataSourceTimeQuery.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datasource/pinot/PinotDataSourceTimeQuery.java
@@ -82,17 +82,17 @@ public class PinotDataSourceTimeQuery {
       TimeSpec timeSpec = ThirdEyeUtils.getTimestampTimeSpecFromDatasetConfig(datasetConfig);
 
       long cutoffTime = System.currentTimeMillis() + TimeUnit.DAYS.toMillis(1);
-      String timeClause = PqlUtils
+      String timeClause = SqlUtils
           .getBetweenClause(new DateTime(0, DateTimeZone.UTC), new DateTime(cutoffTime, DateTimeZone.UTC), timeSpec, dataset);
 
-      String maxTimePql = String.format(TIME_QUERY_TEMPLATE, functionName, timeSpec.getColumnName(), dataset, timeClause);
-      PinotQuery maxTimePinotQuery = new PinotQuery(maxTimePql, dataset);
+      String maxTimeSql = String.format(TIME_QUERY_TEMPLATE, functionName, timeSpec.getColumnName(), dataset, timeClause);
+      PinotQuery maxTimePinotQuery = new PinotQuery(maxTimeSql, dataset);
 
       ThirdEyeResultSetGroup resultSetGroup;
       final long tStart = System.nanoTime();
       try {
-        pinotThirdEyeDataSource.refreshPQL(maxTimePinotQuery);
-        resultSetGroup = pinotThirdEyeDataSource.executePQL(maxTimePinotQuery);
+        pinotThirdEyeDataSource.refreshSQL(maxTimePinotQuery);
+        resultSetGroup = pinotThirdEyeDataSource.executeSQL(maxTimePinotQuery);
         ThirdeyeMetricsUtil
             .getRequestLog().success(this.pinotThirdEyeDataSource.getName(), dataset, timeSpec.getColumnName(), tStart, System.nanoTime());
       } catch (ExecutionException e) {
@@ -101,7 +101,7 @@ public class PinotDataSourceTimeQuery {
       }
 
       if (resultSetGroup.size() == 0 || resultSetGroup.get(0).getRowCount() == 0) {
-        LOGGER.error("Failed to get latest max time for dataset {} with PQL: {}", dataset, maxTimePinotQuery.getQuery());
+        LOGGER.error("Failed to get latest max time for dataset {} with SQL: {}", dataset, maxTimePinotQuery.getQuery());
       } else {
         DateTimeZone timeZone = Utils.getDataTimeZone(dataset);
 
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datasource/pinot/PinotThirdEyeDataSource.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datasource/pinot/PinotThirdEyeDataSource.java
index 461e2ba..167e84c 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datasource/pinot/PinotThirdEyeDataSource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datasource/pinot/PinotThirdEyeDataSource.java
@@ -168,19 +168,19 @@ public class PinotThirdEyeDataSource implements ThirdEyeDataSource {
                   datasetConfig.getPreAggregatedKeyword());
         }
 
-        String pql;
+        String sql;
         MetricConfigDTO metricConfig = metricFunction.getMetricConfig();
         if (metricConfig != null && metricConfig.isDimensionAsMetric()) {
-          pql = PqlUtils.getDimensionAsMetricPql(request, metricFunction, decoratedFilterSet, dataTimeSpec,
+          sql = SqlUtils.getDimensionAsMetricSql(request, metricFunction, decoratedFilterSet, dataTimeSpec,
               datasetConfig);
         } else {
-          pql = PqlUtils.getPql(request, metricFunction, decoratedFilterSet, dataTimeSpec);
+          sql = SqlUtils.getSql(request, metricFunction, decoratedFilterSet, dataTimeSpec);
         }
 
         ThirdEyeResultSetGroup resultSetGroup;
         final long tStartFunction = System.nanoTime();
         try {
-          resultSetGroup = this.executePQL(new PinotQuery(pql, dataset));
+          resultSetGroup = this.executeSQL(new PinotQuery(sql, dataset));
           if (metricConfig != null) {
             ThirdeyeMetricsUtil.getRequestLog()
                 .success(this.getName(), metricConfig.getDataset(), metricConfig.getName(), tStartFunction, System.nanoTime());
@@ -275,7 +275,7 @@ public class PinotThirdEyeDataSource implements ThirdEyeDataSource {
    *
    * @throws ExecutionException is thrown if failed to connect to Pinot or gets results from Pinot.
    */
-  public ThirdEyeResultSetGroup executePQL(PinotQuery pinotQuery) throws ExecutionException {
+  public ThirdEyeResultSetGroup executeSQL(PinotQuery pinotQuery) throws ExecutionException {
     Preconditions
         .checkNotNull(this.pinotResponseCache, "{} doesn't connect to Pinot or cache is not initialized.", getName());
 
@@ -295,7 +295,7 @@ public class PinotThirdEyeDataSource implements ThirdEyeDataSource {
    *
    * @throws ExecutionException is thrown if failed to connect to Pinot or gets results from Pinot.
    */
-  public ThirdEyeResultSetGroup refreshPQL(PinotQuery pinotQuery) throws ExecutionException {
+  public ThirdEyeResultSetGroup refreshSQL(PinotQuery pinotQuery) throws ExecutionException {
     Preconditions
         .checkNotNull(this.pinotResponseCache, "{} doesn't connect to Pinot or cache is not initialized.", getName());
 
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datasource/pinot/PqlUtils.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datasource/pinot/SqlUtils.java
similarity index 92%
rename from thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datasource/pinot/PqlUtils.java
rename to thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datasource/pinot/SqlUtils.java
index fdd06e8..a73049a 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datasource/pinot/PqlUtils.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datasource/pinot/SqlUtils.java
@@ -57,7 +57,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Util class for generated PQL queries (pinot).
  */
-public class PqlUtils {
+public class SqlUtils {
   private static final Joiner AND = Joiner.on(" AND ");
   private static final Joiner COMMA = Joiner.on(", ");
 
@@ -74,7 +74,7 @@ public class PqlUtils {
   private static final String OPERATOR_GREATER_THAN = ">";
   private static final String OPERATOR_GREATER_THAN_EQUALS = ">=";
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(PqlUtils.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(SqlUtils.class);
   private static final int DEFAULT_TOP = 100000;
   private static final String PERCENTILE_TDIGEST_PREFIX = "percentileTDigest";
 
@@ -85,16 +85,16 @@ public class PqlUtils {
    * Due to the summation, all metric column values can be assumed to be doubles.
    * @throws ExecutionException
    */
-  public static String getPql(ThirdEyeRequest request, MetricFunction metricFunction,
+  public static String getSql(ThirdEyeRequest request, MetricFunction metricFunction,
       Multimap<String, String> filterSet, TimeSpec dataTimeSpec) throws ExecutionException {
     // TODO handle request.getFilterClause()
 
-    return getPql(metricFunction, request.getStartTimeInclusive(), request.getEndTimeExclusive(), filterSet,
+    return getSql(metricFunction, request.getStartTimeInclusive(), request.getEndTimeExclusive(), filterSet,
         request.getGroupBy(), request.getGroupByTimeGranularity(), dataTimeSpec, request.getLimit());
   }
 
 
-  private static String getPql(MetricFunction metricFunction, DateTime startTime,
+  private static String getSql(MetricFunction metricFunction, DateTime startTime,
       DateTime endTimeExclusive, Multimap<String, String> filterSet, List<String> groupBy,
       TimeGranularity timeGranularity, TimeSpec dataTimeSpec, int limit) throws ExecutionException {
 
@@ -102,7 +102,7 @@ public class PqlUtils {
     String dataset = metricFunction.getDataset();
 
     StringBuilder sb = new StringBuilder();
-    String selectionClause = getSelectionClause(metricConfig, metricFunction);
+    String selectionClause = getSelectionClause(metricConfig, metricFunction, groupBy, timeGranularity, dataTimeSpec);
 
     sb.append("SELECT ").append(selectionClause).append(" FROM ").append(dataset);
     String betweenClause = getBetweenClause(startTime, endTimeExclusive, dataTimeSpec, dataset);
@@ -120,14 +120,23 @@ public class PqlUtils {
     String groupByClause = getDimensionGroupByClause(groupBy, timeGranularity, dataTimeSpec);
     if (StringUtils.isNotBlank(groupByClause)) {
       sb.append(" ").append(groupByClause);
-      sb.append(" TOP ").append(limit);
+      sb.append(" LIMIT ").append(limit);
     }
 
     return sb.toString();
   }
 
-  private static String getSelectionClause(MetricConfigDTO metricConfig, MetricFunction metricFunction) {
+  private static String getSelectionClause(MetricConfigDTO metricConfig, MetricFunction metricFunction,
+      List<String> groupBy, TimeGranularity aggregationGranularity, TimeSpec timeSpec) {
     StringBuilder builder = new StringBuilder();
+    if (!groupBy.isEmpty()) {
+      for (String groupByDimension : groupBy) {
+        builder.append(groupByDimension).append(", ");
+      }
+    }
+    if (aggregationGranularity != null) {
+      builder.append(getTimeColumnQueryName(aggregationGranularity, timeSpec)).append(", ");
+    }
     String metricName = null;
     if (metricFunction.getMetricName().equals("*")) {
       metricName = "*";
@@ -147,7 +156,7 @@ public class PqlUtils {
    * @return
    * @throws Exception
    */
-  public static String getDimensionAsMetricPql(ThirdEyeRequest request, MetricFunction metricFunction,
+  public static String getDimensionAsMetricSql(ThirdEyeRequest request, MetricFunction metricFunction,
       Multimap<String, String> filterSet, TimeSpec dataTimeSpec, DatasetConfigDTO datasetConfig) throws Exception {
 
     // select sum(metric_values_column) from collection
@@ -175,7 +184,7 @@ public class PqlUtils {
           + " as metricNamesColumns in " + metricNamesColumns);
     }
 
-    String dimensionAsMetricPql = getDimensionAsMetricPql(metricFunction,
+    String dimensionAsMetricPql = getDimensionAsMetricSql(metricFunction,
         request.getStartTimeInclusive(), request.getEndTimeExclusive(), filterSet,
         request.getGroupBy(), request.getGroupByTimeGranularity(), dataTimeSpec,
         metricNamesList, metricNamesColumnsList, metricValuesColumn, request.getLimit());
@@ -184,7 +193,7 @@ public class PqlUtils {
   }
 
 
-  private static String getDimensionAsMetricPql(MetricFunction metricFunction, DateTime startTime,
+  private static String getDimensionAsMetricSql(MetricFunction metricFunction, DateTime startTime,
       DateTime endTimeExclusive, Multimap<String, String> filterSet, List<String> groupBy,
       TimeGranularity timeGranularity, TimeSpec dataTimeSpec, List<String> metricNames, List<String> metricNamesColumns,
       String metricValuesColumn, int limit)
@@ -214,7 +223,7 @@ public class PqlUtils {
     String groupByClause = getDimensionGroupByClause(groupBy, timeGranularity, dataTimeSpec);
     if (StringUtils.isNotBlank(groupByClause)) {
       sb.append(" ").append(groupByClause);
-      sb.append(" TOP ").append(limit);
+      sb.append(" LIMIT ").append(limit);
     }
 
     return sb.toString();
@@ -355,18 +364,9 @@ public class PqlUtils {
 
   private static String getDimensionGroupByClause(List<String> groupBy,
       TimeGranularity aggregationGranularity, TimeSpec timeSpec) {
-    String timeColumnName = timeSpec.getColumnName();
     List<String> groups = new LinkedList<>();
-    if (aggregationGranularity != null && !groups.contains(timeColumnName)) {
-      // Convert the time column to 1 minute granularity if it is epoch.
-      // E.g., dateTimeConvert(timestampInEpoch,'1:MILLISECONDS:EPOCH','1:MILLISECONDS:EPOCH','1:MINUTES')
-      if (timeSpec.getFormat().equals(DateTimeFieldSpec.TimeFormat.EPOCH.toString())
-          && !timeSpec.getDataGranularity().equals(aggregationGranularity)) {
-        String groupByTimeColumnName = convertEpochToMinuteAggGranularity(timeColumnName, timeSpec);
-        groups.add(groupByTimeColumnName);
-      } else {
-        groups.add(timeColumnName);
-      }
+    if (aggregationGranularity != null) {
+      groups.add(getTimeColumnQueryName(aggregationGranularity, timeSpec));
     }
     if (groupBy != null) {
       groups.addAll(groupBy);
@@ -377,6 +377,19 @@ public class PqlUtils {
     return String.format("GROUP BY %s", COMMA.join(groups));
   }
 
+  private static String getTimeColumnQueryName(TimeGranularity aggregationGranularity, TimeSpec timeSpec) {
+    String timeColumnName = timeSpec.getColumnName();
+    if (aggregationGranularity != null) {
+      // Convert the time column to 1 minute granularity if it is epoch.
+      // E.g., dateTimeConvert(timestampInEpoch,'1:MILLISECONDS:EPOCH','1:MILLISECONDS:EPOCH','1:MINUTES')
+      if (timeSpec.getFormat().equals(DateTimeFieldSpec.TimeFormat.EPOCH.toString())
+          && !timeSpec.getDataGranularity().equals(aggregationGranularity)) {
+        return convertEpochToMinuteAggGranularity(timeColumnName, timeSpec);
+      }
+    }
+    return timeColumnName;
+  }
+
   public static String getDataTimeRangeSql(String dataset, String timeColumnName) {
     return String.format("select min(%s), max(%s) from %s", timeColumnName, timeColumnName,
         dataset);
@@ -387,13 +400,13 @@ public class PqlUtils {
    *
    * @param value value to be quoted
    * @return quoted value
-   * @throws IllegalArgumentException if no unused quote char can be found
+   * @throws IllegalArgumentException if no unused quote char can be found
    */
   static String quote(String value) {
     String quoteChar = "";
     if (!StringUtils.isNumeric(value)) {
       quoteChar = "\"";
-      if (value.contains(quoteChar)) {
+      if (StringUtils.isEmpty(value) || value.contains(quoteChar)) {
         quoteChar = "\'";
       }
       if (value.contains(quoteChar)) {
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datasource/pinot/resources/PinotDataSourceResource.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datasource/pinot/resources/PinotDataSourceResource.java
index b0d2fed..67925a9 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datasource/pinot/resources/PinotDataSourceResource.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datasource/pinot/resources/PinotDataSourceResource.java
@@ -70,7 +70,7 @@ public class PinotDataSourceResource {
     String resultString;
     PinotQuery pinotQuery = new PinotQuery(pql, tableName);
     try {
-      ThirdEyeResultSetGroup thirdEyeResultSetGroup = pinotDataSource.executePQL(pinotQuery);
+      ThirdEyeResultSetGroup thirdEyeResultSetGroup = pinotDataSource.executeSQL(pinotQuery);
       resultString = OBJECT_MAPPER.writeValueAsString(thirdEyeResultSetGroup);
     } catch (ExecutionException | JsonProcessingException e) {
       LOG.error("Failed to execute PQL ({}) due to the exception:", pinotQuery);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org