You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by la...@apache.org on 2023/10/12 12:32:36 UTC
[druid] 01/02: Rewrite EARLIEST/LATEST query operators to EARLIEST_BY/LATEST_BY (#15095)
This is an automated email from the ASF dual-hosted git repository.
lakshsingla pushed a commit to branch 28.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git
commit 642f79e89fc25734142ab1398af13f9ad24ba054
Author: Vishesh Garg <ga...@gmail.com>
AuthorDate: Wed Oct 11 19:48:36 2023 +0530
Rewrite EARLIEST/LATEST query operators to EARLIEST_BY/LATEST_BY (#15095)
EARLIEST and LATEST operators implicitly reference the __time column for calculation of the aggregate value. Since the reference isn't explicit, Calcite sometimes fails to update the __time column name when columns are renamed -- such as in the case of nested queries -- resulting in "column not found" errors.
This change rewrites these operators to EARLIEST_BY and LATEST_BY during query processing to make the reference explicit to Calcite.
(cherry picked from commit c6ca990f1fd87d1c9e4e215a44642f9dcc9893d6)
---
.../builtin/EarliestLatestAnySqlAggregator.java | 100 +++++++++++++++++++--
.../apache/druid/sql/calcite/CalciteQueryTest.java | 29 ++++++
2 files changed, 123 insertions(+), 6 deletions(-)
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java
index abaeede9948..2e031616027 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java
@@ -23,15 +23,23 @@ import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.runtime.CalciteContextException;
import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperatorBinding;
+import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.InferTypes;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.SqlReturnTypeInference;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.type.SqlTypeUtil;
+import org.apache.calcite.sql.util.SqlVisitor;
+import org.apache.calcite.sql.validate.SqlValidator;
+import org.apache.calcite.sql.validate.SqlValidatorException;
import org.apache.calcite.util.Optionality;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidSqlInput;
@@ -61,14 +69,21 @@ import org.apache.druid.sql.calcite.rel.InputAccessor;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import javax.annotation.Nullable;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class EarliestLatestAnySqlAggregator implements SqlAggregator
{
- public static final SqlAggregator EARLIEST = new EarliestLatestAnySqlAggregator(AggregatorType.EARLIEST);
- public static final SqlAggregator LATEST = new EarliestLatestAnySqlAggregator(AggregatorType.LATEST);
- public static final SqlAggregator ANY_VALUE = new EarliestLatestAnySqlAggregator(AggregatorType.ANY_VALUE);
+ public static final SqlAggregator EARLIEST = new EarliestLatestAnySqlAggregator(
+ AggregatorType.EARLIEST,
+ EarliestLatestBySqlAggregator.EARLIEST_BY.calciteFunction()
+ );
+ public static final SqlAggregator LATEST = new EarliestLatestAnySqlAggregator(
+ AggregatorType.LATEST,
+ EarliestLatestBySqlAggregator.LATEST_BY.calciteFunction()
+ );
+ public static final SqlAggregator ANY_VALUE = new EarliestLatestAnySqlAggregator(AggregatorType.ANY_VALUE, null);
enum AggregatorType
{
@@ -161,10 +176,10 @@ public class EarliestLatestAnySqlAggregator implements SqlAggregator
private final AggregatorType aggregatorType;
private final SqlAggFunction function;
- private EarliestLatestAnySqlAggregator(final AggregatorType aggregatorType)
+ private EarliestLatestAnySqlAggregator(final AggregatorType aggregatorType, final SqlAggFunction replacementAggFunc)
{
this.aggregatorType = aggregatorType;
- this.function = new EarliestLatestSqlAggFunction(aggregatorType);
+ this.function = new EarliestLatestSqlAggFunction(aggregatorType, replacementAggFunc);
}
@Override
@@ -305,12 +320,48 @@ public class EarliestLatestAnySqlAggregator implements SqlAggregator
}
}
+ private static class TimeColIdentifer extends SqlIdentifier
+ {
+ // Identifier for the "__time" column, created at SqlParserPos.ZERO; rewriteCall adds it as the explicit time operand.
+ public TimeColIdentifer()
+ {
+ super("__time", SqlParserPos.ZERO);
+ }
+ // Delegates to SqlIdentifier.accept; a CalciteContextException caused by a SqlValidatorException becomes a DruidException.
+ @Override
+ public <R> R accept(SqlVisitor<R> visitor)
+ {
+ // NOTE(review): presumably the validator failure means __time could not be resolved; the message only calls it a possible reason.
+ try {
+ return super.accept(visitor);
+ }
+ catch (CalciteContextException e) {
+ if (e.getCause() instanceof SqlValidatorException) {
+ throw DruidException.forPersona(DruidException.Persona.ADMIN)
+ .ofCategory(DruidException.Category.INVALID_INPUT)
+ .build(
+ e,
+ "Query could not be planned. A possible reason is [%s]",
+ "LATEST and EARLIEST aggregators implicitly depend on the __time column, but the "
+ + "table queried doesn't contain a __time column. Please use LATEST_BY or EARLIEST_BY "
+ + "and specify the column explicitly."
+ );
+
+ } else {
+ throw e;
+ }
+ }
+ }
+ }
+
private static class EarliestLatestSqlAggFunction extends SqlAggFunction
{
private static final EarliestLatestReturnTypeInference EARLIEST_LATEST_ARG0_RETURN_TYPE_INFERENCE =
new EarliestLatestReturnTypeInference(0);
- EarliestLatestSqlAggFunction(AggregatorType aggregatorType)
+ private final SqlAggFunction replacementAggFunc;
+
+ EarliestLatestSqlAggFunction(AggregatorType aggregatorType, SqlAggFunction replacementAggFunc)
{
super(
aggregatorType.name(),
@@ -331,6 +382,43 @@ public class EarliestLatestAnySqlAggregator implements SqlAggregator
false,
Optionality.FORBIDDEN
);
+ this.replacementAggFunc = replacementAggFunc;
+ }
+
+ @Override
+ public SqlNode rewriteCall(
+ SqlValidator validator,
+ SqlCall call
+ )
+ {
+ // Rewrite EARLIEST/LATEST into EARLIEST_BY/LATEST_BY so that the reference to the
+ // __time column is explicit and Calcite can track it (for example through column renames).
+ // A null replacement function (as passed for ANY_VALUE) means the call is returned unchanged.
+ if (replacementAggFunc == null) {
+ return call;
+ }
+ // Only calls with one or two operands are accepted; any other arity raises InvalidSqlInput below.
+ List<SqlNode> operands = call.getOperandList();
+ // The rewritten call reuses the parser position of the original call.
+ SqlParserPos pos = call.getParserPosition();
+
+ if (operands.isEmpty() || operands.size() > 2) {
+ throw InvalidSqlInput.exception(
+ "Function [%s] expects 1 or 2 arguments but found [%s]",
+ getName(),
+ operands.size()
+ );
+ }
+ // New operand order: first original operand, then __time, then the optional second original operand.
+ List<SqlNode> newOperands = new ArrayList<>();
+ newOperands.add(operands.get(0));
+ newOperands.add(new TimeColIdentifer());
+
+ if (operands.size() == 2) {
+ newOperands.add(operands.get(1));
+ }
+
+ return replacementAggFunc.createCall(pos, newOperands);
+ }
}
}
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index d49f7de9dd5..2c123d6023f 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -676,6 +676,35 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
+ @Test
+ public void testLatestToLatestByConversion()
+ {
+ msqIncompatible();
+ testQuery(
+ "SELECT LATEST(dim1,10) FROM (SELECT DISTINCT __time, dim1 from foo)", // two-argument LATEST over a subquery that selects __time
+ ImmutableList.of(
+ new GroupByQuery.Builder()
+ .setDataSource(
+ GroupByQuery.builder()
+ .setDataSource(CalciteTests.DATASOURCE1)
+ .setInterval(querySegmentSpec(Filtration.eternity()))
+ .setGranularity(Granularities.ALL)
+ .setDimensions(dimensions(
+ new DefaultDimensionSpec("__time", "d0", ColumnType.LONG), // inner query outputs __time as d0
+ new DefaultDimensionSpec("dim1", "d1", ColumnType.STRING)
+ ))
+ .setContext(QUERY_CONTEXT_DEFAULT)
+ .build())
+ .setInterval(querySegmentSpec(Filtration.eternity()))
+ .setGranularity(Granularities.ALL)
+ .setAggregatorSpecs(
+ new StringLastAggregatorFactory("a0", "d1", "d0", 10)) // refers to inner outputs d1 and d0; NOTE(review): presumably d0 is the time column argument - confirm
+ .setContext(QUERY_CONTEXT_DEFAULT)
+ .build()),
+ ImmutableList.of(new Object[]{"abc"})
+ );
+ }
+
@Test
public void testLatestVectorAggregators()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org