You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by la...@apache.org on 2023/10/12 12:32:36 UTC

[druid] 01/02: Rewrite EARLIEST/LATEST query operators to EARLIEST_BY/LATEST_BY (#15095)

This is an automated email from the ASF dual-hosted git repository.

lakshsingla pushed a commit to branch 28.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git

commit 642f79e89fc25734142ab1398af13f9ad24ba054
Author: Vishesh Garg <ga...@gmail.com>
AuthorDate: Wed Oct 11 19:48:36 2023 +0530

    Rewrite EARLIEST/LATEST query operators to EARLIEST_BY/LATEST_BY (#15095)
    
    EARLIEST and LATEST operators implicitly reference the __time column for calculation of the aggregate value. Since the reference isn't explicit, Calcite sometimes fails to update the __time column name when there's column renaming -- such as in the case of nested queries -- resulting in column-not-found errors.
    
    This change rewrites these operators to EARLIEST_BY and LATEST_BY during query processing to make the reference explicit to Calcite.
    
    (cherry picked from commit c6ca990f1fd87d1c9e4e215a44642f9dcc9893d6)
---
 .../builtin/EarliestLatestAnySqlAggregator.java    | 100 +++++++++++++++++++--
 .../apache/druid/sql/calcite/CalciteQueryTest.java |  29 ++++++
 2 files changed, 123 insertions(+), 6 deletions(-)

diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java
index abaeede9948..2e031616027 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java
@@ -23,15 +23,23 @@ import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.runtime.CalciteContextException;
 import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlOperatorBinding;
+import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.sql.type.InferTypes;
 import org.apache.calcite.sql.type.OperandTypes;
 import org.apache.calcite.sql.type.SqlReturnTypeInference;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.sql.type.SqlTypeUtil;
+import org.apache.calcite.sql.util.SqlVisitor;
+import org.apache.calcite.sql.validate.SqlValidator;
+import org.apache.calcite.sql.validate.SqlValidatorException;
 import org.apache.calcite.util.Optionality;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.error.InvalidSqlInput;
@@ -61,14 +69,21 @@ import org.apache.druid.sql.calcite.rel.InputAccessor;
 import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
 
 import javax.annotation.Nullable;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
 public class EarliestLatestAnySqlAggregator implements SqlAggregator
 {
-  public static final SqlAggregator EARLIEST = new EarliestLatestAnySqlAggregator(AggregatorType.EARLIEST);
-  public static final SqlAggregator LATEST = new EarliestLatestAnySqlAggregator(AggregatorType.LATEST);
-  public static final SqlAggregator ANY_VALUE = new EarliestLatestAnySqlAggregator(AggregatorType.ANY_VALUE);
+  public static final SqlAggregator EARLIEST = new EarliestLatestAnySqlAggregator(
+      AggregatorType.EARLIEST,
+      EarliestLatestBySqlAggregator.EARLIEST_BY.calciteFunction()
+  );
+  public static final SqlAggregator LATEST = new EarliestLatestAnySqlAggregator(
+      AggregatorType.LATEST,
+      EarliestLatestBySqlAggregator.LATEST_BY.calciteFunction()
+  );
+  public static final SqlAggregator ANY_VALUE = new EarliestLatestAnySqlAggregator(AggregatorType.ANY_VALUE, null);
 
   enum AggregatorType
   {
@@ -161,10 +176,10 @@ public class EarliestLatestAnySqlAggregator implements SqlAggregator
   private final AggregatorType aggregatorType;
   private final SqlAggFunction function;
 
-  private EarliestLatestAnySqlAggregator(final AggregatorType aggregatorType)
+  private EarliestLatestAnySqlAggregator(final AggregatorType aggregatorType, final SqlAggFunction replacementAggFunc)
   {
     this.aggregatorType = aggregatorType;
-    this.function = new EarliestLatestSqlAggFunction(aggregatorType);
+    this.function = new EarliestLatestSqlAggFunction(aggregatorType, replacementAggFunc);
   }
 
   @Override
@@ -305,12 +320,48 @@ public class EarliestLatestAnySqlAggregator implements SqlAggregator
     }
   }
 
+  private static class TimeColIdentifer extends SqlIdentifier
+  {
+
+    public TimeColIdentifer()
+    {
+      super("__time", SqlParserPos.ZERO);
+    }
+
+    @Override
+    public <R> R accept(SqlVisitor<R> visitor)
+    {
+
+      try {
+        return super.accept(visitor);
+      }
+      catch (CalciteContextException e) {
+        if (e.getCause() instanceof SqlValidatorException) {
+          throw DruidException.forPersona(DruidException.Persona.ADMIN)
+                              .ofCategory(DruidException.Category.INVALID_INPUT)
+                              .build(
+                                  e,
+                                  "Query could not be planned. A possible reason is [%s]",
+                                  "LATEST and EARLIEST aggregators implicitly depend on the __time column, but the "
+                                  + "table queried doesn't contain a __time column.  Please use LATEST_BY or EARLIEST_BY "
+                                  + "and specify the column explicitly."
+                              );
+
+        } else {
+          throw e;
+        }
+      }
+    }
+  }
+
   private static class EarliestLatestSqlAggFunction extends SqlAggFunction
   {
     private static final EarliestLatestReturnTypeInference EARLIEST_LATEST_ARG0_RETURN_TYPE_INFERENCE =
         new EarliestLatestReturnTypeInference(0);
 
-    EarliestLatestSqlAggFunction(AggregatorType aggregatorType)
+    private final SqlAggFunction replacementAggFunc;
+
+    EarliestLatestSqlAggFunction(AggregatorType aggregatorType, SqlAggFunction replacementAggFunc)
     {
       super(
           aggregatorType.name(),
@@ -331,6 +382,43 @@ public class EarliestLatestAnySqlAggregator implements SqlAggregator
           false,
           Optionality.FORBIDDEN
       );
+      this.replacementAggFunc = replacementAggFunc;
+    }
+
+    @Override
+    public SqlNode rewriteCall(
+        SqlValidator validator,
+        SqlCall call
+    )
+    {
+      // Rewrite EARLIEST/LATEST to EARLIEST_BY/LATEST_BY to make
+      // reference to __time column explicit so that Calcite tracks it
+
+      if (replacementAggFunc == null) {
+        return call;
+      }
+
+      List<SqlNode> operands = call.getOperandList();
+
+      SqlParserPos pos = call.getParserPosition();
+
+      if (operands.isEmpty() || operands.size() > 2) {
+        throw InvalidSqlInput.exception(
+            "Function [%s] expects 1 or 2 arguments but found [%s]",
+            getName(),
+            operands.size()
+        );
+      }
+
+      List<SqlNode> newOperands = new ArrayList<>();
+      newOperands.add(operands.get(0));
+      newOperands.add(new TimeColIdentifer());
+
+      if (operands.size() == 2) {
+        newOperands.add(operands.get(1));
+      }
+
+      return replacementAggFunc.createCall(pos, newOperands);
     }
   }
 }
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index d49f7de9dd5..2c123d6023f 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -676,6 +676,35 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
     );
   }
 
