Posted to commits@druid.apache.org by "adarshsanjeev (via GitHub)" <gi...@apache.org> on 2023/02/12 14:33:23 UTC

[GitHub] [druid] adarshsanjeev opened a new pull request, #13793: Add validation for aggregations on __time

adarshsanjeev opened a new pull request, #13793:
URL: https://github.com/apache/druid/pull/13793

   Aggregations that implicitly depend on the `__time` column (such as `LATEST()` or `EARLIEST()`) would, when performed on an extern source in MSQ, silently default the time values to null.
   The same occurs in non-MSQ queries when these aggregations run with a lookup as the source (since `__time` is not present there).
   
   This behaviour has been changed to throw an exception asking the user to use `LATEST_BY()` or `EARLIEST_BY()` instead when `__time` is missing, since that is the more expected behaviour for these cases.
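   
   For illustration, a sketch of the behaviour change (hypothetical query shapes; the `EXTERN` arguments are elided):
   
   ```sql
   -- Before this change, the implicit __time reference silently resolved to
   -- null/0 when the source had no __time column. It now fails validation:
   SELECT LATEST("page") FROM TABLE(EXTERN(...))
   
   -- Suggested rewrite: name the time expression explicitly.
   SELECT LATEST_BY("page", TIME_PARSE("timestamp")) FROM TABLE(EXTERN(...))
   ```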
   
   <hr>
   
   This PR has:
   
   - [x] been self-reviewed.
      - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.)
   - [x] added documentation for new or modified features or behaviors.
   - [ ] a release note entry in the PR description.
   - [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
   - [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
   - [x] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] added integration tests.
   - [ ] been tested in a test Druid cluster.
   




[GitHub] [druid] adarshsanjeev commented on a diff in pull request #13793: Add validation for aggregations on __time

Posted by "adarshsanjeev (via GitHub)" <gi...@apache.org>.
adarshsanjeev commented on code in PR #13793:
URL: https://github.com/apache/druid/pull/13793#discussion_r1121188393


##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java:
##########
@@ -1363,6 +1363,39 @@ public void testGroupByArrayWithMultiValueMvToArray()
         .verifyResults();
   }
 
+  @Test
+  public void testTimeColumnAggregationFromExtern() throws IOException
+  {
+    final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, "/wikipedia-sampled.json");
+    final String toReadAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath());
+
+    RowSignature rowSignature = RowSignature.builder()
+                                            .add("__time", ColumnType.LONG)
+                                            .add("cnt", ColumnType.LONG)
+                                            .build();
+
+    testSelectQuery()
+        .setSql("WITH\n"
+                + "kttm_data AS (\n"
+                + "SELECT * FROM TABLE(\n"
+                + "  EXTERN(\n"
+                + "    '{ \"files\": [" + toReadAsJson + "],\"type\":\"local\"}',\n"
+                + "    '{\"type\":\"json\"}',\n"
+                + "    '[{\"name\": \"timestamp\", \"type\": \"string\"}, {\"name\": \"page\", \"type\": \"string\"}, {\"name\": \"user\", \"type\": \"string\"}]'\n"
+                + "  )\n"
+                + "))\n"
+                + "\n"
+                + "SELECT\n"
+                + "  FLOOR(TIME_PARSE(\"timestamp\") TO MINUTE) AS __time,\n"
+                + "  LATEST(\"page\") AS \"page\"\n"
+                + "FROM kttm_data "
+                + "GROUP BY 1")
+        .setExpectedValidationErrorMatcher(
+            ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("LATEST() aggregator depends on __time column"))

Review Comment:
   As discussed, validation should be fine at first.



##########
sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java:
##########
@@ -1447,12 +1447,24 @@ public void testInnerJoinQueryOfLookup(Map<String, Object> queryContext)
                 .build()
         ),
         ImmutableList.of(
-            new Object[]{"", "a", "xabc", "xabc"},
-            new Object[]{"1", "a", "xabc", "xabc"}
+            new Object[]{"", "a", "xa", "xa"},
+            new Object[]{"1", "a", "xa", "xa"}
         )
     );
   }
 
+  @Test(expected = UnsupportedSQLQueryException.class)

Review Comment:
   Would an earliest or latest make sense for a query on a lookup? There would be no implicit time column in the datasource to work with.
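   
   For instance (a hypothetical sketch; the lookup and column names are assumptions, not this PR's tests):
   
   ```sql
   -- A lookup exposes only key/value columns and no __time, so the implicit
   -- time reference in the one-argument form has nothing to bind to:
   SELECT LATEST(v) FROM lookup.lookyloo
   
   -- An explicit time column would have to come from elsewhere, e.g. a join
   -- against a datasource that does have __time:
   SELECT LATEST_BY(l.v, f.__time)
   FROM druid.foo f JOIN lookup.lookyloo l ON f.dim1 = l.k
   ```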



##########
sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java:
##########
@@ -196,6 +197,16 @@ public Aggregation toDruidAggregation(
 
     final String fieldName = getColumnName(plannerContext, virtualColumnRegistry, args.get(0), rexNodes.get(0));
 
+    if (!rowSignature.contains(ColumnHolder.TIME_COLUMN_NAME) && (aggregatorType == AggregatorType.LATEST || aggregatorType == AggregatorType.EARLIEST)) {
+      plannerContext.setPlanningError("%s() aggregator depends on __time column, the underlying datasource "
+                                      + "or extern function you are querying doesn't contain __time column, "
+                                      + "Please use %s_BY() and specify the time column you want to use",
+                                      aggregatorType.name(),
+                                      aggregatorType.name()
+      );
+      return null;
+    }

Review Comment:
   Similar to the above comment, the user could use LATEST_BY with TIME_PARSE instead. We would want to let the user know if the implicit time column selector is actually returning 0 as it can't find the appropriate column.
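   
   For example (a sketch reusing the column names from the test above, not the PR's code):
   
   ```sql
   -- Implicit form: depends on a __time column the extern source lacks, so
   -- the underlying time selector silently returns 0.
   LATEST("page")
   
   -- Explicit form: derives the time from a real input column instead.
   LATEST_BY("page", TIME_PARSE("timestamp"))
   ```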





[GitHub] [druid] paul-rogers merged pull request #13793: Add validation for aggregations on __time

Posted by "paul-rogers (via GitHub)" <gi...@apache.org>.
paul-rogers merged PR #13793:
URL: https://github.com/apache/druid/pull/13793




[GitHub] [druid] gianm commented on pull request #13793: Add validation for aggregations on __time

Posted by "gianm (via GitHub)" <gi...@apache.org>.
gianm commented on PR #13793:
URL: https://github.com/apache/druid/pull/13793#issuecomment-1457996004

   IMO, `LATEST` shouldn't work if the relation it's reading from doesn't have a `__time` column. I'd expect a validation error if someone tries to write `SELECT … LATEST(foo) … FROM <something>` and `<something>` doesn't have `__time`.
   
   > SQL semantics say that I can use SELECT columns in an aggregate. So, it should be perfectly legal to say `LATEST_BY(foo, __time)` where `__time` has been defined as `TIME_PARSE(timestamp) AS __time`.
   
   Is this right? PostgreSQL doesn't seem to think so:
   
   ```
   create table t (x bigint);
   insert into t (x) values (1), (2), (3);
   
   select
     x + 1 as y,
     sum(y) sum_y
   from t
   group by y;
   ```
   
   yields:
   
   ```
   Query Error: error: column "y" does not exist 
   ```
   
   Our SQL validator doesn't accept it either. Something similar to the first query in Druid results in:
   
   ```
   org.apache.calcite.runtime.CalciteContextException: From line 1, column 42 to line 1, column 44: Column 'y' not found in any table
   ```
   
   And this query:
   
   ```
   select
     x + 1 as x,
     sum(x)
   from t
   group by x;
   ```
   
   yields this in both systems:
   
   ```
   | x   | sum |
   | --- | --- |
   | 4   | 3   |
   | 3   | 2   |
   | 2   | 1   |
   ```
   
   i.e., the `x` in `sum(x)` refers to `t.x`, not `x + 1 as x`.




[GitHub] [druid] paul-rogers commented on pull request #13793: Add validation for aggregations on __time

Posted by "paul-rogers (via GitHub)" <gi...@apache.org>.
paul-rogers commented on PR #13793:
URL: https://github.com/apache/druid/pull/13793#issuecomment-1427132705

   This is an interesting issue as it touches on a fundamental ambiguity in MSQ. MSQ uses a `SELECT` to perform aggregation. There are two ways to interpret such aggregations:
   
   * Do the aggregations within the `SELECT` and insert the resulting rows as detail rows into a detail datasource.
   * Do the aggregations not just within the `SELECT` but also during compaction. Insert aggregated rows into a rollup datasource.
   
   The first interpretation is standard SQL in which each `SELECT` is a black box which simply returns rows. The second interpretation is required for compaction to work. In this case, if we write the finalized `EARLIEST` into the datasource, compaction cannot then take two segments with the same time period and combine them to find the "earliest EARLIEST".
   
   As a result, we want MSQ semantics to differ from those of SQL: the contents of the `SELECT` statements "leak out" and are written to segments so that compaction can pick up where MSQ left off and continue aggregating. This is non-standard SQL, and causes us to rethink many parts of SQL handling (and to work around the fact that Calcite assumes SQL semantics.)
   
   Now to this specific case. Suppose I write my query as `LATEST_BY(foo, timestamp)` where `timestamp` is a column in the external table. This presents a variety of issues.
   
   * The `timestamp` column is probably not a `TIMESTAMP`. So, I must write `LATEST_BY(foo, TIME_PARSE(timestamp))`. This solves the first case above: the SQL is a black box.
   * SQL semantics say that I can use SELECT columns in an aggregate. So, it should be perfectly legal to say `LATEST_BY(foo, __time)` where `__time` has been defined as `TIME_PARSE(timestamp) AS __time`.
   * If we go with the `LATEST_BY(foo, TIME_PARSE(timestamp))`, then we cannot do the rollup use case: the `timestamp` column does not exist in the generated segments, and so compaction cannot further compact the columns.
   * A workaround might be to write `timestamp` into the segment so it is available for compaction. However, this will be confusing for the user: why must they save the same column twice, so that a column name from the _input_ is available in the _output_?
   
   Or, perhaps the goal is that `LATEST_BY` cannot be further aggregated (because the input column is no longer available.) If that is the goal, one could argue that the result is a bug. If we force users to use `LATEST_BY` because `LATEST` is broken, but `LATEST_BY` is also broken, we've not actually made much progress.
   
   So, what should we do? We should ensure that aggregators refer to columns from the `SELECT` clause, not the input. That is:
   
   ```sql
   INSERT INTO dst
   SELECT TIME_PARSE(timestamp) AS __time, SUM(CAST(value AS BIGINT)) AS mySum
   FROM TABLE(EXTERN(...))
     (timestamp VARCHAR, value VARCHAR)
   ```
   
   Sums the `value` column after parsing. The change here suggests I should have written:
   
   ```sql
   INSERT INTO dst
   SELECT TIME_PARSE(timestamp) AS __time, SUM(value) AS mySum
   FROM TABLE(EXTERN(...))
     (timestamp VARCHAR, value VARCHAR)
   ```
   
   That is, I have to `SUM()` the input column: but that input column is a `VARCHAR` and not valid to sum.
   
   Let's apply this to the case at hand. What we want is to refer to the output columns:
   
   ```sql
   INSERT INTO dst
   SELECT
     TIME_PARSE(timestamp) AS __time,
     LATEST_BY(CAST(value AS BIGINT), __time) AS latestValue
   FROM TABLE(EXTERN(...))
     (timestamp VARCHAR, value VARCHAR)
   ```
   
   Or, better: because `__time` is, in fact, an output column, why not just use `LATEST`:
   
   ```sql
   INSERT INTO dst
   SELECT
     TIME_PARSE(timestamp) AS __time,
     LATEST(CAST(value AS BIGINT)) AS latestValue
   FROM TABLE(EXTERN(...))
     (timestamp VARCHAR, value VARCHAR)
   ```
   
   If we go with the rule here, then I would have to write:
   
   ```sql
   INSERT INTO dst
   SELECT
     TIME_PARSE(timestamp) AS __time,
     LATEST_BY(value, timestamp) AS latestValue
   FROM TABLE(EXTERN(...))
     (timestamp VARCHAR, value VARCHAR)
   ```
   
   The proposed solution goes against SQL semantics, but cannot even be made right. The types of both `value` and `timestamp` are wrong. To fix that I suppose I could use a nested select:
   
   ```sql
   INSERT INTO dst
   SELECT
     __time,
     LATEST_BY(numValue, __time) AS latestValue
   FROM (
     SELECT
       TIME_PARSE(timestamp) AS __time,
       CAST(value AS DOUBLE) AS numValue
     FROM TABLE(EXTERN(...))
       (timestamp VARCHAR, value VARCHAR)
   )
   ```
   
   Still, however, to follow SQL, the `__time` in `LATEST_BY` would be the one from the outer `SELECT`. But MSQ needs it to be the one from the inner `SELECT`. We will find we must rewrite Calcite (and rethink hundreds of pages of SQL semantics) to make that work.
   
   In short, this bug could use a bit more design thinking before we propose a code change.




[GitHub] [druid] paul-rogers commented on pull request #13793: Add validation for aggregations on __time

Posted by "paul-rogers (via GitHub)" <gi...@apache.org>.
paul-rogers commented on PR #13793:
URL: https://github.com/apache/druid/pull/13793#issuecomment-1435100849

   This discussion has expanded to cover two topics: how we handle aggregations in general on MSQ, and the original, specific topic for this PR. Issue #13816 covers the broader topic.
   
   The issue here seems to be a semantic one. MSQ requires that every expression refer to an input column. However, `LATEST(foo)` has two references: one to an _input column_ (`foo`) and another, implicit, reference to the _output column_ `__time`. MSQ will have to special-case this code. Someone has to determine where the reference needs to be modified. At native query generation time in the planner? As part of the controller task?
   
   The workaround is for the planner to simply forbid the one-argument form of these functions in MSQ, forcing the user to provide an input column to use for the basis. However, if we do that, then, as noted above, that input column _is not_ available at compaction time, so we would only solve the "first pass" ingestion (MSQ) but fail the "second pass" (compaction).
   
   It would be great for someone to do the analysis, then post a description of the problem and propose a solution that works for both ingestion and compaction.




[GitHub] [druid] paul-rogers commented on a diff in pull request #13793: Add validation for aggregations on __time

Posted by "paul-rogers (via GitHub)" <gi...@apache.org>.
paul-rogers commented on code in PR #13793:
URL: https://github.com/apache/druid/pull/13793#discussion_r1122534301


##########
sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java:
##########
@@ -196,6 +197,16 @@ public Aggregation toDruidAggregation(
 
     final String fieldName = getColumnName(plannerContext, virtualColumnRegistry, args.get(0), rexNodes.get(0));
 
+    if (!rowSignature.contains(ColumnHolder.TIME_COLUMN_NAME) && (aggregatorType == AggregatorType.LATEST || aggregatorType == AggregatorType.EARLIEST)) {
+      plannerContext.setPlanningError("%s() aggregator depends on __time column, the underlying datasource "
+                                      + "or extern function you are querying doesn't contain __time column, "
+                                      + "Please use %s_BY() and specify the time column you want to use",
+                                      aggregatorType.name(),
+                                      aggregatorType.name()
+      );
+      return null;
+    }

Review Comment:
   I get that the user *could* use the alternative. The goal here _should be_ that the user can use the actual features which Druid claims to support.
   
   That is, I'm asserting that the goal of this PR should not be to accept broken behavior and tell the user not to use the broken stuff, but to fix the behavior so that what should work _does_ work.
   
   In short, we don't want to wrap a bug with nicer error messages. We want to fix the bug. (IMHO.)



##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java:
##########
@@ -1363,6 +1363,39 @@ public void testGroupByArrayWithMultiValueMvToArray()
         .verifyResults();
   }
 
+  @Test
+  public void testTimeColumnAggregationFromExtern() throws IOException
+  {
+    final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, "/wikipedia-sampled.json");
+    final String toReadAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath());
+
+    RowSignature rowSignature = RowSignature.builder()
+                                            .add("__time", ColumnType.LONG)
+                                            .add("cnt", ColumnType.LONG)
+                                            .build();
+
+    testSelectQuery()
+        .setSql("WITH\n"
+                + "kttm_data AS (\n"
+                + "SELECT * FROM TABLE(\n"
+                + "  EXTERN(\n"
+                + "    '{ \"files\": [" + toReadAsJson + "],\"type\":\"local\"}',\n"
+                + "    '{\"type\":\"json\"}',\n"
+                + "    '[{\"name\": \"timestamp\", \"type\": \"string\"}, {\"name\": \"page\", \"type\": \"string\"}, {\"name\": \"user\", \"type\": \"string\"}]'\n"
+                + "  )\n"
+                + "))\n"
+                + "\n"
+                + "SELECT\n"
+                + "  FLOOR(TIME_PARSE(\"timestamp\") TO MINUTE) AS __time,\n"
+                + "  LATEST(\"page\") AS \"page\"\n"
+                + "FROM kttm_data "
+                + "GROUP BY 1")
+        .setExpectedValidationErrorMatcher(
+            ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("LATEST() aggregator depends on __time column"))

Review Comment:
   Sorry, I didn't follow. Maybe my note was too terse?
   
   The query here should be legal. `page` is a `VARCHAR` (Druid `string`) column. Segments allow `LATEST(string)` (though not `LATEST` of numeric types.) MSQ is the modern way to create rollup tables.
   
   Now, this query might fail because we haven't passed the context parameter to ask for a rolled up datasource. But, in that case, aggregations are applied first, and the results written to the datasource as a detail table. In that case, the above query should also work.
   
   The test, however, expects the query to fail. My comment is noting that this is not what should happen. (I get that it is what this PR _wants_ to happen.) What should happen is that this is legal, and we emit segments based on the `finalizeAggregations` context parameter.
   
   So, the ask is simple: let's find out what it takes so that this query passes and (with the right context setting) emits an intermediate `COMPLEX<Pair<string, long>>` value into the output segments.
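   
   A sketch of the desired outcome (assuming the `finalizeAggregations` context parameter referenced above):
   
   ```sql
   -- With finalizeAggregations = false in the query context, this should plan
   -- successfully and emit the intermediate COMPLEX<Pair<string, long>> value
   -- into the output segments rather than failing validation:
   SELECT
     FLOOR(TIME_PARSE("timestamp") TO MINUTE) AS __time,
     LATEST("page") AS "page"
   FROM kttm_data
   GROUP BY 1
   ```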



##########
sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java:
##########
@@ -1447,12 +1447,24 @@ public void testInnerJoinQueryOfLookup(Map<String, Object> queryContext)
                 .build()
         ),
         ImmutableList.of(
-            new Object[]{"", "a", "xabc", "xabc"},
-            new Object[]{"1", "a", "xabc", "xabc"}
+            new Object[]{"", "a", "xa", "xa"},
+            new Object[]{"1", "a", "xa", "xa"}
         )
     );
   }
 
+  @Test(expected = UnsupportedSQLQueryException.class)

Review Comment:
   Absolutely. Every datasource row has a `__time` column.
   
   Here's another way to express this for the `SELECT` case. If the query works for the interactive SQL engine, it should work for the MSQ query engine. It is not a benefit to the user that `LATEST(x)` works on one engine but not the other. It is, instead, a bug to be fixed.
   
   It is certainly true that this might be hard to fix, given how MSQ does things. That just means fixing it might be hard.
   
   If, however, this query does not work on the interactive SQL engine, then we've got larger issues which we may not be able to fix in this PR. One issue is, "what does it mean to offer `LATEST(x)` if it doesn't actually work?"





[GitHub] [druid] adarshsanjeev commented on pull request #13793: Add validation for aggregations on __time

Posted by "adarshsanjeev (via GitHub)" <gi...@apache.org>.
adarshsanjeev commented on PR #13793:
URL: https://github.com/apache/druid/pull/13793#issuecomment-1427812967

   Thanks for taking the time to reply! I had not considered that this might affect compaction while making the changes; a different fix might be needed here instead.




[GitHub] [druid] paul-rogers commented on a diff in pull request #13793: Add validation for aggregations on __time

Posted by "paul-rogers (via GitHub)" <gi...@apache.org>.
paul-rogers commented on code in PR #13793:
URL: https://github.com/apache/druid/pull/13793#discussion_r1114922369


##########
docs/querying/sql-aggregations.md:
##########
@@ -86,7 +86,7 @@ In the aggregation functions supported by Druid, only `COUNT`, `ARRAY_AGG`, and
 |`EARLIEST(expr, maxBytesPerString)`|Like `EARLIEST(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit are truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `''`|
 |`EARLIEST_BY(expr, timestampExpr)`|Returns the earliest value of `expr`, which must be numeric. The earliest value of `expr` is taken from the row with the overall earliest non-null value of `timestampExpr`. If the earliest non-null value of `timestampExpr` appears in multiple rows, the `expr` may be taken from any of those rows.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
 |`EARLIEST_BY(expr, timestampExpr, maxBytesPerString)`| Like `EARLIEST_BY(expr, timestampExpr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit are truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `''`|
-|`LATEST(expr)`|Returns the latest value of `expr`, which must be numeric. If `expr` comes from a relation with a timestamp column (like `__time` in a Druid datasource), the "latest" is taken from the row with the overall latest non-null value of the timestamp column. If the latest non-null value of the timestamp column appears in multiple rows, the `expr` may be taken from any of those rows. If `expr` does not come from a relation with a timestamp, then it is simply the last value encountered.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
+|`LATEST(expr)`|Returns the latest value of `expr`, which must be numeric. The `expr` must come from a relation with a timestamp column (like `__time` in a Druid datasource) and the "latest" is taken from the row with the overall latest non-null value of the timestamp column. If the latest non-null value of the timestamp column appears in multiple rows, the `expr` may be taken from any of those rows. |`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|

Review Comment:
   This description is a bit off. To correct it, we just need to understand how this aggregation works (or is supposed to work). This aggregation applies _only_ to a Druid datasource. Why? Because we need the `__time` column, and that column only exists in Druid datasources (where it is guaranteed to exist).
   
   When using this function in _queries_, the function implicitly references that datasource `__time` column. If the aggregation is used on a datasource without a `__time` column, it should fail. Although all Druid datasources have `__time`, a Druid data source (with a space) need not have such a column. Example: the output of a subquery. In this case, the function cannot be used: the explicit form is required.
   
   For _ingestion_ (via MSQ) the story is more complex. We have to consider whether we're creating a detail table or a rollup table. If we're creating a detail table (that is, no aggregation), then the same rules for queries apply.
   
   But, if we are building a table _with_ rollup, we must apply slightly different rules. In the case of rollup, the `__time` column is the one in the _target_ datasource, not the input source. (Sorry for Druid's awkward naming.)
   
   This is true because aggregation for rollup tables is best thought of as a property of the table (datasource), not the query. Why? I can do two ingestions into two segments for the same time chunk. I can then use compaction to combine them. Compaction references only data within the datasource: it has no ability to reference the (now long gone) original input sources.
   
   The unifying rule is that `LATEST(x)` implicitly refers to the `__time` column in the same record as the value of `x` appears, _after_ the record has been projected into the target datasource format on ingestion, and _before_ projecting out of the datasource format on query.
   
   On the other hand, `LATEST_BY(x, y)` is a bit simpler: it doesn't really matter where the `y` (timestamp) value comes from; it will be written into the intermediate value anyway. To make this concrete:
   
   ```sql
   SELECT
     TIME_PARSE(a) AS __time,
     TIME_PARSE(b) AS anotherTime,
     LATEST_BY(c, anotherTime)
   ```
   
   Would be nice, but Druid doesn't let one SELECT expression reference another, so we have to write:
   
   ```sql
   SELECT
     TIME_PARSE(a) AS __time,
     TIME_PARSE(b) AS anotherTime,
     LATEST_BY(c, TIME_PARSE(b))
   ```
   



##########
sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java:
##########
@@ -196,6 +197,16 @@ public Aggregation toDruidAggregation(
 
     final String fieldName = getColumnName(plannerContext, virtualColumnRegistry, args.get(0), rexNodes.get(0));
 
+    if (!rowSignature.contains(ColumnHolder.TIME_COLUMN_NAME) && (aggregatorType == AggregatorType.LATEST || aggregatorType == AggregatorType.EARLIEST)) {
+      plannerContext.setPlanningError("%s() aggregator depends on __time column, the underlying datasource "
+                                      + "or extern function you are querying doesn't contain __time column, "
+                                      + "Please use %s_BY() and specify the time column you want to use",
+                                      aggregatorType.name(),
+                                      aggregatorType.name()
+      );
+      return null;
+    }

Review Comment:
   I'm afraid this is the wrong check: we're asking a CSV file to provide a `LONG` `__time` column. Probably not going to happen: we can't ask the user to rewrite their input files to make MSQ happy.
   
   What we want to know is: is there a `__time` column in the projected output row? That is tricky, but it's what we need.
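   
   To make the distinction concrete (a sketch based on the test query in this PR):
   
   ```sql
   -- The EXTERN input row has no __time, only a string "timestamp" column,
   -- yet the projected output row does define one:
   SELECT
     FLOOR(TIME_PARSE("timestamp") TO MINUTE) AS __time,  -- __time exists in the output
     LATEST("page") AS "page"                             -- should bind to that output column
   FROM kttm_data
   GROUP BY 1
   -- Checking the input row signature for __time rejects this query even
   -- though the output signature would contain it.
   ```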



##########
sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java:
##########
@@ -1447,12 +1447,24 @@ public void testInnerJoinQueryOfLookup(Map<String, Object> queryContext)
                 .build()
         ),
         ImmutableList.of(
-            new Object[]{"", "a", "xabc", "xabc"},
-            new Object[]{"1", "a", "xabc", "xabc"}
+            new Object[]{"", "a", "xa", "xa"},
+            new Object[]{"1", "a", "xa", "xa"}
         )
     );
   }
 
+  @Test(expected = UnsupportedSQLQueryException.class)

Review Comment:
   We want this to work.



##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java:
##########
@@ -1363,6 +1363,39 @@ public void testGroupByArrayWithMultiValueMvToArray()
         .verifyResults();
   }
 
+  @Test
+  public void testTimeColumnAggregationFromExtern() throws IOException
+  {
+    final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, "/wikipedia-sampled.json");
+    final String toReadAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath());
+
+    RowSignature rowSignature = RowSignature.builder()
+                                            .add("__time", ColumnType.LONG)
+                                            .add("cnt", ColumnType.LONG)
+                                            .build();
+
+    testSelectQuery()
+        .setSql("WITH\n"
+                + "kttm_data AS (\n"
+                + "SELECT * FROM TABLE(\n"
+                + "  EXTERN(\n"
+                + "    '{ \"files\": [" + toReadAsJson + "],\"type\":\"local\"}',\n"
+                + "    '{\"type\":\"json\"}',\n"
+                + "    '[{\"name\": \"timestamp\", \"type\": \"string\"}, {\"name\": \"page\", \"type\": \"string\"}, {\"name\": \"user\", \"type\": \"string\"}]'\n"
+                + "  )\n"
+                + "))\n"
+                + "\n"
+                + "SELECT\n"
+                + "  FLOOR(TIME_PARSE(\"timestamp\") TO MINUTE) AS __time,\n"
+                + "  LATEST(\"page\") AS \"page\"\n"
+                + "FROM kttm_data "
+                + "GROUP BY 1")
+        .setExpectedValidationErrorMatcher(
+            ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("LATEST() aggregator depends on __time column"))

Review Comment:
   We need this to succeed, not fail.





[GitHub] [druid] adarshsanjeev commented on pull request #13793: Add validation for aggregations on __time

Posted by "adarshsanjeev (via GitHub)" <gi...@apache.org>.
adarshsanjeev commented on PR #13793:
URL: https://github.com/apache/druid/pull/13793#issuecomment-1449386515

   Thanks for the discussion! After talking with Paul about the correct approach, we determined that during compaction-time rollup, the LATEST_BY aggregator should fetch the correct value from the object value. Hence, LATEST_BY should be okay as a workaround for LATEST, which still does not function correctly.
   
   Since there are no large issues blocking this PR, I will work on addressing any minor comments and getting this merged, since without this validation it could currently produce incorrect results.
   
   I will also try to resolve the issues with LATEST by updating the reference to the time column, but coming up with the best approach might take a little more effort. I will track any updates in the GitHub issue.




[GitHub] [druid] paul-rogers commented on pull request #13793: Add validation for aggregations on __time

Posted by "paul-rogers (via GitHub)" <gi...@apache.org>.
paul-rogers commented on PR #13793:
URL: https://github.com/apache/druid/pull/13793#issuecomment-1429032881

   This PR is of interest because it impacts the same area as the catalog project. Just FYI, here is some background.
   
   With the catalog, we want to validate the type of each column. Quick, what is the type of column X here:
   
   ```sql
   INSERT INTO dst
   SELECT LATEST(A) AS X FROM TABLE(...) (A VARCHAR)
   ```
   
   As explained above, if we just look at the `SELECT`, then `X` is `VARCHAR`. (That is, the aggregation is finalized.) But, we want to insert into `dst`. In the catalog schema, what type should we declare `X` to be? It is certainly _not_ `VARCHAR` because, as noted above, we could not further aggregate. So, what is it?
   
   The `StringLastAggregatorFactory` tells us that the intermediate type is `COMPLEX<serializablePairLongString>`. If we knew that in the SQL layer, we could reject the following. Notice the change of type of `A` to `BIGINT`:
   
   ```sql
   INSERT INTO dst
   SELECT LATEST(A) AS X FROM TABLE(...) (A BIGINT)
   ```
   
   Unfortunately, the `COMPLEX<serializablePairLongString>` type is a bit too general, since that is the same type for `EARLIEST`. However, we would like to reject the following in the case where `X` wants to be `LATEST`, to prevent the catastrophic combining of the two aggregators on the same column in different ingestions:
   
   ```sql
   INSERT INTO dst
   SELECT EARLIEST(A) AS X FROM TABLE(...) (A VARCHAR)
   ```
   
   So, we need an internal type of something like `COMPLEX<latest-string>`. Unfortunately, there is no such type in Druid: the information about the aggregate function is lost. So we need to do something or other to fix this.
   
   The challenge for this PR is that SQL knows nothing about the internal types (that I can find). Our SQL aggregators report only the finalized type (`VARCHAR` in this example.) As a result, you're trying to fix a bug without really having the internal structure needed. Let's hope a simple fix can be found that solves this one PR. The actual issue is to reference columns in the output (per SQL) and not the input.
   
   Or, perhaps, we have to resolve the larger issue. For MSQ, we have to know if the `LATEST(VARCHAR)` function returns:
   
   * `VARCHAR` if MSQ is used to query, or if the user is doing the "aggregate in the `SELECT`, store the result as detail" case.
   * `COMPLEX<latest-string>` if MSQ is used to ingest, and the `SELECT` is giving the aggregation to use both on ingestion and compaction. The SQL type `COMPLEX<latest-string>` maps to the Druid type `COMPLEX<serializablePairLongString>` at the storage level.
   
   The Druid Calcite-based planner assumes the first case. (The type of `LATEST(VARCHAR)` is `VARCHAR`). But, to make `LATEST` work in compaction, we need the second form of the type, which we don't have at present.
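   
   To restate the two cases as queries (a sketch; `wiki`, `dst`, and `page` are placeholder names):
   
   ```sql
   -- Query case: the aggregation is finalized, so LATEST(page) is VARCHAR.
   SELECT LATEST(page) FROM wiki;
   
   -- Ingest case: for compaction to keep aggregating, the stored value would
   -- need to be the intermediate COMPLEX<latest-string> type, which the SQL
   -- layer cannot currently express.
   INSERT INTO dst
   SELECT __time, LATEST(page) AS x FROM wiki GROUP BY 1;
   ```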
   




[GitHub] [druid] paul-rogers commented on pull request #13793: Add validation for aggregations on __time

Posted by "paul-rogers (via GitHub)" <gi...@apache.org>.
paul-rogers commented on PR #13793:
URL: https://github.com/apache/druid/pull/13793#issuecomment-1459114229

   After some offline discussions, it was felt that fixing this issue (by allowing `LATEST(x)` with MSQ) is unnecessary and overly complex. Instead, we are going back to the original goal: disallow `LATEST(x)` in MSQ statements that do not finalize results. Adding `LATEST(x)` support can be seen as a feature request for another time.
   
   The simplest solution is to disallow `LATEST(.)` if the engine is MSQ. There are finer points, but we can use the crude-but-effective solution for now.

