Posted to commits@druid.apache.org by "paul-rogers (via GitHub)" <gi...@apache.org> on 2023/02/22 20:33:50 UTC

[GitHub] [druid] paul-rogers commented on a diff in pull request #13793: Add validation for aggregations on __time

paul-rogers commented on code in PR #13793:
URL: https://github.com/apache/druid/pull/13793#discussion_r1114922369


##########
docs/querying/sql-aggregations.md:
##########
@@ -86,7 +86,7 @@ In the aggregation functions supported by Druid, only `COUNT`, `ARRAY_AGG`, and
 |`EARLIEST(expr, maxBytesPerString)`|Like `EARLIEST(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit are truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `''`|
 |`EARLIEST_BY(expr, timestampExpr)`|Returns the earliest value of `expr`, which must be numeric. The earliest value of `expr` is taken from the row with the overall earliest non-null value of `timestampExpr`. If the earliest non-null value of `timestampExpr` appears in multiple rows, the `expr` may be taken from any of those rows.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
 |`EARLIEST_BY(expr, timestampExpr, maxBytesPerString)`| Like `EARLIEST_BY(expr, timestampExpr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit are truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `''`|
-|`LATEST(expr)`|Returns the latest value of `expr`, which must be numeric. If `expr` comes from a relation with a timestamp column (like `__time` in a Druid datasource), the "latest" is taken from the row with the overall latest non-null value of the timestamp column. If the latest non-null value of the timestamp column appears in multiple rows, the `expr` may be taken from any of those rows. If `expr` does not come from a relation with a timestamp, then it is simply the last value encountered.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
+|`LATEST(expr)`|Returns the latest value of `expr`, which must be numeric. The `expr` must come from a relation with a timestamp column (like `__time` in a Druid datasource) and the "latest" is taken from the row with the overall latest non-null value of the timestamp column. If the latest non-null value of the timestamp column appears in multiple rows, the `expr` may be taken from any of those rows. |`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|

Review Comment:
   This description is a bit off. To correct it, we must understand how this aggregation works (or is supposed to work). This aggregation applies _only_ to a Druid datasource. Why? Because we need the `__time` column, and that column exists only in Druid datasources (where it is guaranteed to exist).
   
   When using this function in _queries_, the function implicitly references that datasource's `__time` column. If the aggregation is used on a datasource without a `__time` column, it should fail. Although all Druid datasources have `__time`, a Druid data source (with a space) need not have such a column. Example: the output of a subquery. In this case, the function cannot be used: the explicit form is required.
   
   For _ingestion_ (via MSQ) the story is more complex. We have to consider whether we're creating a detail table or a rollup table. If we're creating a detail table (that is, no aggregation), then the same rules as for queries apply.
   
   But, if we are building a table _with_ rollup, we must apply slightly different rules. In the case of rollup, the `__time` column is the one in the _target_ datasource, not the source input source. (Sorry for Druid's awkward naming.)
   
   This is true because aggregation for rollup tables is best thought of as a property of the table (datasource), not the query. Why? I can do two ingestions into two segments for the same time chunk. I can then use compaction to combine them. Compaction references only data within the datasource: it has no ability to reference the (now long gone) original input sources.
   
   The unifying rule is that `LATEST(x)` implicitly refers to the `__time` column in the same record as the value of `x` appears, _after_ the record has been projected into the target datasource format on ingestion, and _before_ projecting out of the datasource format on query.
   
   On the other hand, `LATEST(x, y)` is a bit simpler: it doesn't really matter where the `y` (timestamp) value comes from: it will be written into the intermediate value anyway. To make this concrete:
   
   ```sql
   SELECT
     TIME_PARSE(a) AS __time,
     TIME_PARSE(b) AS anotherTime,
     LATEST(c, anotherTime)
   ```
   
   That would be nice, but Druid doesn't allow one SELECT expression to reference another, so we have to write:
   
   ```sql
   SELECT
     TIME_PARSE(a) AS __time,
     TIME_PARSE(b) AS anotherTime,
     LATEST(c, TIME_PARSE(b))
   ```
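   
   The points above about compaction and the intermediate value can be sketched in a few lines of Java. This is only a conceptual model, not Druid's implementation: `LatestPairSketch`, `TimedValue`, `merge`, and `mergeAll` are hypothetical names invented for illustration. The idea is that if each rolled-up row stores the timestamp together with the value, two partial results can be combined later without going back to the original input source.
   
   ```java
   import java.util.List;

   // Hypothetical model, not a Druid class: the intermediate state that a
   // LATEST-style aggregation would need to store per rolled-up row.
   final class LatestPairSketch
   {
     static final class TimedValue
     {
       final long time;     // timestamp paired with the value (epoch millis)
       final String value;  // value observed at that timestamp

       TimedValue(long time, String value)
       {
         this.time = time;
         this.value = value;
       }
     }

     // Merge two intermediate states: keep whichever has the later timestamp.
     // On a tie either side is acceptable; this sketch keeps the left one.
     static TimedValue merge(TimedValue a, TimedValue b)
     {
       return b.time > a.time ? b : a;
     }

     // Fold a non-empty list of intermediate states, as a compaction-like
     // step would when combining rows from several segments.
     static TimedValue mergeAll(List<TimedValue> states)
     {
       TimedValue result = states.get(0);
       for (TimedValue s : states.subList(1, states.size())) {
         result = merge(result, s);
       }
       return result;
     }
   }
   ```
   
   Because `merge` looks only at the stored pairs, it does not matter whether the timestamp originally came from `__time` or from an explicit expression such as `TIME_PARSE(b)`.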
   



##########
sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java:
##########
@@ -196,6 +197,16 @@ public Aggregation toDruidAggregation(
 
     final String fieldName = getColumnName(plannerContext, virtualColumnRegistry, args.get(0), rexNodes.get(0));
 
+    if (!rowSignature.contains(ColumnHolder.TIME_COLUMN_NAME) && (aggregatorType == AggregatorType.LATEST || aggregatorType == AggregatorType.EARLIEST)) {
+      plannerContext.setPlanningError("%s() aggregator depends on __time column, the underlying datasource "
+                                      + "or extern function you are querying doesn't contain __time column, "
+                                      + "Please use %s_BY() and specify the time column you want to use",
+                                      aggregatorType.name(),
+                                      aggregatorType.name()
+      );
+      return null;
+    }

Review Comment:
   I'm afraid this is the wrong check: we're asking a CSV file to provide a `LONG` `__time` column. Probably not going to happen: we can't ask the user to rewrite their input files to make MSQ happy.
   
   What we want to know is: is there a `__time` column in the projected output row? That is tricky, but it's what we need.
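   
   As a rough illustration of the difference, here is a minimal, self-contained Java sketch. It assumes the planner can somehow produce the list of projected output column names, which is the tricky part and is not shown. `ProjectedTimeCheckSketch` and `projectsTimeColumn` are hypothetical names, not existing Druid APIs.
   
   ```java
   import java.util.Arrays;
   import java.util.List;

   // Hypothetical sketch: decide based on the projected output columns,
   // not on the columns of the input source.
   final class ProjectedTimeCheckSketch
   {
     // Local constant for the sketch; the PR itself uses ColumnHolder.TIME_COLUMN_NAME.
     static final String TIME_COLUMN = "__time";

     // Returns true if the projected output row contains a __time column.
     static boolean projectsTimeColumn(List<String> outputColumnNames)
     {
       return outputColumnNames.contains(TIME_COLUMN);
     }

     public static void main(String[] args)
     {
       // Input columns of the EXTERN source in the MSQ test: no __time here.
       List<String> inputColumns = Arrays.asList("timestamp", "page", "user");
       // Output columns of the SELECT: __time is produced by the projection.
       List<String> outputColumns = Arrays.asList("__time", "page");

       System.out.println(projectsTimeColumn(inputColumns));
       System.out.println(projectsTimeColumn(outputColumns));
     }
   }
   ```
   
   With the column lists from the MSQ test in this PR, checking the input signature rejects the query, while checking the projected output accepts it.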



##########
sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java:
##########
@@ -1447,12 +1447,24 @@ public void testInnerJoinQueryOfLookup(Map<String, Object> queryContext)
                 .build()
         ),
         ImmutableList.of(
-            new Object[]{"", "a", "xabc", "xabc"},
-            new Object[]{"1", "a", "xabc", "xabc"}
+            new Object[]{"", "a", "xa", "xa"},
+            new Object[]{"1", "a", "xa", "xa"}
         )
     );
   }
 
+  @Test(expected = UnsupportedSQLQueryException.class)

Review Comment:
   We want this to work.



##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java:
##########
@@ -1363,6 +1363,39 @@ public void testGroupByArrayWithMultiValueMvToArray()
         .verifyResults();
   }
 
+  @Test
+  public void testTimeColumnAggregationFromExtern() throws IOException
+  {
+    final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, "/wikipedia-sampled.json");
+    final String toReadAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath());
+
+    RowSignature rowSignature = RowSignature.builder()
+                                            .add("__time", ColumnType.LONG)
+                                            .add("cnt", ColumnType.LONG)
+                                            .build();
+
+    testSelectQuery()
+        .setSql("WITH\n"
+                + "kttm_data AS (\n"
+                + "SELECT * FROM TABLE(\n"
+                + "  EXTERN(\n"
+                + "    '{ \"files\": [" + toReadAsJson + "],\"type\":\"local\"}',\n"
+                + "    '{\"type\":\"json\"}',\n"
+                + "    '[{\"name\": \"timestamp\", \"type\": \"string\"}, {\"name\": \"page\", \"type\": \"string\"}, {\"name\": \"user\", \"type\": \"string\"}]'\n"
+                + "  )\n"
+                + "))\n"
+                + "\n"
+                + "SELECT\n"
+                + "  FLOOR(TIME_PARSE(\"timestamp\") TO MINUTE) AS __time,\n"
+                + "  LATEST(\"page\") AS \"page\"\n"
+                + "FROM kttm_data "
+                + "GROUP BY 1")
+        .setExpectedValidationErrorMatcher(
+            ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("LATEST() aggregator depends on __time column"))

Review Comment:
   We need this to succeed, not fail.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