+  @Test
+  public void testLatestToLatestByConversion()
+  {
+    msqIncompatible();
+    testQuery(
+        "SELECT LATEST(dim1,10) FROM (SELECT DISTINCT __time, dim1 from foo)",
+        ImmutableList.of(
+            new GroupByQuery.Builder()
+                .setDataSource(
+                    GroupByQuery.builder()
+                                .setDataSource(CalciteTests.DATASOURCE1)
+                                .setInterval(querySegmentSpec(Filtration.eternity()))
+                                .setGranularity(Granularities.ALL)
+                                .setDimensions(dimensions(
+                                    new DefaultDimensionSpec("__time", "d0", ColumnType.LONG),
+                                    new DefaultDimensionSpec("dim1", "d1", ColumnType.STRING)
+                                ))
+                                .setContext(QUERY_CONTEXT_DEFAULT)
+                                .build())
+                .setInterval(querySegmentSpec(Filtration.eternity()))
+                .setGranularity(Granularities.ALL)
+                .setAggregatorSpecs(
+                    new StringLastAggregatorFactory("a0", "d1", "d0", 10))
+                .setContext(QUERY_CONTEXT_DEFAULT)
+                .build()),
+        ImmutableList.of(new Object[]{"abc"})
+    );
+  }
+
   @Test
   public void testLatestVectorAggregators()
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org