Posted to commits@druid.apache.org by pr...@apache.org on 2023/03/08 01:16:44 UTC

[druid] branch master updated: Add validation for aggregations on __time (#13793)

This is an automated email from the ASF dual-hosted git repository.

progers pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new ef82756176 Add validation for aggregations on __time (#13793)
ef82756176 is described below

commit ef827561762f4c39adca51c1a1f3e39cbcde8a8f
Author: Adarsh Sanjeev <ad...@gmail.com>
AuthorDate: Wed Mar 8 06:46:36 2023 +0530

    Add validation for aggregations on __time (#13793)
    
    * Add validation for aggregations on __time
---
 docs/querying/sql-aggregations.md                  |  2 +-
 .../org/apache/druid/msq/exec/MSQSelectTest.java   | 33 ++++++++++++++++++++++
 .../builtin/EarliestLatestAnySqlAggregator.java    | 11 ++++++++
 .../druid/sql/calcite/CalciteJoinQueryTest.java    | 22 +++++++++++----
 4 files changed, 62 insertions(+), 6 deletions(-)

diff --git a/docs/querying/sql-aggregations.md b/docs/querying/sql-aggregations.md
index cf43682971..ec266dab9c 100644
--- a/docs/querying/sql-aggregations.md
+++ b/docs/querying/sql-aggregations.md
@@ -86,7 +86,7 @@ In the aggregation functions supported by Druid, only `COUNT`, `ARRAY_AGG`, and
 |`EARLIEST(expr, maxBytesPerString)`|Like `EARLIEST(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit are truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `''`|
 |`EARLIEST_BY(expr, timestampExpr)`|Returns the earliest value of `expr`, which must be numeric. The earliest value of `expr` is taken from the row with the overall earliest non-null value of `timestampExpr`. If the earliest non-null value of `timestampExpr` appears in multiple rows, the `expr` may be taken from any of those rows.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
 |`EARLIEST_BY(expr, timestampExpr, maxBytesPerString)`| Like `EARLIEST_BY(expr, timestampExpr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit are truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `''`|
-|`LATEST(expr)`|Returns the latest value of `expr`, which must be numeric. If `expr` comes from a relation with a timestamp column (like `__time` in a Druid datasource), the "latest" is taken from the row with the overall latest non-null value of the timestamp column. If the latest non-null value of the timestamp column appears in multiple rows, the `expr` may be taken from any of those rows. If `expr` does not come from a relation with a timestamp, then it is simply the last value encou [...]
+|`LATEST(expr)`|Returns the latest value of `expr`, which must be numeric. The `expr` must come from a relation with a timestamp column (like `__time` in a Druid datasource) and the "latest" is taken from the row with the overall latest non-null value of the timestamp column. If the latest non-null value of the timestamp column appears in multiple rows, the `expr` may be taken from any of those rows. |`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
 |`LATEST(expr, maxBytesPerString)`|Like `LATEST(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit are truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `''`|
 |`LATEST_BY(expr, timestampExpr)`|Returns the latest value of `expr`, which must be numeric. The latest value of `expr` is taken from the row with the overall latest non-null value of `timestampExpr`. If the overall latest non-null value of `timestampExpr` appears in multiple rows, the `expr` may be taken from any of those rows.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
 |`LATEST_BY(expr, timestampExpr, maxBytesPerString)`|Like `LATEST_BY(expr, timestampExpr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit are truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `''`|
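The LATEST_BY semantics described in the table above take `expr` from the row with the overall latest non-null `timestampExpr`, and they can be modeled in a few lines. This is a minimal sketch, not Druid's aggregator. `LatestBySketch`, `Row`, and `latestBy` are hypothetical names. Timestamps are assumed to be epoch milliseconds, the result is `null` (as with `druid.generic.useDefaultValueForNull=false`), and `maxBytesPerString` truncation is not modeled. The record requires Java 16+.

```java
import java.util.List;

// Hypothetical, simplified model of the documented LATEST_BY(expr, timestampExpr)
// semantics. This is NOT Druid's implementation.
final class LatestBySketch
{
  // One input row: a nullable timestamp (assumed epoch millis) and a value.
  record Row(Long timestamp, String value) {}

  // Returns the value from the row with the overall latest non-null timestamp,
  // or null when no row has a non-null timestamp.
  static String latestBy(List<Row> rows)
  {
    Row best = null;
    for (Row row : rows) {
      if (row.timestamp() == null) {
        continue; // rows without a timestamp never win
      }
      // Strict ">" keeps the first row seen on ties; the docs only promise
      // that the value comes from one of the tied rows.
      if (best == null || row.timestamp() > best.timestamp()) {
        best = row;
      }
    }
    return best == null ? null : best.value();
  }
}
```

Keeping the first tied row is just one valid choice; Druid makes no promise about which tied row supplies the value.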
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
index b871fc4a6e..548235b46b 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
@@ -1363,6 +1363,39 @@ public class MSQSelectTest extends MSQTestBase
         .verifyResults();
   }
 
+  @Test
+  public void testTimeColumnAggregationFromExtern() throws IOException
+  {
+    final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, "/wikipedia-sampled.json");
+    final String toReadAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath());
+
+    RowSignature rowSignature = RowSignature.builder()
+                                            .add("__time", ColumnType.LONG)
+                                            .add("cnt", ColumnType.LONG)
+                                            .build();
+
+    testSelectQuery()
+        .setSql("WITH\n"
+                + "kttm_data AS (\n"
+                + "SELECT * FROM TABLE(\n"
+                + "  EXTERN(\n"
+                + "    '{ \"files\": [" + toReadAsJson + "],\"type\":\"local\"}',\n"
+                + "    '{\"type\":\"json\"}',\n"
+                + "    '[{\"name\": \"timestamp\", \"type\": \"string\"}, {\"name\": \"page\", \"type\": \"string\"}, {\"name\": \"user\", \"type\": \"string\"}]'\n"
+                + "  )\n"
+                + "))\n"
+                + "\n"
+                + "SELECT\n"
+                + "  FLOOR(TIME_PARSE(\"timestamp\") TO MINUTE) AS __time,\n"
+                + "  LATEST(\"page\") AS \"page\"\n"
+                + "FROM kttm_data "
+                + "GROUP BY 1")
+        .setExpectedValidationErrorMatcher(
+            ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("LATEST() aggregator depends on __time column"))
+        )
+        .setExpectedRowSignature(rowSignature)
+        .verifyPlanningErrors();
+  }
 
   @Test
   public void testGroupByWithMultiValueMvToArrayWithoutGroupByEnable()
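Following the advice in the new error message, the rejected query could name its time expression explicitly with LATEST_BY. The sketch below is a hypothetical rewrite. It is not part of this commit and has not been run against MSQ. `LatestByRewriteSketch` and `query` are illustrative names. `filesJson` is assumed to be a JSON-encoded path like `toReadAsJson` in the test. The value 1024 is an arbitrary `maxBytesPerString` for the three-argument string form listed in the docs table.

```java
// Hypothetical rewrite of the query rejected by testTimeColumnAggregationFromExtern:
// LATEST("page") becomes LATEST_BY("page", TIME_PARSE("timestamp"), 1024), so the
// aggregator no longer relies on an implicit __time column in the EXTERN input.
// Untested sketch; 1024 is an arbitrary maxBytesPerString.
final class LatestByRewriteSketch
{
  static String query(String filesJson)
  {
    return "WITH\n"
           + "kttm_data AS (\n"
           + "SELECT * FROM TABLE(\n"
           + "  EXTERN(\n"
           + "    '{ \"files\": [" + filesJson + "],\"type\":\"local\"}',\n"
           + "    '{\"type\":\"json\"}',\n"
           + "    '[{\"name\": \"timestamp\", \"type\": \"string\"}, {\"name\": \"page\", \"type\": \"string\"}, {\"name\": \"user\", \"type\": \"string\"}]'\n"
           + "  )\n"
           + "))\n"
           + "\n"
           + "SELECT\n"
           + "  FLOOR(TIME_PARSE(\"timestamp\") TO MINUTE) AS __time,\n"
           + "  LATEST_BY(\"page\", TIME_PARSE(\"timestamp\"), 1024) AS \"page\"\n"
           + "FROM kttm_data\n"
           + "GROUP BY 1";
  }
}
```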
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java
index ba06148363..f344cedb16 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java
@@ -51,6 +51,7 @@ import org.apache.druid.query.aggregation.last.FloatLastAggregatorFactory;
 import org.apache.druid.query.aggregation.last.LongLastAggregatorFactory;
 import org.apache.druid.query.aggregation.last.StringLastAggregatorFactory;
 import org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator;
+import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.sql.calcite.aggregation.Aggregation;
@@ -196,6 +197,16 @@ public class EarliestLatestAnySqlAggregator implements SqlAggregator
 
     final String fieldName = getColumnName(plannerContext, virtualColumnRegistry, args.get(0), rexNodes.get(0));
 
+    if (!rowSignature.contains(ColumnHolder.TIME_COLUMN_NAME) && (aggregatorType == AggregatorType.LATEST || aggregatorType == AggregatorType.EARLIEST)) {
+      plannerContext.setPlanningError("%s() aggregator depends on __time column, the underlying datasource "
+                                      + "or extern function you are querying doesn't contain __time column, "
+                                      + "Please use %s_BY() and specify the time column you want to use",
+                                      aggregatorType.name(),
+                                      aggregatorType.name()
+      );
+      return null;
+    }
+
     final AggregatorFactory theAggFactory;
     switch (args.size()) {
       case 1:
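The check added above reduces to one condition on the aggregation's input row signature. The following is a simplified standalone model. It assumes a plain `Set<String>` of column names in place of `RowSignature`, and an `Optional` return in place of `plannerContext.setPlanningError(...)` followed by `return null`. `TimeColumnCheckSketch` and `validate` are hypothetical names. The message template is copied from the diff.

```java
import java.util.Optional;
import java.util.Set;

// Hypothetical standalone model of the validation added to
// EarliestLatestAnySqlAggregator: EARLIEST()/LATEST() are rejected when the
// input columns do not include __time. Names here are illustrative.
final class TimeColumnCheckSketch
{
  // Same value as ColumnHolder.TIME_COLUMN_NAME in Druid.
  static final String TIME_COLUMN_NAME = "__time";

  // Simplified stand-in for the aggregator kinds handled by the class.
  enum AggregatorType { EARLIEST, LATEST, ANY_VALUE }

  // Returns the planning error the commit would report, or empty if the
  // aggregator may be planned against the given columns.
  static Optional<String> validate(Set<String> columns, AggregatorType type)
  {
    boolean dependsOnTime = type == AggregatorType.EARLIEST || type == AggregatorType.LATEST;
    if (dependsOnTime && !columns.contains(TIME_COLUMN_NAME)) {
      return Optional.of(String.format(
          "%s() aggregator depends on __time column, the underlying datasource "
          + "or extern function you are querying doesn't contain __time column, "
          + "Please use %s_BY() and specify the time column you want to use",
          type.name(),
          type.name()
      ));
    }
    return Optional.empty();
  }
}
```

With `Set.of("k", "v")` (a lookup-like signature) and `LATEST`, `validate` returns the `LATEST() aggregator depends on __time column...` message. Adding `__time` to the set, or switching to `ANY_VALUE`, returns `Optional.empty()`.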
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java
index b18a564324..d41d56f54c 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java
@@ -54,7 +54,6 @@ import org.apache.druid.query.aggregation.LongMinAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.query.aggregation.any.StringAnyAggregatorFactory;
 import org.apache.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
-import org.apache.druid.query.aggregation.last.StringLastAggregatorFactory;
 import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
 import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
 import org.apache.druid.query.dimension.DefaultDimensionSpec;
@@ -86,6 +85,7 @@ import org.apache.druid.server.security.Access;
 import org.apache.druid.sql.calcite.expression.DruidExpression;
 import org.apache.druid.sql.calcite.filtration.Filtration;
 import org.apache.druid.sql.calcite.planner.PlannerConfig;
+import org.apache.druid.sql.calcite.planner.UnsupportedSQLQueryException;
 import org.apache.druid.sql.calcite.util.CalciteTests;
 import org.joda.time.DateTimeZone;
 import org.joda.time.Period;
@@ -1412,7 +1412,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
         "SELECT dim1, dim2, t1.v, t1.v\n"
             + "FROM foo\n"
             + "INNER JOIN \n"
-            + "  (SELECT SUBSTRING(k, 1, 1) k, LATEST(v, 10) v FROM lookup.lookyloo GROUP BY 1) t1\n"
+            + "  (SELECT SUBSTRING(k, 1, 1) k, ANY_VALUE(v, 10) v FROM lookup.lookyloo GROUP BY 1) t1\n"
             + "  ON foo.dim2 = t1.k",
         queryContext,
         ImmutableList.of(
@@ -1433,7 +1433,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
                                         new SubstringDimExtractionFn(0, 1)
                                     )
                                 )
-                                .setAggregatorSpecs(new StringLastAggregatorFactory("a0", "v", null, 10))
+                                .setAggregatorSpecs(new StringAnyAggregatorFactory("a0", "v", 10))
                                 .build()
                         ),
                         "j0.",
@@ -1447,12 +1447,24 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
                 .build()
         ),
         ImmutableList.of(
-            new Object[]{"", "a", "xabc", "xabc"},
-            new Object[]{"1", "a", "xabc", "xabc"}
+            new Object[]{"", "a", "xa", "xa"},
+            new Object[]{"1", "a", "xa", "xa"}
         )
     );
   }
 
+  @Test(expected = UnsupportedSQLQueryException.class)
+  @Parameters(source = QueryContextForJoinProvider.class)
+  public void testTimeColumnAggregationsOnLookups(Map<String, Object> queryContext)
+  {
+    testQuery(
+        "SELECT k, LATEST(v) v FROM lookup.lookyloo GROUP BY k",
+        queryContext,
+        ImmutableList.of(),
+        ImmutableList.of()
+    );
+  }
+
   @Test
   @Parameters(source = QueryContextForJoinProvider.class)
   public void testInnerJoinQueryOfLookupRemovable(Map<String, Object> queryContext)
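Lookups such as `lookup.lookyloo` expose only `k` and `v` and have no `__time` column. After this change, `LATEST` over a lookup therefore fails planning, as `testTimeColumnAggregationsOnLookups` asserts. The join test switches to `ANY_VALUE`, which needs no time column. The following is a hypothetical model of `ANY_VALUE(v)` grouped by `SUBSTRING(k, 1, 1)`. The class name and the input pairs are illustrative, not the `lookyloo` fixture. Keeping the first value seen is just one valid choice of "any" value, and the `maxBytesPerString` argument of 10 is not modeled.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of
//   SELECT SUBSTRING(k, 1, 1) k, ANY_VALUE(v) v FROM <lookup> GROUP BY 1
// No timestamp is consulted; the first value seen per group is kept.
final class AnyValueSketch
{
  static Map<String, String> anyValueByFirstChar(List<Map.Entry<String, String>> rows)
  {
    Map<String, String> result = new LinkedHashMap<>();
    for (Map.Entry<String, String> row : rows) {
      String key = row.getKey();
      // SQL SUBSTRING(k, 1, 1) is 1-based; an empty key yields an empty group.
      String group = key.isEmpty() ? "" : key.substring(0, 1);
      result.putIfAbsent(group, row.getValue()); // first value wins
    }
    return result;
  }
}
```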


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org