You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by cw...@apache.org on 2022/01/12 11:49:23 UTC

[druid] branch master updated: add EARLIEST_BY/LATEST_BY to make EARLIEST/LATEST function signatures less ambiguous (#12145)

This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new f2ce769  add EARLIEST_BY/LATEST_BY to make EARLIEST/LATEST function signatures less ambiguous (#12145)
f2ce769 is described below

commit f2ce76966cf06134dbe712782edf46a7c8d72563
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Wed Jan 12 03:48:53 2022 -0800

    add EARLIEST_BY/LATEST_BY to make EARLIEST/LATEST function signatures less ambiguous (#12145)
    
    * add EARLIEST_BY/LATEST_BY to make EARLIEST/LATEST function signatures unambiguous
    
    * switcheroo
    
    * EARLIEST_BY/LATEST_BY use timestamp instead of numeric types, update docs
    
    * revert unintended change
    
    * fix docs
    
    * fix docs better
---
 docs/querying/sql.md                               |  12 +-
 .../aggregation/first/NumericFirstAggregator.java  |   3 +
 .../first/NumericFirstBufferAggregator.java        |   3 +
 .../aggregation/first/StringFirstAggregator.java   |   3 +
 .../first/StringFirstBufferAggregator.java         |   3 +
 .../aggregation/last/NumericLastAggregator.java    |   3 +
 .../last/NumericLastBufferAggregator.java          |   3 +
 .../aggregation/last/StringLastAggregator.java     |   3 +
 .../last/StringLastBufferAggregator.java           |   3 +
 .../builtin/EarliestLatestAnySqlAggregator.java    |  41 +----
 .../builtin/EarliestLatestBySqlAggregator.java     | 181 +++++++++++++++++++++
 .../sql/calcite/planner/DruidOperatorTable.java    |   3 +
 .../apache/druid/sql/calcite/CalciteQueryTest.java |  70 +++++---
 13 files changed, 268 insertions(+), 63 deletions(-)

diff --git a/docs/querying/sql.md b/docs/querying/sql.md
index f0e769a..e227757 100644
--- a/docs/querying/sql.md
+++ b/docs/querying/sql.md
@@ -364,14 +364,14 @@ In the aggregation functions supported by Druid, only `COUNT`, `ARRAY_AGG`, and
 |`STDDEV_POP(expr)`|Computes standard deviation population of `expr`. See [stats extension](../development/extensions-core/stats.md) documentation for additional details.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
 |`STDDEV_SAMP(expr)`|Computes standard deviation sample of `expr`. See [stats extension](../development/extensions-core/stats.md) documentation for additional details.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
 |`STDDEV(expr)`|Computes standard deviation sample of `expr`. See [stats extension](../development/extensions-core/stats.md) documentation for additional details.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
-|`EARLIEST(expr)`|Returns the earliest value of `expr`, which must be numeric. If `expr` comes from a relation with a timestamp column (like a Druid datasource) then "earliest" is the value first encountered with the minimum overall timestamp of all values being aggregated. If `expr` does not come from a relation with a timestamp, then it is simply the first value encountered.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
-|`EARLIEST(expr, timeColumn)`|Returns the earliest value of `expr`, which must be numeric. Earliest value is defined as the value first encountered with the minimum overall value of time column of all values being aggregated.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
+|`EARLIEST(expr)`|Returns the earliest value of `expr`, which must be numeric. If `expr` comes from a relation with a timestamp column (like `__time` in a Druid datasource), the "earliest" is taken from the row with the overall earliest non-null value of the timestamp column. If the earliest non-null value of the timestamp column appears in multiple rows, the `expr` may be taken from any of those rows. If `expr` does not come from a relation with a timestamp, then it is simply the first  [...]
 |`EARLIEST(expr, maxBytesPerString)`|Like `EARLIEST(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `''`|
-|`EARLIEST(expr, maxBytesPerString, timeColumn)`|Like `EARLIEST(expr, timeColumn)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `''`|
-|`LATEST(expr)`|Returns the latest value of `expr`, which must be numeric. If `expr` comes from a relation with a timestamp column (like a Druid datasource) then "latest" is the value last encountered with the maximum overall timestamp of all values being aggregated. If `expr` does not come from a relation with a timestamp, then it is simply the last value encountered.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
-|`LATEST(expr, timeColumn)`|Returns the latest value of `expr`, which must be numeric. Latest value is defined as the value last encountered with the maximum overall value of time column of all values being aggregated.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
+|`EARLIEST_BY(expr, timestampExpr)`|Returns the earliest value of `expr`, which must be numeric. The earliest value of `expr` is taken from the row with the overall earliest non-null value of `timestampExpr`. If the earliest non-null value of `timestampExpr` appears in multiple rows, the `expr` may be taken from any of those rows.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
+|`EARLIEST_BY(expr, timestampExpr, maxBytesPerString)`| Like `EARLIEST_BY(expr, timestampExpr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `''`|
+|`LATEST(expr)`|Returns the latest value of `expr`, which must be numeric. If `expr` comes from a relation with a timestamp column (like `__time` in a Druid datasource), the "latest" is taken from the row with the overall latest non-null value of the timestamp column. If the latest non-null value of the timestamp column appears in multiple rows, the `expr` may be taken from any of those rows. If `expr` does not come from a relation with a timestamp, then it is simply the last value encou [...]
 |`LATEST(expr, maxBytesPerString)`|Like `LATEST(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `''`|
-|`LATEST(expr, maxBytesPerString, timeColumn)`|Like `LATEST(expr, timeColumn)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `''`|
+|`LATEST_BY(expr, timestampExpr)`|Returns the latest value of `expr`, which must be numeric. The latest value of `expr` is taken from the row with the overall latest non-null value of `timestampExpr`. If the overall latest non-null value of `timestampExpr` appears in multiple rows, the `expr` may be taken from any of those rows.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
+|`LATEST_BY(expr, timestampExpr, maxBytesPerString)`|Like `LATEST_BY(expr, timestampExpr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `''`|
 |`ANY_VALUE(expr)`|Returns any value of `expr` including null. `expr` must be numeric. This aggregator can simplify and optimize the performance by returning the first encountered value (including null)|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
 |`ANY_VALUE(expr, maxBytesPerString)`|Like `ANY_VALUE(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `''`|
 |`GROUPING(expr, expr...)`|Returns a number to indicate which groupBy dimension is included in a row, when using `GROUPING SETS`. Refer to [additional documentation](aggregations.md#grouping-aggregator) on how to infer this number.|N/A|
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstAggregator.java
index e7b60b9..c8a5374 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstAggregator.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstAggregator.java
@@ -54,6 +54,9 @@ public abstract class NumericFirstAggregator<TSelector extends BaseNullableColum
   @Override
   public void aggregate()
   {
+    if (timeSelector.isNull()) {
+      return;
+    }
     long time = timeSelector.getLong();
     if (time < firstTime) {
       firstTime = time;
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstBufferAggregator.java
index ebb0a87..159c6e1 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstBufferAggregator.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/NumericFirstBufferAggregator.java
@@ -86,6 +86,9 @@ public abstract class NumericFirstBufferAggregator<TSelector extends BaseNullabl
   @Override
   public void aggregate(ByteBuffer buf, int position)
   {
+    if (timeSelector.isNull()) {
+      return;
+    }
     long time = timeSelector.getLong();
     long firstTime = buf.getLong(position);
     if (time < firstTime) {
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java
index 5b581d5..8a6654f 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregator.java
@@ -56,6 +56,9 @@ public class StringFirstAggregator implements Aggregator
   @Override
   public void aggregate()
   {
+    if (timeSelector.isNull()) {
+      return;
+    }
     if (needsFoldCheck) {
       // Less efficient code path when folding is a possibility (we must read the value selector first just in case
       // it's a foldable object).
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregator.java
index d84793e..fbf2a41 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregator.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregator.java
@@ -63,6 +63,9 @@ public class StringFirstBufferAggregator implements BufferAggregator
   @Override
   public void aggregate(ByteBuffer buf, int position)
   {
+    if (timeSelector.isNull()) {
+      return;
+    }
     if (needsFoldCheck) {
       // Less efficient code path when folding is a possibility (we must read the value selector first just in case
       // it's a foldable object).
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastAggregator.java
index 6506f97..14f424a 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastAggregator.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastAggregator.java
@@ -50,6 +50,9 @@ public abstract class NumericLastAggregator<TSelector extends BaseNullableColumn
   @Override
   public void aggregate()
   {
+    if (timeSelector.isNull()) {
+      return;
+    }
     long time = timeSelector.getLong();
     if (time >= lastTime) {
       lastTime = time;
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastBufferAggregator.java
index 7c90aad..4b741e0 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastBufferAggregator.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/NumericLastBufferAggregator.java
@@ -89,6 +89,9 @@ public abstract class NumericLastBufferAggregator<TSelector extends BaseNullable
   @Override
   public void aggregate(ByteBuffer buf, int position)
   {
+    if (timeSelector.isNull()) {
+      return;
+    }
     long time = timeSelector.getLong();
     long lastTime = buf.getLong(position);
     if (time >= lastTime) {
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java
index 0c5fe33..a7c33c8 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregator.java
@@ -57,6 +57,9 @@ public class StringLastAggregator implements Aggregator
   @Override
   public void aggregate()
   {
+    if (timeSelector.isNull()) {
+      return;
+    }
     if (needsFoldCheck) {
       // Less efficient code path when folding is a possibility (we must read the value selector first just in case
       // it's a foldable object).
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregator.java
index 9da9852..8611ef7 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregator.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregator.java
@@ -64,6 +64,9 @@ public class StringLastBufferAggregator implements BufferAggregator
   @Override
   public void aggregate(ByteBuffer buf, int position)
   {
+    if (timeSelector.isNull()) {
+      return;
+    }
     if (needsFoldCheck) {
       // Less efficient code path when folding is a possibility (we must read the value selector first just in case
       // it's a foldable object).
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java
index 224ecf4..48e305a 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java
@@ -204,38 +204,20 @@ public class EarliestLatestAnySqlAggregator implements SqlAggregator
         theAggFactory = aggregatorType.createAggregatorFactory(aggregatorName, fieldName, null, outputType, -1);
         break;
       case 2:
-        if (!outputType.isNumeric()) { // translates (expr, maxBytesPerString) signature
-          theAggFactory = aggregatorType.createAggregatorFactory(
-              aggregatorName,
-              fieldName,
-              null,
-              outputType,
-              RexLiteral.intValue(rexNodes.get(1))
-          );
-        } else { // translates (expr, timeColumn) signature
-          theAggFactory = aggregatorType.createAggregatorFactory(
-              aggregatorName,
-              fieldName,
-              getColumnName(plannerContext, virtualColumnRegistry, args.get(1), rexNodes.get(1)),
-              outputType,
-              -1
-          );
-        }
-        break;
-      case 3:
         theAggFactory = aggregatorType.createAggregatorFactory(
             aggregatorName,
             fieldName,
-            getColumnName(plannerContext, virtualColumnRegistry, args.get(2), rexNodes.get(2)),
+            null,
             outputType,
             RexLiteral.intValue(rexNodes.get(1))
         );
         break;
       default:
         throw new IAE(
-            "aggregation[%s], Invalid number of arguments[%,d] to Earliest/Latest/Any operator",
+            "aggregation[%s], Invalid number of arguments[%,d] to [%s] operator",
             aggregatorName,
-            args.size()
+            args.size(),
+            aggregatorType.name()
         );
     }
 
@@ -245,7 +227,7 @@ public class EarliestLatestAnySqlAggregator implements SqlAggregator
     );
   }
 
-  private String getColumnName(
+  static String getColumnName(
       PlannerContext plannerContext,
       VirtualColumnRegistry virtualColumnRegistry,
       DruidExpression arg,
@@ -307,20 +289,9 @@ public class EarliestLatestAnySqlAggregator implements SqlAggregator
                   "'" + aggregatorType.name() + "(expr, maxBytesPerString)'\n",
                   OperandTypes.ANY,
                   OperandTypes.and(OperandTypes.NUMERIC, OperandTypes.LITERAL)
-              ),
-              OperandTypes.sequence(
-                  "'" + aggregatorType.name() + "(expr, timeColumn)'\n",
-                  OperandTypes.ANY,
-                  OperandTypes.NUMERIC
-              ),
-              OperandTypes.sequence(
-                  "'" + aggregatorType.name() + "(expr, maxBytesPerString, timeColumn)'\n",
-                  OperandTypes.ANY,
-                  OperandTypes.and(OperandTypes.NUMERIC, OperandTypes.LITERAL),
-                  OperandTypes.NUMERIC
               )
           ),
-          SqlFunctionCategory.STRING,
+          SqlFunctionCategory.USER_DEFINED_FUNCTION,
           false,
           false,
           Optionality.FORBIDDEN
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestBySqlAggregator.java b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestBySqlAggregator.java
new file mode 100644
index 0000000..f13a918
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestBySqlAggregator.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.aggregation.builtin;
+
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.type.InferTypes;
+import org.apache.calcite.sql.type.OperandTypes;
+import org.apache.calcite.sql.type.SqlReturnTypeInference;
+import org.apache.calcite.sql.type.SqlTypeFamily;
+import org.apache.calcite.util.Optionality;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.calcite.aggregation.Aggregation;
+import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
+import org.apache.druid.sql.calcite.expression.DruidExpression;
+import org.apache.druid.sql.calcite.expression.Expressions;
+import org.apache.druid.sql.calcite.planner.Calcites;
+import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class EarliestLatestBySqlAggregator implements SqlAggregator
+{
+  public static final SqlAggregator EARLIEST_BY = new EarliestLatestBySqlAggregator(EarliestLatestAnySqlAggregator.AggregatorType.EARLIEST);
+  public static final SqlAggregator LATEST_BY = new EarliestLatestBySqlAggregator(EarliestLatestAnySqlAggregator.AggregatorType.LATEST);
+
+  private final EarliestLatestAnySqlAggregator.AggregatorType aggregatorType;
+  private final SqlAggFunction function;
+
+  private EarliestLatestBySqlAggregator(final EarliestLatestAnySqlAggregator.AggregatorType aggregatorType)
+  {
+    this.aggregatorType = aggregatorType;
+    this.function = new EarliestByLatestBySqlAggFunction(aggregatorType);
+  }
+
+  @Override
+  public SqlAggFunction calciteFunction()
+  {
+    return function;
+  }
+
+  @Nullable
+  @Override
+  public Aggregation toDruidAggregation(
+      final PlannerContext plannerContext,
+      final RowSignature rowSignature,
+      final VirtualColumnRegistry virtualColumnRegistry,
+      final RexBuilder rexBuilder,
+      final String name,
+      final AggregateCall aggregateCall,
+      final Project project,
+      final List<Aggregation> existingAggregations,
+      final boolean finalizeAggregations
+  )
+  {
+    final List<RexNode> rexNodes = aggregateCall
+        .getArgList()
+        .stream()
+        .map(i -> Expressions.fromFieldAccess(rowSignature, project, i))
+        .collect(Collectors.toList());
+
+    final List<DruidExpression> args = Expressions.toDruidExpressions(plannerContext, rowSignature, rexNodes);
+
+    if (args == null) {
+      return null;
+    }
+
+    final String aggregatorName = finalizeAggregations ? Calcites.makePrefixedName(name, "a") : name;
+    final ColumnType outputType = Calcites.getColumnTypeForRelDataType(aggregateCall.getType());
+    if (outputType == null) {
+      throw new ISE(
+          "Cannot translate output sqlTypeName[%s] to Druid type for aggregator[%s]",
+          aggregateCall.getType().getSqlTypeName(),
+          aggregateCall.getName()
+      );
+    }
+
+    final String fieldName = EarliestLatestAnySqlAggregator.getColumnName(plannerContext, virtualColumnRegistry, args.get(0), rexNodes.get(0));
+
+    final AggregatorFactory theAggFactory;
+    switch (args.size()) {
+      case 2:
+        theAggFactory = aggregatorType.createAggregatorFactory(
+            aggregatorName,
+            fieldName,
+            EarliestLatestAnySqlAggregator.getColumnName(plannerContext, virtualColumnRegistry, args.get(1), rexNodes.get(1)),
+            outputType,
+            -1
+        );
+        break;
+      case 3:
+        theAggFactory = aggregatorType.createAggregatorFactory(
+            aggregatorName,
+            fieldName,
+            EarliestLatestAnySqlAggregator.getColumnName(plannerContext, virtualColumnRegistry, args.get(1), rexNodes.get(1)),
+            outputType,
+            RexLiteral.intValue(rexNodes.get(2))
+        );
+        break;
+      default:
+        throw new IAE(
+            "aggregation[%s], Invalid number of arguments[%,d] to [%s] operator",
+            aggregatorName,
+            args.size(),
+            aggregatorType.name()
+        );
+    }
+
+    return Aggregation.create(
+        Collections.singletonList(theAggFactory),
+        finalizeAggregations ? new FinalizingFieldAccessPostAggregator(name, aggregatorName) : null
+    );
+  }
+
+  private static class EarliestByLatestBySqlAggFunction extends SqlAggFunction
+  {
+    private static final SqlReturnTypeInference EARLIEST_LATEST_ARG0_RETURN_TYPE_INFERENCE =
+        new EarliestLatestAnySqlAggregator.EarliestLatestReturnTypeInference(0);
+
+    EarliestByLatestBySqlAggFunction(EarliestLatestAnySqlAggregator.AggregatorType aggregatorType)
+    {
+      super(
+          StringUtils.format("%s_BY", aggregatorType.name()),
+          null,
+          SqlKind.OTHER_FUNCTION,
+          EARLIEST_LATEST_ARG0_RETURN_TYPE_INFERENCE,
+          InferTypes.RETURN_TYPE,
+          OperandTypes.or(
+              OperandTypes.sequence(
+                  "'" + aggregatorType.name() + "(expr, timeColumn)'\n",
+                  OperandTypes.ANY,
+                  OperandTypes.family(SqlTypeFamily.TIMESTAMP)
+              ),
+              OperandTypes.sequence(
+                  "'" + aggregatorType.name() + "(expr, timeColumn, maxBytesPerString)'\n",
+                  OperandTypes.ANY,
+                  OperandTypes.family(SqlTypeFamily.TIMESTAMP),
+                  OperandTypes.and(OperandTypes.NUMERIC, OperandTypes.LITERAL)
+              )
+          ),
+          SqlFunctionCategory.USER_DEFINED_FUNCTION,
+          false,
+          false,
+          Optionality.FORBIDDEN
+      );
+    }
+  }
+}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidOperatorTable.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidOperatorTable.java
index 41ed11c..11244d4 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidOperatorTable.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidOperatorTable.java
@@ -38,6 +38,7 @@ import org.apache.druid.sql.calcite.aggregation.builtin.AvgSqlAggregator;
 import org.apache.druid.sql.calcite.aggregation.builtin.BitwiseSqlAggregator;
 import org.apache.druid.sql.calcite.aggregation.builtin.BuiltinApproxCountDistinctSqlAggregator;
 import org.apache.druid.sql.calcite.aggregation.builtin.EarliestLatestAnySqlAggregator;
+import org.apache.druid.sql.calcite.aggregation.builtin.EarliestLatestBySqlAggregator;
 import org.apache.druid.sql.calcite.aggregation.builtin.GroupingSqlAggregator;
 import org.apache.druid.sql.calcite.aggregation.builtin.MaxSqlAggregator;
 import org.apache.druid.sql.calcite.aggregation.builtin.MinSqlAggregator;
@@ -133,6 +134,8 @@ public class DruidOperatorTable implements SqlOperatorTable
                    .add(EarliestLatestAnySqlAggregator.EARLIEST)
                    .add(EarliestLatestAnySqlAggregator.LATEST)
                    .add(EarliestLatestAnySqlAggregator.ANY_VALUE)
+                   .add(EarliestLatestBySqlAggregator.EARLIEST_BY)
+                   .add(EarliestLatestBySqlAggregator.LATEST_BY)
                    .add(new MinSqlAggregator())
                    .add(new MaxSqlAggregator())
                    .add(new SumSqlAggregator())
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index 326c174..3c77792 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -609,12 +609,12 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
         "SELECT "
         + "EARLIEST(cnt), EARLIEST(m1), EARLIEST(dim1, 10), "
         + "EARLIEST(cnt + 1), EARLIEST(m1 + 1), EARLIEST(dim1 || CAST(cnt AS VARCHAR), 10), "
-        + "EARLIEST(cnt, m1), EARLIEST(m1, m1), EARLIEST(dim1, 10, m1), "
-        + "EARLIEST(cnt + 1, m1), EARLIEST(m1 + 1, m1), EARLIEST(dim1 || CAST(cnt AS VARCHAR), 10, m1) "
-        + "FROM druid.foo",
+        + "EARLIEST_BY(cnt, MILLIS_TO_TIMESTAMP(l1)), EARLIEST_BY(m1, MILLIS_TO_TIMESTAMP(l1)), EARLIEST_BY(dim1, MILLIS_TO_TIMESTAMP(l1), 10), "
+        + "EARLIEST_BY(cnt + 1, MILLIS_TO_TIMESTAMP(l1)), EARLIEST_BY(m1 + 1, MILLIS_TO_TIMESTAMP(l1)), EARLIEST_BY(dim1 || CAST(cnt AS VARCHAR), MILLIS_TO_TIMESTAMP(l1), 10) "
+        + "FROM druid.numfoo",
         ImmutableList.of(
             Druids.newTimeseriesQueryBuilder()
-                  .dataSource(CalciteTests.DATASOURCE1)
+                  .dataSource(CalciteTests.DATASOURCE3)
                   .intervals(querySegmentSpec(Filtration.eternity()))
                   .granularity(Granularities.ALL)
                   .virtualColumns(
@@ -630,19 +630,19 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                           new LongFirstAggregatorFactory("a3", "v0", null),
                           new FloatFirstAggregatorFactory("a4", "v1", null),
                           new StringFirstAggregatorFactory("a5", "v2", null, 10),
-                          new LongFirstAggregatorFactory("a6", "cnt", "m1"),
-                          new FloatFirstAggregatorFactory("a7", "m1", "m1"),
-                          new StringFirstAggregatorFactory("a8", "dim1", "m1", 10),
-                          new LongFirstAggregatorFactory("a9", "v0", "m1"),
-                          new FloatFirstAggregatorFactory("a10", "v1", "m1"),
-                          new StringFirstAggregatorFactory("a11", "v2", "m1", 10)
+                          new LongFirstAggregatorFactory("a6", "cnt", "l1"),
+                          new FloatFirstAggregatorFactory("a7", "m1", "l1"),
+                          new StringFirstAggregatorFactory("a8", "dim1", "l1", 10),
+                          new LongFirstAggregatorFactory("a9", "v0", "l1"),
+                          new FloatFirstAggregatorFactory("a10", "v1", "l1"),
+                          new StringFirstAggregatorFactory("a11", "v2", "l1", 10)
                       )
                   )
                   .context(QUERY_CONTEXT_DEFAULT)
                   .build()
         ),
         ImmutableList.of(
-            new Object[]{1L, 1.0f, "", 2L, 2.0f, "1", 1L, 1.0f, "", 2L, 2.0f, "1"}
+            new Object[]{1L, 1.0f, "", 2L, 2.0f, "1", 1L, 3.0f, "2", 2L, 4.0f, "21"}
         )
     );
   }
@@ -657,12 +657,12 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
         "SELECT "
         + "LATEST(cnt), LATEST(m1), LATEST(dim1, 10), "
         + "LATEST(cnt + 1), LATEST(m1 + 1), LATEST(dim1 || CAST(cnt AS VARCHAR), 10), "
-        + "LATEST(cnt, m1), LATEST(m1, m1), LATEST(dim1, 10, m1), "
-        + "LATEST(cnt + 1, m1), LATEST(m1 + 1, m1), LATEST(dim1 || CAST(cnt AS VARCHAR), 10, m1) "
-        + "FROM druid.foo",
+        + "LATEST_BY(cnt, MILLIS_TO_TIMESTAMP(l1)), LATEST_BY(m1, MILLIS_TO_TIMESTAMP(l1)), LATEST_BY(dim1, MILLIS_TO_TIMESTAMP(l1), 10), "
+        + "LATEST_BY(cnt + 1, MILLIS_TO_TIMESTAMP(l1)), LATEST_BY(m1 + 1, MILLIS_TO_TIMESTAMP(l1)), LATEST_BY(dim1 || CAST(cnt AS VARCHAR), MILLIS_TO_TIMESTAMP(l1), 10) "
+        + "FROM druid.numfoo",
         ImmutableList.of(
             Druids.newTimeseriesQueryBuilder()
-                  .dataSource(CalciteTests.DATASOURCE1)
+                  .dataSource(CalciteTests.DATASOURCE3)
                   .intervals(querySegmentSpec(Filtration.eternity()))
                   .granularity(Granularities.ALL)
                   .virtualColumns(
@@ -678,23 +678,49 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                           new LongLastAggregatorFactory("a3", "v0", null),
                           new FloatLastAggregatorFactory("a4", "v1", null),
                           new StringLastAggregatorFactory("a5", "v2", null, 10),
-                          new LongLastAggregatorFactory("a6", "cnt", "m1"),
-                          new FloatLastAggregatorFactory("a7", "m1", "m1"),
-                          new StringLastAggregatorFactory("a8", "dim1", "m1", 10),
-                          new LongLastAggregatorFactory("a9", "v0", "m1"),
-                          new FloatLastAggregatorFactory("a10", "v1", "m1"),
-                          new StringLastAggregatorFactory("a11", "v2", "m1", 10)
+                          new LongLastAggregatorFactory("a6", "cnt", "l1"),
+                          new FloatLastAggregatorFactory("a7", "m1", "l1"),
+                          new StringLastAggregatorFactory("a8", "dim1", "l1", 10),
+                          new LongLastAggregatorFactory("a9", "v0", "l1"),
+                          new FloatLastAggregatorFactory("a10", "v1", "l1"),
+                          new StringLastAggregatorFactory("a11", "v2", "l1", 10)
                       )
                   )
                   .context(QUERY_CONTEXT_DEFAULT)
                   .build()
         ),
         ImmutableList.of(
-            new Object[]{1L, 6.0f, "abc", 2L, 7.0f, "abc1", 1L, 6.0f, "abc", 2L, 7.0f, "abc1"}
+            new Object[]{1L, 6.0f, "abc", 2L, 7.0f, "abc1", 1L, 2.0f, "10.1", 2L, 3.0f, "10.11"}
         )
     );
   }
 
+  @Test
+  public void testEarliestByInvalidTimestamp() throws Exception
+  {
+    expectedException.expect(SqlPlanningException.class);
+    expectedException.expectMessage("Cannot apply 'EARLIEST_BY' to arguments of type 'EARLIEST_BY(<FLOAT>, <BIGINT>)");
+
+    testQuery(
+        "SELECT EARLIEST_BY(m1, l1) FROM druid.numfoo",
+        ImmutableList.of(),
+        ImmutableList.of()
+    );
+  }
+
+  @Test
+  public void testLatestByInvalidTimestamp() throws Exception
+  {
+    expectedException.expect(SqlPlanningException.class);
+    expectedException.expectMessage("Cannot apply 'LATEST_BY' to arguments of type 'LATEST_BY(<FLOAT>, <BIGINT>)");
+
+    testQuery(
+        "SELECT LATEST_BY(m1, l1) FROM druid.numfoo",
+        ImmutableList.of(),
+        ImmutableList.of()
+    );
+  }
+
   // This test the on-heap version of the AnyAggregator (Double/Float/Long/String)
   @Test
   public void testAnyAggregator() throws Exception

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org