You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ab...@apache.org on 2021/11/25 04:14:41 UTC

[druid] branch master updated: Specify time column for first/last aggregators (#11949)

This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c08055  Specify time column for first/last aggregators (#11949)
2c08055 is described below

commit 2c08055962f1c23e2749ba4e647ec85853723e5b
Author: Rohan Garg <77...@users.noreply.github.com>
AuthorDate: Thu Nov 25 09:44:14 2021 +0530

    Specify time column for first/last aggregators (#11949)
    
    Add the ability to pass time column in first/last aggregator (and latest/earliest SQL functions). It is to support cases where the time to query upon is stored as a part of a column different than __time. Also, some other logical time column can be specified.
---
 docs/querying/aggregations.md                      |  34 +++---
 docs/querying/sql.md                               |   4 +
 .../indexing/common/task/CompactionTaskTest.java   |   4 +-
 .../first/DoubleFirstAggregatorFactory.java        |  38 ++++---
 .../first/FloatFirstAggregatorFactory.java         |  38 ++++---
 .../first/LongFirstAggregatorFactory.java          |  36 ++++---
 .../first/StringFirstAggregatorFactory.java        |  24 +++--
 .../first/StringFirstFoldingAggregatorFactory.java |   2 +-
 .../last/DoubleLastAggregatorFactory.java          |  38 ++++---
 .../last/FloatLastAggregatorFactory.java           |  38 ++++---
 .../last/LongLastAggregatorFactory.java            |  38 ++++---
 .../last/StringLastAggregatorFactory.java          |  20 +++-
 .../last/StringLastFoldingAggregatorFactory.java   |   2 +-
 .../query/aggregation/AggregatorFactoryTest.java   |  16 +--
 .../first/DoubleFirstAggregationTest.java          |  51 ++++++++-
 .../first/FloatFirstAggregationTest.java           |  61 +++++++++--
 .../first/LongFirstAggregationTest.java            |  51 ++++++++-
 .../first/StringFirstAggregationTest.java          |  29 ++++-
 .../first/StringFirstBufferAggregatorTest.java     |   8 +-
 .../first/StringFirstTimeseriesQueryTest.java      |  10 +-
 .../last/DoubleLastAggregationTest.java            |  52 ++++++++-
 .../aggregation/last/FloatLastAggregationTest.java |  62 +++++++++--
 .../aggregation/last/LongLastAggregationTest.java  |  52 ++++++++-
 .../last/StringLastAggregationTest.java            |  42 +++++++-
 .../last/StringLastBufferAggregatorTest.java       |   8 +-
 .../last/StringLastTimeseriesQueryTest.java        |  10 +-
 .../FinalizingFieldAccessPostAggregatorTest.java   |   2 +-
 .../groupby/GroupByQueryQueryToolChestTest.java    |   8 +-
 .../query/groupby/GroupByQueryRunnerTest.java      |  16 +--
 .../TimeseriesQueryQueryToolChestTest.java         |   2 +-
 .../timeseries/TimeseriesQueryRunnerTest.java      |   6 +-
 .../query/topn/TopNQueryQueryToolChestTest.java    |   8 +-
 .../druid/query/topn/TopNQueryRunnerTest.java      |  14 +--
 .../druid/segment/IndexMergerRollupTest.java       |   4 +-
 .../builtin/EarliestLatestAnySqlAggregator.java    | 120 +++++++++++++++------
 .../druid/sql/calcite/CalciteJoinQueryTest.java    |   2 +-
 .../apache/druid/sql/calcite/CalciteQueryTest.java | 110 +++++++++++--------
 37 files changed, 780 insertions(+), 280 deletions(-)

diff --git a/docs/querying/aggregations.md b/docs/querying/aggregations.md
index cf592d9..7057ed9 100644
--- a/docs/querying/aggregations.md
+++ b/docs/querying/aggregations.md
@@ -143,79 +143,85 @@ Note that queries with first/last aggregators on a segment created with rollup e
 
 #### `doubleFirst` aggregator
 
-`doubleFirst` computes the metric value with the minimum timestamp or 0 in default mode, or `null` in SQL-compatible mode if no row exists.
+`doubleFirst` computes the metric value with the minimum value for time column or 0 in default mode, or `null` in SQL-compatible mode if no row exists.
 
 ```json
 {
   "type" : "doubleFirst",
   "name" : <output_name>,
-  "fieldName" : <metric_name>
+  "fieldName" : <metric_name>,
+  "timeColumn" : <time_column_name> # (optional, defaults to __time)
 }
 ```
 
 #### `doubleLast` aggregator
 
-`doubleLast` computes the metric value with the maximum timestamp or 0 in default mode, or `null` in SQL-compatible mode if no row exists.
+`doubleLast` computes the metric value with the maximum value for time column or 0 in default mode, or `null` in SQL-compatible mode if no row exists.
 
 ```json
 {
   "type" : "doubleLast",
   "name" : <output_name>,
-  "fieldName" : <metric_name>
+  "fieldName" : <metric_name>,
+  "timeColumn" : <time_column_name> # (optional, defaults to __time)
 }
 ```
 
 #### `floatFirst` aggregator
 
-`floatFirst` computes the metric value with the minimum timestamp or 0 in default mode, or `null` in SQL-compatible mode if no row exists.
+`floatFirst` computes the metric value with the minimum value for time column or 0 in default mode, or `null` in SQL-compatible mode if no row exists.
 
 ```json
 {
   "type" : "floatFirst",
   "name" : <output_name>,
-  "fieldName" : <metric_name>
+  "fieldName" : <metric_name>,
+  "timeColumn" : <time_column_name> # (optional, defaults to __time)
 }
 ```
 
 #### `floatLast` aggregator
 
-`floatLast` computes the metric value with the maximum timestamp or 0 in default mode, or `null` in SQL-compatible mode if no row exists.
+`floatLast` computes the metric value with the maximum value for time column or 0 in default mode, or `null` in SQL-compatible mode if no row exists.
 
 ```json
 {
   "type" : "floatLast",
   "name" : <output_name>,
-  "fieldName" : <metric_name>
+  "fieldName" : <metric_name>,
+  "timeColumn" : <time_column_name> # (optional, defaults to __time)
 }
 ```
 
 #### `longFirst` aggregator
 
-`longFirst` computes the metric value with the minimum timestamp or 0 in default mode, or `null` in SQL-compatible mode if no row exists.
+`longFirst` computes the metric value with the minimum value for time column or 0 in default mode, or `null` in SQL-compatible mode if no row exists.
 
 ```json
 {
   "type" : "longFirst",
   "name" : <output_name>,
-  "fieldName" : <metric_name>
+  "fieldName" : <metric_name>,
+  "timeColumn" : <time_column_name> # (optional, defaults to __time)
 }
 ```
 
 #### `longLast` aggregator
 
-`longLast` computes the metric value with the maximum timestamp or 0 in default mode, or `null` in SQL-compatible mode if no row exists.
+`longLast` computes the metric value with the maximum value for time column or 0 in default mode, or `null` in SQL-compatible mode if no row exists.
 
 ```json
 {
   "type" : "longLast",
   "name" : <output_name>,
   "fieldName" : <metric_name>,
+  "timeColumn" : <time_column_name> # (optional, defaults to __time)
 }
 ```
 
 #### `stringFirst` aggregator
 
-`stringFirst` computes the metric value with the minimum timestamp or `null` if no row exists.
+`stringFirst` computes the metric value with the minimum value for time column or `null` if no row exists.
 
 ```json
 {
@@ -223,6 +229,7 @@ Note that queries with first/last aggregators on a segment created with rollup e
   "name" : <output_name>,
   "fieldName" : <metric_name>,
   "maxStringBytes" : <integer> # (optional, defaults to 1024)
+  "timeColumn" : <time_column_name> # (optional, defaults to __time)
 }
 ```
 
@@ -230,7 +237,7 @@ Note that queries with first/last aggregators on a segment created with rollup e
 
 #### `stringLast` aggregator
 
-`stringLast` computes the metric value with the maximum timestamp or `null` if no row exists.
+`stringLast` computes the metric value with the maximum value for time column or `null` if no row exists.
 
 ```json
 {
@@ -238,6 +245,7 @@ Note that queries with first/last aggregators on a segment created with rollup e
   "name" : <output_name>,
   "fieldName" : <metric_name>,
   "maxStringBytes" : <integer> # (optional, defaults to 1024)
+  "timeColumn" : <time_column_name> # (optional, defaults to __time)
 }
 ```
 
diff --git a/docs/querying/sql.md b/docs/querying/sql.md
index 92bf71b..5efe4f4 100644
--- a/docs/querying/sql.md
+++ b/docs/querying/sql.md
@@ -353,9 +353,13 @@ Only the COUNT, ARRAY_AGG, and STRING_AGG aggregations can accept the DISTINCT k
 |`STDDEV_SAMP(expr)`|Computes standard deviation sample of `expr`. See [stats extension](../development/extensions-core/stats.md) documentation for additional details.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
 |`STDDEV(expr)`|Computes standard deviation sample of `expr`. See [stats extension](../development/extensions-core/stats.md) documentation for additional details.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
 |`EARLIEST(expr)`|Returns the earliest value of `expr`, which must be numeric. If `expr` comes from a relation with a timestamp column (like a Druid datasource) then "earliest" is the value first encountered with the minimum overall timestamp of all values being aggregated. If `expr` does not come from a relation with a timestamp, then it is simply the first value encountered.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
+|`EARLIEST(expr, timeColumn)`|Returns the earliest value of `expr`, which must be numeric. Earliest value is defined as the value first encountered with the minimum overall value of time column of all values being aggregated.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
 |`EARLIEST(expr, maxBytesPerString)`|Like `EARLIEST(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `''`|
+|`EARLIEST(expr, maxBytesPerString, timeColumn)`|Like `EARLIEST(expr, timeColumn)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `''`|
 |`LATEST(expr)`|Returns the latest value of `expr`, which must be numeric. If `expr` comes from a relation with a timestamp column (like a Druid datasource) then "latest" is the value last encountered with the maximum overall timestamp of all values being aggregated. If `expr` does not come from a relation with a timestamp, then it is simply the last value encountered.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
+|`LATEST(expr, timeColumn)`|Returns the latest value of `expr`, which must be numeric. Latest value is defined as the value last encountered with the maximum overall value of time column of all values being aggregated.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
 |`LATEST(expr, maxBytesPerString)`|Like `LATEST(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `''`|
+|`LATEST(expr, maxBytesPerString, timeColumn)`|Like `LATEST(expr, timeColumn)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `''`|
 |`ANY_VALUE(expr)`|Returns any value of `expr` including null. `expr` must be numeric. This aggregator can simplify and optimize the performance by returning the first encountered value (including null)|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `0`|
 |`ANY_VALUE(expr, maxBytesPerString)`|Like `ANY_VALUE(expr)`, but for strings. The `maxBytesPerString` parameter determines how much aggregation space to allocate per string. Strings longer than this limit will be truncated. This parameter should be set as low as possible, since high values will lead to wasted memory.|`null` if `druid.generic.useDefaultValueForNull=false`, otherwise `''`|
 |`GROUPING(expr, expr...)`|Returns a number to indicate which groupBy dimension is included in a row, when using `GROUPING SETS`. Refer to [additional documentation](aggregations.md#grouping-aggregator) on how to infer this number.|N/A|
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index f5bf50a..6b51052 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -239,8 +239,8 @@ public class CompactionTaskTest
     AGGREGATORS.add(new CountAggregatorFactory("agg_0"));
     AGGREGATORS.add(new LongSumAggregatorFactory("agg_1", "long_dim_1"));
     AGGREGATORS.add(new LongMaxAggregatorFactory("agg_2", "long_dim_2"));
-    AGGREGATORS.add(new FloatFirstAggregatorFactory("agg_3", "float_dim_3"));
-    AGGREGATORS.add(new DoubleLastAggregatorFactory("agg_4", "double_dim_4"));
+    AGGREGATORS.add(new FloatFirstAggregatorFactory("agg_3", "float_dim_3", null));
+    AGGREGATORS.add(new DoubleLastAggregatorFactory("agg_4", "double_dim_4", null));
 
     for (int i = 0; i < SEGMENT_INTERVALS.size(); i++) {
       SEGMENT_MAP.put(
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java
index c39350a..444ade4 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregatorFactory.java
@@ -23,13 +23,13 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import org.apache.druid.collections.SerializablePair;
-import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.query.aggregation.AggregateCombiner;
 import org.apache.druid.query.aggregation.Aggregator;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.AggregatorUtil;
 import org.apache.druid.query.aggregation.BufferAggregator;
+import org.apache.druid.query.cache.CacheKeyBuilder;
 import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
 import org.apache.druid.segment.BaseDoubleColumnValueSelector;
 import org.apache.druid.segment.ColumnSelectorFactory;
@@ -77,13 +77,15 @@ public class DoubleFirstAggregatorFactory extends AggregatorFactory
       SerializablePair.createNullHandlingComparator(Double::compare, true);
 
   private final String fieldName;
+  private final String timeColumn;
   private final String name;
   private final boolean storeDoubleAsFloat;
 
   @JsonCreator
   public DoubleFirstAggregatorFactory(
       @JsonProperty("name") String name,
-      @JsonProperty("fieldName") final String fieldName
+      @JsonProperty("fieldName") final String fieldName,
+      @JsonProperty("timeColumn") @Nullable final String timeColumn
   )
   {
     Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
@@ -91,6 +93,7 @@ public class DoubleFirstAggregatorFactory extends AggregatorFactory
 
     this.name = name;
     this.fieldName = fieldName;
+    this.timeColumn = timeColumn == null ? ColumnHolder.TIME_COLUMN_NAME : timeColumn;
     this.storeDoubleAsFloat = ColumnHolder.storeDoubleAsFloat();
   }
 
@@ -102,7 +105,7 @@ public class DoubleFirstAggregatorFactory extends AggregatorFactory
       return NIL_AGGREGATOR;
     } else {
       return new DoubleFirstAggregator(
-          metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
+          metricFactory.makeColumnValueSelector(timeColumn),
           valueSelector
       );
     }
@@ -116,7 +119,7 @@ public class DoubleFirstAggregatorFactory extends AggregatorFactory
       return NIL_BUFFER_AGGREGATOR;
     } else {
       return new DoubleFirstBufferAggregator(
-          metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
+          metricFactory.makeColumnValueSelector(timeColumn),
           valueSelector
       );
     }
@@ -156,7 +159,7 @@ public class DoubleFirstAggregatorFactory extends AggregatorFactory
   @Override
   public AggregatorFactory getCombiningFactory()
   {
-    return new DoubleFirstAggregatorFactory(name, name)
+    return new DoubleFirstAggregatorFactory(name, name, timeColumn)
     {
       @Override
       public Aggregator factorize(ColumnSelectorFactory metricFactory)
@@ -223,7 +226,7 @@ public class DoubleFirstAggregatorFactory extends AggregatorFactory
   @Override
   public List<AggregatorFactory> getRequiredColumns()
   {
-    return Collections.singletonList(new DoubleFirstAggregatorFactory(fieldName, fieldName));
+    return Collections.singletonList(new DoubleFirstAggregatorFactory(fieldName, fieldName, timeColumn));
   }
 
   @Override
@@ -256,21 +259,25 @@ public class DoubleFirstAggregatorFactory extends AggregatorFactory
     return fieldName;
   }
 
+  @JsonProperty
+  public String getTimeColumn()
+  {
+    return timeColumn;
+  }
+
   @Override
   public List<String> requiredFields()
   {
-    return Arrays.asList(ColumnHolder.TIME_COLUMN_NAME, fieldName);
+    return Arrays.asList(timeColumn, fieldName);
   }
 
   @Override
   public byte[] getCacheKey()
   {
-    byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);
-
-    return ByteBuffer.allocate(1 + fieldNameBytes.length)
-                     .put(AggregatorUtil.DOUBLE_FIRST_CACHE_TYPE_ID)
-                     .put(fieldNameBytes)
-                     .array();
+    return new CacheKeyBuilder(AggregatorUtil.DOUBLE_FIRST_CACHE_TYPE_ID)
+        .appendString(fieldName)
+        .appendString(timeColumn)
+        .build();
   }
 
   @Override
@@ -307,13 +314,13 @@ public class DoubleFirstAggregatorFactory extends AggregatorFactory
 
     DoubleFirstAggregatorFactory that = (DoubleFirstAggregatorFactory) o;
 
-    return fieldName.equals(that.fieldName) && name.equals(that.name);
+    return fieldName.equals(that.fieldName) && timeColumn.equals(that.timeColumn) && name.equals(that.name);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(fieldName, name);
+    return Objects.hash(fieldName, name, timeColumn);
   }
 
   @Override
@@ -322,6 +329,7 @@ public class DoubleFirstAggregatorFactory extends AggregatorFactory
     return "DoubleFirstAggregatorFactory{" +
            "name='" + name + '\'' +
            ", fieldName='" + fieldName + '\'' +
+           ", timeColumn='" + timeColumn + '\'' +
            '}';
   }
 }
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregatorFactory.java
index e3afa30..f8a592d 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/FloatFirstAggregatorFactory.java
@@ -23,13 +23,13 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import org.apache.druid.collections.SerializablePair;
-import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.query.aggregation.AggregateCombiner;
 import org.apache.druid.query.aggregation.Aggregator;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.AggregatorUtil;
 import org.apache.druid.query.aggregation.BufferAggregator;
+import org.apache.druid.query.cache.CacheKeyBuilder;
 import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
 import org.apache.druid.segment.BaseFloatColumnValueSelector;
 import org.apache.druid.segment.ColumnSelectorFactory;
@@ -77,12 +77,14 @@ public class FloatFirstAggregatorFactory extends AggregatorFactory
       SerializablePair.createNullHandlingComparator(Float::compare, true);
 
   private final String fieldName;
+  private final String timeColumn;
   private final String name;
 
   @JsonCreator
   public FloatFirstAggregatorFactory(
       @JsonProperty("name") String name,
-      @JsonProperty("fieldName") final String fieldName
+      @JsonProperty("fieldName") final String fieldName,
+      @JsonProperty("timeColumn") @Nullable final String timeColumn
   )
   {
     Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
@@ -90,6 +92,7 @@ public class FloatFirstAggregatorFactory extends AggregatorFactory
 
     this.name = name;
     this.fieldName = fieldName;
+    this.timeColumn = timeColumn == null ? ColumnHolder.TIME_COLUMN_NAME : timeColumn;
   }
 
   @Override
@@ -100,7 +103,7 @@ public class FloatFirstAggregatorFactory extends AggregatorFactory
       return NIL_AGGREGATOR;
     } else {
       return new FloatFirstAggregator(
-          metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
+          metricFactory.makeColumnValueSelector(timeColumn),
           valueSelector
       );
     }
@@ -114,7 +117,7 @@ public class FloatFirstAggregatorFactory extends AggregatorFactory
       return NIL_BUFFER_AGGREGATOR;
     } else {
       return new FloatFirstBufferAggregator(
-          metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
+          metricFactory.makeColumnValueSelector(timeColumn),
           valueSelector
       );
     }
@@ -155,7 +158,7 @@ public class FloatFirstAggregatorFactory extends AggregatorFactory
   public AggregatorFactory getCombiningFactory()
   {
 
-    return new FloatFirstAggregatorFactory(name, name)
+    return new FloatFirstAggregatorFactory(name, name, timeColumn)
     {
       @Override
       public Aggregator factorize(ColumnSelectorFactory metricFactory)
@@ -220,7 +223,7 @@ public class FloatFirstAggregatorFactory extends AggregatorFactory
   @Override
   public List<AggregatorFactory> getRequiredColumns()
   {
-    return Collections.singletonList(new FloatFirstAggregatorFactory(fieldName, fieldName));
+    return Collections.singletonList(new FloatFirstAggregatorFactory(fieldName, fieldName, timeColumn));
   }
 
   @Override
@@ -253,21 +256,25 @@ public class FloatFirstAggregatorFactory extends AggregatorFactory
     return fieldName;
   }
 
+  @JsonProperty
+  public String getTimeColumn()
+  {
+    return timeColumn;
+  }
+
   @Override
   public List<String> requiredFields()
   {
-    return Arrays.asList(ColumnHolder.TIME_COLUMN_NAME, fieldName);
+    return Arrays.asList(timeColumn, fieldName);
   }
 
   @Override
   public byte[] getCacheKey()
   {
-    byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);
-
-    return ByteBuffer.allocate(1 + fieldNameBytes.length)
-                     .put(AggregatorUtil.FLOAT_FIRST_CACHE_TYPE_ID)
-                     .put(fieldNameBytes)
-                     .array();
+    return new CacheKeyBuilder(AggregatorUtil.FLOAT_FIRST_CACHE_TYPE_ID)
+        .appendString(fieldName)
+        .appendString(timeColumn)
+        .build();
   }
 
   @Override
@@ -302,13 +309,13 @@ public class FloatFirstAggregatorFactory extends AggregatorFactory
 
     FloatFirstAggregatorFactory that = (FloatFirstAggregatorFactory) o;
 
-    return fieldName.equals(that.fieldName) && name.equals(that.name);
+    return fieldName.equals(that.fieldName) && timeColumn.equals(that.timeColumn) && name.equals(that.name);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(fieldName, name);
+    return Objects.hash(fieldName, name, timeColumn);
   }
 
   @Override
@@ -317,6 +324,7 @@ public class FloatFirstAggregatorFactory extends AggregatorFactory
     return "FloatFirstAggregatorFactory{" +
            "name='" + name + '\'' +
            ", fieldName='" + fieldName + '\'' +
+           ", timeColumn='" + timeColumn + '\'' +
            '}';
   }
 }
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregatorFactory.java
index 3b384d2..bfc8ae4 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/LongFirstAggregatorFactory.java
@@ -23,13 +23,13 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import org.apache.druid.collections.SerializablePair;
-import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.query.aggregation.AggregateCombiner;
 import org.apache.druid.query.aggregation.Aggregator;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.AggregatorUtil;
 import org.apache.druid.query.aggregation.BufferAggregator;
+import org.apache.druid.query.cache.CacheKeyBuilder;
 import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
 import org.apache.druid.segment.BaseLongColumnValueSelector;
 import org.apache.druid.segment.ColumnSelectorFactory;
@@ -76,12 +76,14 @@ public class LongFirstAggregatorFactory extends AggregatorFactory
       SerializablePair.createNullHandlingComparator(Long::compare, true);
 
   private final String fieldName;
+  private final String timeColumn;
   private final String name;
 
   @JsonCreator
   public LongFirstAggregatorFactory(
       @JsonProperty("name") String name,
-      @JsonProperty("fieldName") final String fieldName
+      @JsonProperty("fieldName") final String fieldName,
+      @JsonProperty("timeColumn") @Nullable final String timeColumn
   )
   {
     Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
@@ -89,6 +91,7 @@ public class LongFirstAggregatorFactory extends AggregatorFactory
 
     this.name = name;
     this.fieldName = fieldName;
+    this.timeColumn = timeColumn == null ? ColumnHolder.TIME_COLUMN_NAME : timeColumn;
   }
 
   @Override
@@ -99,7 +102,7 @@ public class LongFirstAggregatorFactory extends AggregatorFactory
       return NIL_AGGREGATOR;
     } else {
       return new LongFirstAggregator(
-          metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
+          metricFactory.makeColumnValueSelector(timeColumn),
           valueSelector
       );
     }
@@ -113,7 +116,7 @@ public class LongFirstAggregatorFactory extends AggregatorFactory
       return NIL_BUFFER_AGGREGATOR;
     } else {
       return new LongFirstBufferAggregator(
-          metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
+          metricFactory.makeColumnValueSelector(timeColumn),
           valueSelector
       );
     }
@@ -153,7 +156,7 @@ public class LongFirstAggregatorFactory extends AggregatorFactory
   @Override
   public AggregatorFactory getCombiningFactory()
   {
-    return new LongFirstAggregatorFactory(name, name)
+    return new LongFirstAggregatorFactory(name, name, timeColumn)
     {
       @Override
       public Aggregator factorize(ColumnSelectorFactory metricFactory)
@@ -218,7 +221,7 @@ public class LongFirstAggregatorFactory extends AggregatorFactory
   @Override
   public List<AggregatorFactory> getRequiredColumns()
   {
-    return Collections.singletonList(new LongFirstAggregatorFactory(fieldName, fieldName));
+    return Collections.singletonList(new LongFirstAggregatorFactory(fieldName, fieldName, timeColumn));
   }
 
   @Override
@@ -251,21 +254,25 @@ public class LongFirstAggregatorFactory extends AggregatorFactory
     return fieldName;
   }
 
+  @JsonProperty
+  public String getTimeColumn()
+  {
+    return timeColumn;
+  }
+
   @Override
   public List<String> requiredFields()
   {
-    return Arrays.asList(ColumnHolder.TIME_COLUMN_NAME, fieldName);
+    return Arrays.asList(timeColumn, fieldName);
   }
 
   @Override
   public byte[] getCacheKey()
   {
-    byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);
-
-    return ByteBuffer.allocate(1 + fieldNameBytes.length)
-                     .put(AggregatorUtil.LONG_FIRST_CACHE_TYPE_ID)
-                     .put(fieldNameBytes)
-                     .array();
+    return new CacheKeyBuilder(AggregatorUtil.LONG_FIRST_CACHE_TYPE_ID)
+        .appendString(fieldName)
+        .appendString(timeColumn)
+        .build();
   }
 
   @Override
@@ -300,7 +307,7 @@ public class LongFirstAggregatorFactory extends AggregatorFactory
 
     LongFirstAggregatorFactory that = (LongFirstAggregatorFactory) o;
 
-    return fieldName.equals(that.fieldName) && name.equals(that.name);
+    return fieldName.equals(that.fieldName) && timeColumn.equals(that.timeColumn) && name.equals(that.name);
   }
 
   @Override
@@ -317,6 +324,7 @@ public class LongFirstAggregatorFactory extends AggregatorFactory
     return "LongFirstAggregatorFactory{" +
            "name='" + name + '\'' +
            ", fieldName='" + fieldName + '\'' +
+           ", timeColumn='" + timeColumn + '\'' +
            '}';
   }
 }
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java
index 9eb4e34..f808d1d 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregatorFactory.java
@@ -126,12 +126,14 @@ public class StringFirstAggregatorFactory extends AggregatorFactory
 
   private final String fieldName;
   private final String name;
+  private final String timeColumn;
   protected final int maxStringBytes;
 
   @JsonCreator
   public StringFirstAggregatorFactory(
       @JsonProperty("name") String name,
       @JsonProperty("fieldName") final String fieldName,
+      @JsonProperty("timeColumn") @Nullable final String timeColumn,
       @JsonProperty("maxStringBytes") Integer maxStringBytes
   )
   {
@@ -144,6 +146,7 @@ public class StringFirstAggregatorFactory extends AggregatorFactory
 
     this.name = name;
     this.fieldName = fieldName;
+    this.timeColumn = timeColumn == null ? ColumnHolder.TIME_COLUMN_NAME : timeColumn;
     this.maxStringBytes = maxStringBytes == null
                           ? StringFirstAggregatorFactory.DEFAULT_MAX_STRING_SIZE
                           : maxStringBytes;
@@ -157,7 +160,7 @@ public class StringFirstAggregatorFactory extends AggregatorFactory
       return NIL_AGGREGATOR;
     } else {
       return new StringFirstAggregator(
-          metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
+          metricFactory.makeColumnValueSelector(timeColumn),
           valueSelector,
           maxStringBytes,
           StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector, metricFactory.getColumnCapabilities(fieldName))
@@ -173,7 +176,7 @@ public class StringFirstAggregatorFactory extends AggregatorFactory
       return NIL_BUFFER_AGGREGATOR;
     } else {
       return new StringFirstBufferAggregator(
-          metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
+          metricFactory.makeColumnValueSelector(timeColumn),
           valueSelector,
           maxStringBytes,
           StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector, metricFactory.getColumnCapabilities(fieldName))
@@ -202,13 +205,13 @@ public class StringFirstAggregatorFactory extends AggregatorFactory
   @Override
   public AggregatorFactory getCombiningFactory()
   {
-    return new StringFirstAggregatorFactory(name, name, maxStringBytes);
+    return new StringFirstAggregatorFactory(name, name, timeColumn, maxStringBytes);
   }
 
   @Override
   public List<AggregatorFactory> getRequiredColumns()
   {
-    return Collections.singletonList(new StringFirstAggregatorFactory(fieldName, fieldName, maxStringBytes));
+    return Collections.singletonList(new StringFirstAggregatorFactory(fieldName, fieldName, timeColumn, maxStringBytes));
   }
 
   @Override
@@ -239,6 +242,12 @@ public class StringFirstAggregatorFactory extends AggregatorFactory
   }
 
   @JsonProperty
+  public String getTimeColumn()
+  {
+    return timeColumn;
+  }
+
+  @JsonProperty
   public Integer getMaxStringBytes()
   {
     return maxStringBytes;
@@ -247,7 +256,7 @@ public class StringFirstAggregatorFactory extends AggregatorFactory
   @Override
   public List<String> requiredFields()
   {
-    return Arrays.asList(ColumnHolder.TIME_COLUMN_NAME, fieldName);
+    return Arrays.asList(timeColumn, fieldName);
   }
 
   @Override
@@ -255,6 +264,7 @@ public class StringFirstAggregatorFactory extends AggregatorFactory
   {
     return new CacheKeyBuilder(AggregatorUtil.STRING_FIRST_CACHE_TYPE_ID)
         .appendString(fieldName)
+        .appendString(timeColumn)
         .appendInt(maxStringBytes)
         .build();
   }
@@ -292,13 +302,14 @@ public class StringFirstAggregatorFactory extends AggregatorFactory
     StringFirstAggregatorFactory that = (StringFirstAggregatorFactory) o;
     return maxStringBytes == that.maxStringBytes &&
            Objects.equals(fieldName, that.fieldName) &&
+           Objects.equals(timeColumn, that.timeColumn) &&
            Objects.equals(name, that.name);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(fieldName, name, maxStringBytes);
+    return Objects.hash(fieldName, name, timeColumn, maxStringBytes);
   }
 
   @Override
@@ -307,6 +318,7 @@ public class StringFirstAggregatorFactory extends AggregatorFactory
     return "StringFirstAggregatorFactory{" +
            "fieldName='" + fieldName + '\'' +
            ", name='" + name + '\'' +
+           ", timeColumn='" + timeColumn + '\'' +
            ", maxStringBytes=" + maxStringBytes +
            '}';
   }
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstFoldingAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstFoldingAggregatorFactory.java
index 6bade40..5441df0 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstFoldingAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstFoldingAggregatorFactory.java
@@ -29,6 +29,6 @@ public class StringFirstFoldingAggregatorFactory extends StringFirstAggregatorFa
   @JsonCreator
   public StringFirstFoldingAggregatorFactory(String name, String fieldName, Integer maxStringBytes)
   {
-    super(name, fieldName, maxStringBytes);
+    super(name, fieldName, null, maxStringBytes);
   }
 }
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregatorFactory.java
index e0ace11..6e2da8e 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/DoubleLastAggregatorFactory.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import org.apache.druid.collections.SerializablePair;
-import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.query.aggregation.AggregateCombiner;
 import org.apache.druid.query.aggregation.Aggregator;
@@ -32,6 +31,7 @@ import org.apache.druid.query.aggregation.AggregatorUtil;
 import org.apache.druid.query.aggregation.BufferAggregator;
 import org.apache.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
 import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory;
+import org.apache.druid.query.cache.CacheKeyBuilder;
 import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
 import org.apache.druid.segment.BaseDoubleColumnValueSelector;
 import org.apache.druid.segment.ColumnSelectorFactory;
@@ -76,19 +76,22 @@ public class DoubleLastAggregatorFactory extends AggregatorFactory
   };
 
   private final String fieldName;
+  private final String timeColumn;
   private final String name;
   private final boolean storeDoubleAsFloat;
 
   @JsonCreator
   public DoubleLastAggregatorFactory(
       @JsonProperty("name") String name,
-      @JsonProperty("fieldName") final String fieldName
+      @JsonProperty("fieldName") final String fieldName,
+      @JsonProperty("timeColumn") @Nullable final String timeColumn
   )
   {
     Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
     Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
     this.name = name;
     this.fieldName = fieldName;
+    this.timeColumn = timeColumn == null ? ColumnHolder.TIME_COLUMN_NAME : timeColumn;
     this.storeDoubleAsFloat = ColumnHolder.storeDoubleAsFloat();
   }
 
@@ -100,7 +103,7 @@ public class DoubleLastAggregatorFactory extends AggregatorFactory
       return NIL_AGGREGATOR;
     } else {
       return new DoubleLastAggregator(
-          metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
+          metricFactory.makeColumnValueSelector(timeColumn),
           valueSelector
       );
     }
@@ -114,7 +117,7 @@ public class DoubleLastAggregatorFactory extends AggregatorFactory
       return NIL_BUFFER_AGGREGATOR;
     } else {
       return new DoubleLastBufferAggregator(
-          metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
+          metricFactory.makeColumnValueSelector(timeColumn),
           valueSelector
       );
     }
@@ -154,7 +157,7 @@ public class DoubleLastAggregatorFactory extends AggregatorFactory
   @Override
   public AggregatorFactory getCombiningFactory()
   {
-    return new DoubleLastAggregatorFactory(name, name)
+    return new DoubleLastAggregatorFactory(name, name, timeColumn)
     {
       @Override
       public Aggregator factorize(ColumnSelectorFactory metricFactory)
@@ -221,7 +224,7 @@ public class DoubleLastAggregatorFactory extends AggregatorFactory
   @Override
   public List<AggregatorFactory> getRequiredColumns()
   {
-    return Collections.singletonList(new LongFirstAggregatorFactory(fieldName, fieldName));
+    return Collections.singletonList(new LongFirstAggregatorFactory(fieldName, fieldName, timeColumn));
   }
 
   @Override
@@ -254,21 +257,25 @@ public class DoubleLastAggregatorFactory extends AggregatorFactory
     return fieldName;
   }
 
+  @JsonProperty
+  public String getTimeColumn()
+  {
+    return timeColumn;
+  }
+
   @Override
   public List<String> requiredFields()
   {
-    return Arrays.asList(ColumnHolder.TIME_COLUMN_NAME, fieldName);
+    return Arrays.asList(timeColumn, fieldName);
   }
 
   @Override
   public byte[] getCacheKey()
   {
-    byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);
-
-    return ByteBuffer.allocate(1 + fieldNameBytes.length)
-                     .put(AggregatorUtil.DOUBLE_LAST_CACHE_TYPE_ID)
-                     .put(fieldNameBytes)
-                     .array();
+    return new CacheKeyBuilder(AggregatorUtil.DOUBLE_LAST_CACHE_TYPE_ID)
+        .appendString(fieldName)
+        .appendString(timeColumn)
+        .build();
   }
 
   @Override
@@ -305,13 +312,13 @@ public class DoubleLastAggregatorFactory extends AggregatorFactory
 
     DoubleLastAggregatorFactory that = (DoubleLastAggregatorFactory) o;
 
-    return fieldName.equals(that.fieldName) && name.equals(that.name);
+    return fieldName.equals(that.fieldName) && timeColumn.equals(that.timeColumn) && name.equals(that.name);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(name, fieldName);
+    return Objects.hash(name, fieldName, timeColumn);
   }
 
   @Override
@@ -320,6 +327,7 @@ public class DoubleLastAggregatorFactory extends AggregatorFactory
     return "DoubleLastAggregatorFactory{" +
            "name='" + name + '\'' +
            ", fieldName='" + fieldName + '\'' +
+           ", timeColumn='" + timeColumn + '\'' +
            '}';
   }
 }
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastAggregatorFactory.java
index 26ec328..a7912cc 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/FloatLastAggregatorFactory.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import org.apache.druid.collections.SerializablePair;
-import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.query.aggregation.AggregateCombiner;
 import org.apache.druid.query.aggregation.Aggregator;
@@ -32,6 +31,7 @@ import org.apache.druid.query.aggregation.AggregatorUtil;
 import org.apache.druid.query.aggregation.BufferAggregator;
 import org.apache.druid.query.aggregation.first.FloatFirstAggregatorFactory;
 import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory;
+import org.apache.druid.query.cache.CacheKeyBuilder;
 import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
 import org.apache.druid.segment.BaseFloatColumnValueSelector;
 import org.apache.druid.segment.ColumnSelectorFactory;
@@ -76,18 +76,21 @@ public class FloatLastAggregatorFactory extends AggregatorFactory
   };
 
   private final String fieldName;
+  private final String timeColumn;
   private final String name;
 
   @JsonCreator
   public FloatLastAggregatorFactory(
       @JsonProperty("name") String name,
-      @JsonProperty("fieldName") final String fieldName
+      @JsonProperty("fieldName") final String fieldName,
+      @JsonProperty("timeColumn") @Nullable final String timeColumn
   )
   {
     Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
     Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
     this.name = name;
     this.fieldName = fieldName;
+    this.timeColumn = timeColumn == null ? ColumnHolder.TIME_COLUMN_NAME : timeColumn;
   }
 
   @Override
@@ -98,7 +101,7 @@ public class FloatLastAggregatorFactory extends AggregatorFactory
       return NIL_AGGREGATOR;
     } else {
       return new FloatLastAggregator(
-          metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
+          metricFactory.makeColumnValueSelector(timeColumn),
           valueSelector
       );
     }
@@ -112,7 +115,7 @@ public class FloatLastAggregatorFactory extends AggregatorFactory
       return NIL_BUFFER_AGGREGATOR;
     } else {
       return new FloatLastBufferAggregator(
-          metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
+          metricFactory.makeColumnValueSelector(timeColumn),
           valueSelector
       );
     }
@@ -152,7 +155,7 @@ public class FloatLastAggregatorFactory extends AggregatorFactory
   @Override
   public AggregatorFactory getCombiningFactory()
   {
-    return new FloatLastAggregatorFactory(name, name)
+    return new FloatLastAggregatorFactory(name, name, timeColumn)
     {
       @Override
       public Aggregator factorize(ColumnSelectorFactory metricFactory)
@@ -217,7 +220,7 @@ public class FloatLastAggregatorFactory extends AggregatorFactory
   @Override
   public List<AggregatorFactory> getRequiredColumns()
   {
-    return Collections.singletonList(new LongFirstAggregatorFactory(fieldName, fieldName));
+    return Collections.singletonList(new LongFirstAggregatorFactory(fieldName, fieldName, timeColumn));
   }
 
   @Override
@@ -250,22 +253,26 @@ public class FloatLastAggregatorFactory extends AggregatorFactory
     return fieldName;
   }
 
+  @JsonProperty
+  public String getTimeColumn()
+  {
+    return timeColumn;
+  }
+
   @Override
   public List<String> requiredFields()
   {
-    return Arrays.asList(ColumnHolder.TIME_COLUMN_NAME, fieldName);
+    return Arrays.asList(timeColumn, fieldName);
   }
 
   @Override
   public byte[] getCacheKey()
   {
-    byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);
-
-    return ByteBuffer.allocate(2 + fieldNameBytes.length)
-                     .put(AggregatorUtil.FLOAT_LAST_CACHE_TYPE_ID)
-                     .put(fieldNameBytes)
-                     .put((byte) 0xff)
-                     .array();
+    return new CacheKeyBuilder(AggregatorUtil.FLOAT_LAST_CACHE_TYPE_ID)
+        .appendString(fieldName)
+        .appendString(timeColumn)
+        .appendByte((byte) 0xff)
+        .build();
   }
 
   @Override
@@ -300,7 +307,7 @@ public class FloatLastAggregatorFactory extends AggregatorFactory
 
     FloatLastAggregatorFactory that = (FloatLastAggregatorFactory) o;
 
-    return fieldName.equals(that.fieldName) && name.equals(that.name);
+    return fieldName.equals(that.fieldName) && timeColumn.equals(that.timeColumn) && name.equals(that.name);
   }
 
   @Override
@@ -315,6 +322,7 @@ public class FloatLastAggregatorFactory extends AggregatorFactory
     return "FloatLastAggregatorFactory{" +
            "name='" + name + '\'' +
            ", fieldName='" + fieldName + '\'' +
+           ", timeColumn='" + timeColumn + '\'' +
            '}';
   }
 }
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastAggregatorFactory.java
index 59b38ec..53461b0 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/LongLastAggregatorFactory.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import org.apache.druid.collections.SerializablePair;
-import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.query.aggregation.AggregateCombiner;
 import org.apache.druid.query.aggregation.Aggregator;
@@ -31,6 +30,7 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.AggregatorUtil;
 import org.apache.druid.query.aggregation.BufferAggregator;
 import org.apache.druid.query.aggregation.first.LongFirstAggregatorFactory;
+import org.apache.druid.query.cache.CacheKeyBuilder;
 import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
 import org.apache.druid.segment.BaseLongColumnValueSelector;
 import org.apache.druid.segment.ColumnSelectorFactory;
@@ -75,18 +75,21 @@ public class LongLastAggregatorFactory extends AggregatorFactory
   };
 
   private final String fieldName;
+  private final String timeColumn;
   private final String name;
 
   @JsonCreator
   public LongLastAggregatorFactory(
       @JsonProperty("name") String name,
-      @JsonProperty("fieldName") final String fieldName
+      @JsonProperty("fieldName") final String fieldName,
+      @JsonProperty("timeColumn") @Nullable final String timeColumn
   )
   {
     Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
     Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
     this.name = name;
     this.fieldName = fieldName;
+    this.timeColumn = timeColumn == null ? ColumnHolder.TIME_COLUMN_NAME : timeColumn;
   }
 
   @Override
@@ -97,7 +100,7 @@ public class LongLastAggregatorFactory extends AggregatorFactory
       return NIL_AGGREGATOR;
     } else {
       return new LongLastAggregator(
-          metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
+          metricFactory.makeColumnValueSelector(timeColumn),
           valueSelector
       );
     }
@@ -111,7 +114,7 @@ public class LongLastAggregatorFactory extends AggregatorFactory
       return NIL_BUFFER_AGGREGATOR;
     } else {
       return new LongLastBufferAggregator(
-          metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
+          metricFactory.makeColumnValueSelector(timeColumn),
           valueSelector
       );
     }
@@ -151,7 +154,7 @@ public class LongLastAggregatorFactory extends AggregatorFactory
   @Override
   public AggregatorFactory getCombiningFactory()
   {
-    return new LongLastAggregatorFactory(name, name)
+    return new LongLastAggregatorFactory(name, name, timeColumn)
     {
       @Override
       public Aggregator factorize(ColumnSelectorFactory metricFactory)
@@ -216,7 +219,7 @@ public class LongLastAggregatorFactory extends AggregatorFactory
   @Override
   public List<AggregatorFactory> getRequiredColumns()
   {
-    return Collections.singletonList(new LongLastAggregatorFactory(fieldName, fieldName));
+    return Collections.singletonList(new LongLastAggregatorFactory(fieldName, fieldName, timeColumn));
   }
 
   @Override
@@ -249,21 +252,25 @@ public class LongLastAggregatorFactory extends AggregatorFactory
     return fieldName;
   }
 
+  @JsonProperty
+  public String getTimeColumn()
+  {
+    return timeColumn;
+  }
+
   @Override
   public List<String> requiredFields()
   {
-    return Arrays.asList(ColumnHolder.TIME_COLUMN_NAME, fieldName);
+    return Arrays.asList(timeColumn, fieldName);
   }
 
   @Override
   public byte[] getCacheKey()
   {
-    byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);
-
-    return ByteBuffer.allocate(1 + fieldNameBytes.length)
-                     .put(AggregatorUtil.LONG_LAST_CACHE_TYPE_ID)
-                     .put(fieldNameBytes)
-                     .array();
+    return new CacheKeyBuilder(AggregatorUtil.LONG_LAST_CACHE_TYPE_ID)
+        .appendString(fieldName)
+        .appendString(timeColumn)
+        .build();
   }
 
   @Override
@@ -298,13 +305,13 @@ public class LongLastAggregatorFactory extends AggregatorFactory
 
     LongLastAggregatorFactory that = (LongLastAggregatorFactory) o;
 
-    return name.equals(that.name) && fieldName.equals(that.fieldName);
+    return name.equals(that.name) && timeColumn.equals(that.timeColumn) && fieldName.equals(that.fieldName);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(name, fieldName);
+    return Objects.hash(name, fieldName, timeColumn);
   }
 
   @Override
@@ -313,6 +320,7 @@ public class LongLastAggregatorFactory extends AggregatorFactory
     return "LongLastAggregatorFactory{" +
            "name='" + name + '\'' +
            ", fieldName='" + fieldName + '\'' +
+           ", timeColumn='" + timeColumn + '\'' +
            '}';
   }
 }
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java
index fcb284b..71bf66e 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregatorFactory.java
@@ -82,6 +82,7 @@ public class StringLastAggregatorFactory extends AggregatorFactory
   };
 
   private final String fieldName;
+  private final String timeColumn;
   private final String name;
   protected final int maxStringBytes;
 
@@ -89,6 +90,7 @@ public class StringLastAggregatorFactory extends AggregatorFactory
   public StringLastAggregatorFactory(
       @JsonProperty("name") String name,
       @JsonProperty("fieldName") final String fieldName,
+      @JsonProperty("timeColumn") @Nullable final String timeColumn,
       @JsonProperty("maxStringBytes") Integer maxStringBytes
   )
   {
@@ -101,6 +103,7 @@ public class StringLastAggregatorFactory extends AggregatorFactory
 
     this.name = name;
     this.fieldName = fieldName;
+    this.timeColumn = timeColumn == null ? ColumnHolder.TIME_COLUMN_NAME : timeColumn;
     this.maxStringBytes = maxStringBytes == null
                           ? StringFirstAggregatorFactory.DEFAULT_MAX_STRING_SIZE
                           : maxStringBytes;
@@ -114,7 +117,7 @@ public class StringLastAggregatorFactory extends AggregatorFactory
       return NIL_AGGREGATOR;
     } else {
       return new StringLastAggregator(
-          metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
+          metricFactory.makeColumnValueSelector(timeColumn),
           valueSelector,
           maxStringBytes,
           StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector, metricFactory.getColumnCapabilities(fieldName))
@@ -130,7 +133,7 @@ public class StringLastAggregatorFactory extends AggregatorFactory
       return NIL_BUFFER_AGGREGATOR;
     } else {
       return new StringLastBufferAggregator(
-          metricFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME),
+          metricFactory.makeColumnValueSelector(timeColumn),
           valueSelector,
           maxStringBytes,
           StringFirstLastUtils.selectorNeedsFoldCheck(valueSelector, metricFactory.getColumnCapabilities(fieldName))
@@ -159,13 +162,13 @@ public class StringLastAggregatorFactory extends AggregatorFactory
   @Override
   public AggregatorFactory getCombiningFactory()
   {
-    return new StringLastAggregatorFactory(name, name, maxStringBytes);
+    return new StringLastAggregatorFactory(name, name, timeColumn, maxStringBytes);
   }
 
   @Override
   public List<AggregatorFactory> getRequiredColumns()
   {
-    return Collections.singletonList(new StringLastAggregatorFactory(fieldName, fieldName, maxStringBytes));
+    return Collections.singletonList(new StringLastAggregatorFactory(fieldName, fieldName, timeColumn, maxStringBytes));
   }
 
   @Override
@@ -196,6 +199,12 @@ public class StringLastAggregatorFactory extends AggregatorFactory
   }
 
   @JsonProperty
+  public String getTimeColumn()
+  {
+    return timeColumn;
+  }
+
+  @JsonProperty
   public Integer getMaxStringBytes()
   {
     return maxStringBytes;
@@ -204,7 +213,7 @@ public class StringLastAggregatorFactory extends AggregatorFactory
   @Override
   public List<String> requiredFields()
   {
-    return ImmutableList.of(ColumnHolder.TIME_COLUMN_NAME, fieldName);
+    return ImmutableList.of(timeColumn, fieldName);
   }
 
   @Override
@@ -212,6 +221,7 @@ public class StringLastAggregatorFactory extends AggregatorFactory
   {
     return new CacheKeyBuilder(AggregatorUtil.STRING_LAST_CACHE_TYPE_ID)
         .appendString(fieldName)
+        .appendString(timeColumn)
         .appendInt(maxStringBytes)
         .build();
   }
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastFoldingAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastFoldingAggregatorFactory.java
index 7f92f00..c15bfaa 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastFoldingAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastFoldingAggregatorFactory.java
@@ -29,6 +29,6 @@ public class StringLastFoldingAggregatorFactory extends StringLastAggregatorFact
   @JsonCreator
   public StringLastFoldingAggregatorFactory(String name, String fieldName, Integer maxStringBytes)
   {
-    super(name, fieldName, maxStringBytes);
+    super(name, fieldName, null, maxStringBytes);
   }
 }
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/AggregatorFactoryTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/AggregatorFactoryTest.java
index 0bc602c..1b2f802 100644
--- a/processing/src/test/java/org/apache/druid/query/aggregation/AggregatorFactoryTest.java
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/AggregatorFactoryTest.java
@@ -126,27 +126,27 @@ public class AggregatorFactoryTest extends InitializedNullHandlingTest
                   new LongSumAggregatorFactory("longSum", "long-col"),
                   new LongMinAggregatorFactory("longMin", "long-col"),
                   new LongMaxAggregatorFactory("longMax", "long-col"),
-                  new LongFirstAggregatorFactory("longFirst", "long-col"),
-                  new LongLastAggregatorFactory("longLast", "long-col"),
+                  new LongFirstAggregatorFactory("longFirst", "long-col", null),
+                  new LongLastAggregatorFactory("longLast", "long-col", null),
                   new LongAnyAggregatorFactory("longAny", "long-col"),
                   // double aggs
                   new DoubleSumAggregatorFactory("doubleSum", "double-col"),
                   new DoubleMinAggregatorFactory("doubleMin", "double-col"),
                   new DoubleMaxAggregatorFactory("doubleMax", "double-col"),
-                  new DoubleFirstAggregatorFactory("doubleFirst", "double-col"),
-                  new DoubleLastAggregatorFactory("doubleLast", "double-col"),
+                  new DoubleFirstAggregatorFactory("doubleFirst", "double-col", null),
+                  new DoubleLastAggregatorFactory("doubleLast", "double-col", null),
                   new DoubleAnyAggregatorFactory("doubleAny", "double-col"),
                   new DoubleMeanAggregatorFactory("doubleMean", "double-col"),
                   // float aggs
                   new FloatSumAggregatorFactory("floatSum", "float-col"),
                   new FloatMinAggregatorFactory("floatMin", "float-col"),
                   new FloatMaxAggregatorFactory("floatMax", "float-col"),
-                  new FloatFirstAggregatorFactory("floatFirst", "float-col"),
-                  new FloatLastAggregatorFactory("floatLast", "float-col"),
+                  new FloatFirstAggregatorFactory("floatFirst", "float-col", null),
+                  new FloatLastAggregatorFactory("floatLast", "float-col", null),
                   new FloatAnyAggregatorFactory("floatAny", "float-col"),
                   // string aggregators
-                  new StringFirstAggregatorFactory("stringFirst", "col", 1024),
-                  new StringLastAggregatorFactory("stringLast", "col", 1024),
+                  new StringFirstAggregatorFactory("stringFirst", "col", null, 1024),
+                  new StringLastAggregatorFactory("stringLast", "col", null, 1024),
                   new StringAnyAggregatorFactory("stringAny", "col", 1024),
                   // sketch aggs
                   new CardinalityAggregatorFactory("cardinality", ImmutableList.of(DefaultDimensionSpec.of("some-col")), false),
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregationTest.java
index c85b6b1..ebe628a 100644
--- a/processing/src/test/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregationTest.java
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/DoubleFirstAggregationTest.java
@@ -45,11 +45,13 @@ public class DoubleFirstAggregationTest extends InitializedNullHandlingTest
   private DoubleFirstAggregatorFactory combiningAggFactory;
   private ColumnSelectorFactory colSelectorFactory;
   private TestLongColumnSelector timeSelector;
+  private TestLongColumnSelector customTimeSelector;
   private TestDoubleColumnSelectorImpl valueSelector;
   private TestObjectColumnSelector objectSelector;
 
   private double[] doubleValues = {1.1d, 2.7d, 3.5d, 1.3d};
   private long[] times = {12, 10, 5344, 7899999};
+  private long[] customTimes = {2, 1, 3, 4};
   private SerializablePair[] pairs = {
       new SerializablePair<>(1467225096L, 134.3d),
       new SerializablePair<>(23163L, 1232.212d),
@@ -60,13 +62,15 @@ public class DoubleFirstAggregationTest extends InitializedNullHandlingTest
   @Before
   public void setup()
   {
-    doubleFirstAggFactory = new DoubleFirstAggregatorFactory("billy", "nilly");
+    doubleFirstAggFactory = new DoubleFirstAggregatorFactory("billy", "nilly", null);
     combiningAggFactory = (DoubleFirstAggregatorFactory) doubleFirstAggFactory.getCombiningFactory();
     timeSelector = new TestLongColumnSelector(times);
+    customTimeSelector = new TestLongColumnSelector(customTimes);
     valueSelector = new TestDoubleColumnSelectorImpl(doubleValues);
     objectSelector = new TestObjectColumnSelector<>(pairs);
     colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
     EasyMock.expect(colSelectorFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME)).andReturn(timeSelector);
+    EasyMock.expect(colSelectorFactory.makeColumnValueSelector("customTime")).andReturn(customTimeSelector);
     EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector);
     EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector);
     EasyMock.replay(colSelectorFactory);
@@ -91,6 +95,24 @@ public class DoubleFirstAggregationTest extends InitializedNullHandlingTest
   }
 
   @Test
+  public void testDoubleFirstAggregatorWithTimeColumn()
+  {
+    Aggregator agg = new DoubleFirstAggregatorFactory("billy", "nilly", "customTime").factorize(colSelectorFactory);
+
+    aggregate(agg);
+    aggregate(agg);
+    aggregate(agg);
+    aggregate(agg);
+
+    Pair<Long, Double> result = (Pair<Long, Double>) agg.get();
+
+    Assert.assertEquals(customTimes[1], result.lhs.longValue());
+    Assert.assertEquals(doubleValues[1], result.rhs, 0.0001);
+    Assert.assertEquals((long) doubleValues[1], agg.getLong());
+    Assert.assertEquals(doubleValues[1], agg.getDouble(), 0.0001);
+  }
+
+  @Test
   public void testDoubleFirstBufferAggregator()
   {
     BufferAggregator agg = doubleFirstAggFactory.factorizeBuffered(
@@ -113,6 +135,27 @@ public class DoubleFirstAggregationTest extends InitializedNullHandlingTest
   }
 
   @Test
+  public void testDoubleFirstBufferAggregatorWithTimeColumn()
+  {
+    BufferAggregator agg = new DoubleFirstAggregatorFactory("billy", "nilly", "customTime").factorizeBuffered(colSelectorFactory);
+
+    ByteBuffer buffer = ByteBuffer.wrap(new byte[doubleFirstAggFactory.getMaxIntermediateSizeWithNulls()]);
+    agg.init(buffer, 0);
+
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+
+    Pair<Long, Double> result = (Pair<Long, Double>) agg.get(buffer, 0);
+
+    Assert.assertEquals(customTimes[1], result.lhs.longValue());
+    Assert.assertEquals(doubleValues[1], result.rhs, 0.0001);
+    Assert.assertEquals((long) doubleValues[1], agg.getLong(buffer, 0));
+    Assert.assertEquals(doubleValues[1], agg.getDouble(buffer, 0), 0.0001);
+  }
+
+  @Test
   public void testCombine()
   {
     SerializablePair pair1 = new SerializablePair<>(1467225000L, 3.621);
@@ -193,7 +236,9 @@ public class DoubleFirstAggregationTest extends InitializedNullHandlingTest
   {
     DefaultObjectMapper mapper = new DefaultObjectMapper();
     String doubleSpecJson = "{\"type\":\"doubleFirst\",\"name\":\"billy\",\"fieldName\":\"nilly\"}";
-    Assert.assertEquals(doubleFirstAggFactory, mapper.readValue(doubleSpecJson, AggregatorFactory.class));
+    AggregatorFactory deserialized = mapper.readValue(doubleSpecJson, AggregatorFactory.class);
+    Assert.assertEquals(doubleFirstAggFactory, deserialized);
+    Assert.assertArrayEquals(doubleFirstAggFactory.getCacheKey(), deserialized.getCacheKey());
   }
 
   private void aggregate(
@@ -202,6 +247,7 @@ public class DoubleFirstAggregationTest extends InitializedNullHandlingTest
   {
     agg.aggregate();
     timeSelector.increment();
+    customTimeSelector.increment();
     valueSelector.increment();
     objectSelector.increment();
   }
@@ -214,6 +260,7 @@ public class DoubleFirstAggregationTest extends InitializedNullHandlingTest
   {
     agg.aggregate(buff, position);
     timeSelector.increment();
+    customTimeSelector.increment();
     valueSelector.increment();
     objectSelector.increment();
   }
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstAggregationTest.java
index b9b37f8..bd9bc11 100644
--- a/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstAggregationTest.java
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/FloatFirstAggregationTest.java
@@ -45,11 +45,13 @@ public class FloatFirstAggregationTest extends InitializedNullHandlingTest
   private FloatFirstAggregatorFactory combiningAggFactory;
   private ColumnSelectorFactory colSelectorFactory;
   private TestLongColumnSelector timeSelector;
+  private TestLongColumnSelector customTimeSelector;
   private TestFloatColumnSelector valueSelector;
   private TestObjectColumnSelector objectSelector;
 
   private float[] floats = {1.1f, 2.7f, 3.5f, 1.3f};
   private long[] times = {12, 10, 5344, 7899999};
+  private long[] customTimes = {2, 1, 3, 4};
   private SerializablePair[] pairs = {
       new SerializablePair<>(1467225096L, 134.3f),
       new SerializablePair<>(23163L, 1232.212f),
@@ -60,20 +62,22 @@ public class FloatFirstAggregationTest extends InitializedNullHandlingTest
   @Before
   public void setup()
   {
-    floatFirstAggregatorFactory = new FloatFirstAggregatorFactory("billy", "nilly");
+    floatFirstAggregatorFactory = new FloatFirstAggregatorFactory("billy", "nilly", null);
     combiningAggFactory = (FloatFirstAggregatorFactory) floatFirstAggregatorFactory.getCombiningFactory();
     timeSelector = new TestLongColumnSelector(times);
+    customTimeSelector = new TestLongColumnSelector(customTimes);
     valueSelector = new TestFloatColumnSelector(floats);
     objectSelector = new TestObjectColumnSelector<>(pairs);
     colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
     EasyMock.expect(colSelectorFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME)).andReturn(timeSelector);
+    EasyMock.expect(colSelectorFactory.makeColumnValueSelector("customTime")).andReturn(customTimeSelector);
     EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector).atLeastOnce();
     EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector).atLeastOnce();
     EasyMock.replay(colSelectorFactory);
   }
 
   @Test
-  public void testDoubleFirstAggregator()
+  public void testFloatFirstAggregator()
   {
     Aggregator agg = floatFirstAggregatorFactory.factorize(colSelectorFactory);
 
@@ -91,7 +95,25 @@ public class FloatFirstAggregationTest extends InitializedNullHandlingTest
   }
 
   @Test
-  public void testDoubleFirstBufferAggregator()
+  public void testFloatFirstAggregatorWithTimeColumn()
+  {
+    Aggregator agg = new FloatFirstAggregatorFactory("billy", "nilly", "customTime").factorize(colSelectorFactory);
+
+    aggregate(agg);
+    aggregate(agg);
+    aggregate(agg);
+    aggregate(agg);
+
+    Pair<Long, Float> result = (Pair<Long, Float>) agg.get();
+
+    Assert.assertEquals(customTimes[1], result.lhs.longValue());
+    Assert.assertEquals(floats[1], result.rhs, 0.0001);
+    Assert.assertEquals((long) floats[1], agg.getLong());
+    Assert.assertEquals(floats[1], agg.getDouble(), 0.0001);
+  }
+
+  @Test
+  public void testFloatFirstBufferAggregator()
   {
     BufferAggregator agg = floatFirstAggregatorFactory.factorizeBuffered(
         colSelectorFactory);
@@ -113,6 +135,27 @@ public class FloatFirstAggregationTest extends InitializedNullHandlingTest
   }
 
   @Test
+  public void testFloatFirstBufferAggregatorWithTimeColumn()
+  {
+    BufferAggregator agg = new FloatFirstAggregatorFactory("billy", "nilly", "customTime").factorizeBuffered(colSelectorFactory);
+
+    ByteBuffer buffer = ByteBuffer.wrap(new byte[floatFirstAggregatorFactory.getMaxIntermediateSizeWithNulls()]);
+    agg.init(buffer, 0);
+
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+
+    Pair<Long, Float> result = (Pair<Long, Float>) agg.get(buffer, 0);
+
+    Assert.assertEquals(customTimes[1], result.lhs.longValue());
+    Assert.assertEquals(floats[1], result.rhs, 0.0001);
+    Assert.assertEquals((long) floats[1], agg.getLong(buffer, 0));
+    Assert.assertEquals(floats[1], agg.getFloat(buffer, 0), 0.0001);
+  }
+
+  @Test
   public void testCombine()
   {
     SerializablePair pair1 = new SerializablePair<>(1467225000L, 3.621f);
@@ -133,7 +176,7 @@ public class FloatFirstAggregationTest extends InitializedNullHandlingTest
   }
 
   @Test
-  public void testDoubleFirstCombiningAggregator()
+  public void testFloatFirstCombiningAggregator()
   {
     Aggregator agg = combiningAggFactory.factorize(colSelectorFactory);
 
@@ -152,7 +195,7 @@ public class FloatFirstAggregationTest extends InitializedNullHandlingTest
   }
 
   @Test
-  public void testDoubleFirstCombiningBufferAggregator()
+  public void testFloatFirstCombiningBufferAggregator()
   {
     BufferAggregator agg = combiningAggFactory.factorizeBuffered(
         colSelectorFactory);
@@ -179,8 +222,10 @@ public class FloatFirstAggregationTest extends InitializedNullHandlingTest
   public void testSerde() throws Exception
   {
     DefaultObjectMapper mapper = new DefaultObjectMapper();
-    String doubleSpecJson = "{\"type\":\"floatFirst\",\"name\":\"billy\",\"fieldName\":\"nilly\"}";
-    Assert.assertEquals(floatFirstAggregatorFactory, mapper.readValue(doubleSpecJson, AggregatorFactory.class));
+    String floatSpecJson = "{\"type\":\"floatFirst\",\"name\":\"billy\",\"fieldName\":\"nilly\"}";
+    AggregatorFactory deserialized = mapper.readValue(floatSpecJson, AggregatorFactory.class);
+    Assert.assertEquals(floatFirstAggregatorFactory, deserialized);
+    Assert.assertArrayEquals(floatFirstAggregatorFactory.getCacheKey(), deserialized.getCacheKey());
   }
 
   private void aggregate(
@@ -189,6 +234,7 @@ public class FloatFirstAggregationTest extends InitializedNullHandlingTest
   {
     agg.aggregate();
     timeSelector.increment();
+    customTimeSelector.increment();
     valueSelector.increment();
     objectSelector.increment();
   }
@@ -201,6 +247,7 @@ public class FloatFirstAggregationTest extends InitializedNullHandlingTest
   {
     agg.aggregate(buff, position);
     timeSelector.increment();
+    customTimeSelector.increment();
     valueSelector.increment();
     objectSelector.increment();
   }
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/LongFirstAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/LongFirstAggregationTest.java
index 7fe9256..d73eb55 100644
--- a/processing/src/test/java/org/apache/druid/query/aggregation/first/LongFirstAggregationTest.java
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/LongFirstAggregationTest.java
@@ -44,11 +44,13 @@ public class LongFirstAggregationTest extends InitializedNullHandlingTest
   private LongFirstAggregatorFactory combiningAggFactory;
   private ColumnSelectorFactory colSelectorFactory;
   private TestLongColumnSelector timeSelector;
+  private TestLongColumnSelector customTimeSelector;
   private TestLongColumnSelector valueSelector;
   private TestObjectColumnSelector objectSelector;
 
   private long[] longValues = {185, -216, -128751132, Long.MIN_VALUE};
   private long[] times = {1123126751, 1784247991, 1854329816, 1000000000};
+  private long[] customTimes = {2, 1, 3, 4};
   private SerializablePair[] pairs = {
       new SerializablePair<>(1L, 113267L),
       new SerializablePair<>(1L, 5437384L),
@@ -59,13 +61,15 @@ public class LongFirstAggregationTest extends InitializedNullHandlingTest
   @Before
   public void setup()
   {
-    longFirstAggFactory = new LongFirstAggregatorFactory("billy", "nilly");
+    longFirstAggFactory = new LongFirstAggregatorFactory("billy", "nilly", null);
     combiningAggFactory = (LongFirstAggregatorFactory) longFirstAggFactory.getCombiningFactory();
     timeSelector = new TestLongColumnSelector(times);
+    customTimeSelector = new TestLongColumnSelector(customTimes);
     valueSelector = new TestLongColumnSelector(longValues);
     objectSelector = new TestObjectColumnSelector<>(pairs);
     colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
     EasyMock.expect(colSelectorFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME)).andReturn(timeSelector);
+    EasyMock.expect(colSelectorFactory.makeColumnValueSelector("customTime")).andReturn(customTimeSelector);
     EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector);
     EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector);
     EasyMock.replay(colSelectorFactory);
@@ -90,8 +94,47 @@ public class LongFirstAggregationTest extends InitializedNullHandlingTest
   }
 
   @Test
+  public void testLongFirstAggregatorWithTimeColumn()
+  {
+    Aggregator agg = new LongFirstAggregatorFactory("billy", "nilly", "customTime").factorize(colSelectorFactory);
+
+    aggregate(agg);
+    aggregate(agg);
+    aggregate(agg);
+    aggregate(agg);
+
+    Pair<Long, Long> result = (Pair<Long, Long>) agg.get();
+
+    Assert.assertEquals(customTimes[1], result.lhs.longValue());
+    Assert.assertEquals(longValues[1], result.rhs.longValue());
+    Assert.assertEquals(longValues[1], agg.getLong());
+    Assert.assertEquals(longValues[1], agg.getFloat(), 0.0001);
+  }
+
+  @Test
   public void testLongFirstBufferAggregator()
   {
+    BufferAggregator agg = new LongFirstAggregatorFactory("billy", "nilly", "customTime").factorizeBuffered(colSelectorFactory);
+
+    ByteBuffer buffer = ByteBuffer.wrap(new byte[longFirstAggFactory.getMaxIntermediateSizeWithNulls()]);
+    agg.init(buffer, 0);
+
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+
+    Pair<Long, Long> result = (Pair<Long, Long>) agg.get(buffer, 0);
+
+    Assert.assertEquals(customTimes[1], result.lhs.longValue());
+    Assert.assertEquals(longValues[1], result.rhs.longValue());
+    Assert.assertEquals(longValues[1], agg.getLong(buffer, 0));
+    Assert.assertEquals(longValues[1], agg.getFloat(buffer, 0), 0.0001);
+  }
+
+  @Test
+  public void testLongFirstBufferAggregatorWithTimeColumn()
+  {
     BufferAggregator agg = longFirstAggFactory.factorizeBuffered(
         colSelectorFactory);
 
@@ -179,7 +222,9 @@ public class LongFirstAggregationTest extends InitializedNullHandlingTest
   {
     DefaultObjectMapper mapper = new DefaultObjectMapper();
     String longSpecJson = "{\"type\":\"longFirst\",\"name\":\"billy\",\"fieldName\":\"nilly\"}";
-    Assert.assertEquals(longFirstAggFactory, mapper.readValue(longSpecJson, AggregatorFactory.class));
+    AggregatorFactory deserialized = mapper.readValue(longSpecJson, AggregatorFactory.class);
+    Assert.assertEquals(longFirstAggFactory, deserialized);
+    Assert.assertArrayEquals(longFirstAggFactory.getCacheKey(), deserialized.getCacheKey());
   }
 
   private void aggregate(
@@ -188,6 +233,7 @@ public class LongFirstAggregationTest extends InitializedNullHandlingTest
   {
     agg.aggregate();
     timeSelector.increment();
+    customTimeSelector.increment();
     valueSelector.increment();
     objectSelector.increment();
   }
@@ -200,6 +246,7 @@ public class LongFirstAggregationTest extends InitializedNullHandlingTest
   {
     agg.aggregate(buff, position);
     timeSelector.increment();
+    customTimeSelector.increment();
     valueSelector.increment();
     objectSelector.increment();
   }
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstAggregationTest.java
index 8cebe4d..2c8cfb8 100644
--- a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstAggregationTest.java
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstAggregationTest.java
@@ -47,11 +47,13 @@ public class StringFirstAggregationTest extends InitializedNullHandlingTest
   private AggregatorFactory combiningAggFactory;
   private ColumnSelectorFactory colSelectorFactory;
   private TestLongColumnSelector timeSelector;
+  private TestLongColumnSelector customTimeSelector;
   private TestObjectColumnSelector<String> valueSelector;
   private TestObjectColumnSelector objectSelector;
 
   private String[] strings = {"1111", "2222", "3333", null, "4444"};
   private long[] times = {8224, 6879, 2436, 3546, 7888};
+  private long[] customTimes = {2, 1, 3, 4, 5};
   private SerializablePairLongString[] pairs = {
       new SerializablePairLongString(52782L, "AAAA"),
       new SerializablePairLongString(65492L, "BBBB"),
@@ -64,13 +66,15 @@ public class StringFirstAggregationTest extends InitializedNullHandlingTest
   public void setup()
   {
     NullHandling.initializeForTests();
-    stringFirstAggFactory = new StringFirstAggregatorFactory("billy", "nilly", MAX_STRING_SIZE);
+    stringFirstAggFactory = new StringFirstAggregatorFactory("billy", "nilly", null, MAX_STRING_SIZE);
     combiningAggFactory = stringFirstAggFactory.getCombiningFactory();
     timeSelector = new TestLongColumnSelector(times);
+    customTimeSelector = new TestLongColumnSelector(customTimes);
     valueSelector = new TestObjectColumnSelector<>(strings);
     objectSelector = new TestObjectColumnSelector<>(pairs);
     colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
     EasyMock.expect(colSelectorFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME)).andReturn(timeSelector);
+    EasyMock.expect(colSelectorFactory.makeColumnValueSelector("customTime")).andReturn(customTimeSelector);
     EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector);
     EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector);
     EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly"))
@@ -95,10 +99,25 @@ public class StringFirstAggregationTest extends InitializedNullHandlingTest
   }
 
   @Test
+  public void testStringFirstAggregatorWithTimeColumn()
+  {
+    Aggregator agg = new StringFirstAggregatorFactory("billy", "nilly", "customTime", MAX_STRING_SIZE).factorize(colSelectorFactory);
+
+    aggregate(agg);
+    aggregate(agg);
+    aggregate(agg);
+    aggregate(agg);
+
+    Pair<Long, String> result = (Pair<Long, String>) agg.get();
+
+    Assert.assertEquals(strings[1], result.rhs);
+  }
+
+  @Test
   public void testStringFirstBufferAggregator()
   {
-    BufferAggregator agg = stringFirstAggFactory.factorizeBuffered(
-        colSelectorFactory);
+    BufferAggregator agg = new StringFirstAggregatorFactory("billy", "nilly", "customTime", MAX_STRING_SIZE)
+        .factorizeBuffered(colSelectorFactory);
 
     ByteBuffer buffer = ByteBuffer.wrap(new byte[stringFirstAggFactory.getMaxIntermediateSize()]);
     agg.init(buffer, 0);
@@ -110,7 +129,7 @@ public class StringFirstAggregationTest extends InitializedNullHandlingTest
 
     Pair<Long, String> result = (Pair<Long, String>) agg.get(buffer, 0);
 
-    Assert.assertEquals(strings[2], result.rhs);
+    Assert.assertEquals(strings[1], result.rhs);
   }
 
   @Test
@@ -202,6 +221,7 @@ public class StringFirstAggregationTest extends InitializedNullHandlingTest
   {
     agg.aggregate();
     timeSelector.increment();
+    customTimeSelector.increment();
     valueSelector.increment();
     objectSelector.increment();
   }
@@ -214,6 +234,7 @@ public class StringFirstAggregationTest extends InitializedNullHandlingTest
   {
     agg.aggregate(buff, position);
     timeSelector.increment();
+    customTimeSelector.increment();
     valueSelector.increment();
     objectSelector.increment();
   }
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java
index 368bad9..2653940 100644
--- a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java
@@ -54,7 +54,7 @@ public class StringFirstBufferAggregatorTest
     TestObjectColumnSelector<String> objectColumnSelector = new TestObjectColumnSelector<>(strings);
 
     StringFirstAggregatorFactory factory = new StringFirstAggregatorFactory(
-        "billy", "billy", maxStringBytes
+        "billy", "billy", null, maxStringBytes
     );
 
     StringFirstBufferAggregator agg = new StringFirstBufferAggregator(
@@ -91,7 +91,7 @@ public class StringFirstBufferAggregatorTest
     TestObjectColumnSelector<String> objectColumnSelector = new TestObjectColumnSelector<>(strings);
 
     StringFirstAggregatorFactory factory = new StringFirstAggregatorFactory(
-        "billy", "billy", maxStringBytes
+        "billy", "billy", null, maxStringBytes
     );
 
     StringFirstBufferAggregator agg = new StringFirstBufferAggregator(
@@ -129,7 +129,7 @@ public class StringFirstBufferAggregatorTest
     TestObjectColumnSelector<String> objectColumnSelector = new TestObjectColumnSelector<>(strings);
 
     StringFirstAggregatorFactory factory = new StringFirstAggregatorFactory(
-        "billy", "billy", maxStringBytes
+        "billy", "billy", null, maxStringBytes
     );
 
     StringFirstBufferAggregator agg = new StringFirstBufferAggregator(
@@ -168,7 +168,7 @@ public class StringFirstBufferAggregatorTest
     TestObjectColumnSelector<Double> objectColumnSelector = new TestObjectColumnSelector<>(doubles);
 
     StringFirstAggregatorFactory factory = new StringFirstAggregatorFactory(
-        "billy", "billy", maxStringBytes
+        "billy", "billy", null, maxStringBytes
     );
 
     StringFirstBufferAggregator agg = new StringFirstBufferAggregator(
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java
index 3f83308..355a25c 100644
--- a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java
@@ -75,7 +75,7 @@ public class StringFirstTimeseriesQueryTest extends InitializedNullHandlingTest
             new IncrementalIndexSchema.Builder()
                 .withQueryGranularity(Granularities.SECOND)
                 .withMetrics(new CountAggregatorFactory("cnt"))
-                .withMetrics(new StringFirstAggregatorFactory(FIRST_CLIENT_TYPE, CLIENT_TYPE, 1024))
+                .withMetrics(new StringFirstAggregatorFactory(FIRST_CLIENT_TYPE, CLIENT_TYPE, null, 1024))
                 .build()
         )
         .setMaxRowCount(1000)
@@ -118,10 +118,10 @@ public class StringFirstTimeseriesQueryTest extends InitializedNullHandlingTest
                                   .intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC)
                                   .aggregators(
                                       ImmutableList.of(
-                                          new StringFirstAggregatorFactory("nonfolding", CLIENT_TYPE, 1024),
-                                          new StringFirstAggregatorFactory("folding", FIRST_CLIENT_TYPE, 1024),
-                                          new StringFirstAggregatorFactory("nonexistent", "nonexistent", 1024),
-                                          new StringFirstAggregatorFactory("numeric", "cnt", 1024)
+                                          new StringFirstAggregatorFactory("nonfolding", CLIENT_TYPE, null, 1024),
+                                          new StringFirstAggregatorFactory("folding", FIRST_CLIENT_TYPE, null, 1024),
+                                          new StringFirstAggregatorFactory("nonexistent", "nonexistent", null, 1024),
+                                          new StringFirstAggregatorFactory("numeric", "cnt", null, 1024)
                                       )
                                   )
                                   .build();
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/last/DoubleLastAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/last/DoubleLastAggregationTest.java
index 847194f..3648ddc 100644
--- a/processing/src/test/java/org/apache/druid/query/aggregation/last/DoubleLastAggregationTest.java
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/last/DoubleLastAggregationTest.java
@@ -45,11 +45,13 @@ public class DoubleLastAggregationTest extends InitializedNullHandlingTest
   private DoubleLastAggregatorFactory combiningAggFactory;
   private ColumnSelectorFactory colSelectorFactory;
   private TestLongColumnSelector timeSelector;
+  private TestLongColumnSelector customTimeSelector;
   private TestDoubleColumnSelectorImpl valueSelector;
   private TestObjectColumnSelector objectSelector;
 
   private double[] doubles = {1.1897d, 0.001d, 86.23d, 166.228d};
   private long[] times = {8224, 6879, 2436, 7888};
+  private long[] customTimes = {1, 4, 3, 2};
   private SerializablePair[] pairs = {
       new SerializablePair<>(52782L, 134.3d),
       new SerializablePair<>(65492L, 1232.212d),
@@ -60,13 +62,15 @@ public class DoubleLastAggregationTest extends InitializedNullHandlingTest
   @Before
   public void setup()
   {
-    doubleLastAggFactory = new DoubleLastAggregatorFactory("billy", "nilly");
+    doubleLastAggFactory = new DoubleLastAggregatorFactory("billy", "nilly", null);
     combiningAggFactory = (DoubleLastAggregatorFactory) doubleLastAggFactory.getCombiningFactory();
     timeSelector = new TestLongColumnSelector(times);
+    customTimeSelector = new TestLongColumnSelector(customTimes);
     valueSelector = new TestDoubleColumnSelectorImpl(doubles);
     objectSelector = new TestObjectColumnSelector<>(pairs);
     colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
     EasyMock.expect(colSelectorFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME)).andReturn(timeSelector);
+    EasyMock.expect(colSelectorFactory.makeColumnValueSelector("customTime")).andReturn(customTimeSelector);
     EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector);
     EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector);
     EasyMock.replay(colSelectorFactory);
@@ -91,6 +95,24 @@ public class DoubleLastAggregationTest extends InitializedNullHandlingTest
   }
 
   @Test
+  public void testDoubleLastAggregatorWithTimeColumn()
+  {
+    Aggregator agg = new DoubleLastAggregatorFactory("billy", "nilly", "customTime").factorize(colSelectorFactory);
+
+    aggregate(agg);
+    aggregate(agg);
+    aggregate(agg);
+    aggregate(agg);
+
+    Pair<Long, Double> result = (Pair<Long, Double>) agg.get();
+
+    Assert.assertEquals(customTimes[1], result.lhs.longValue());
+    Assert.assertEquals(doubles[1], result.rhs, 0.0001);
+    Assert.assertEquals((long) doubles[1], agg.getLong());
+    Assert.assertEquals(doubles[1], agg.getDouble(), 0.0001);
+  }
+
+  @Test
   public void testDoubleLastBufferAggregator()
   {
     BufferAggregator agg = doubleLastAggFactory.factorizeBuffered(
@@ -113,6 +135,28 @@ public class DoubleLastAggregationTest extends InitializedNullHandlingTest
   }
 
   @Test
+  public void testDoubleLastBufferAggregatorWithTimeColumn()
+  {
+    BufferAggregator agg = new DoubleLastAggregatorFactory("billy", "nilly", "customTime").factorizeBuffered(
+        colSelectorFactory);
+
+    ByteBuffer buffer = ByteBuffer.wrap(new byte[doubleLastAggFactory.getMaxIntermediateSizeWithNulls()]);
+    agg.init(buffer, 0);
+
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+
+    Pair<Long, Double> result = (Pair<Long, Double>) agg.get(buffer, 0);
+
+    Assert.assertEquals(customTimes[1], result.lhs.longValue());
+    Assert.assertEquals(doubles[1], result.rhs, 0.0001);
+    Assert.assertEquals((long) doubles[1], agg.getLong(buffer, 0));
+    Assert.assertEquals(doubles[1], agg.getDouble(buffer, 0), 0.0001);
+  }
+
+  @Test
   public void testCombine()
   {
     SerializablePair pair1 = new SerializablePair<>(1467225000L, 3.621);
@@ -180,7 +224,9 @@ public class DoubleLastAggregationTest extends InitializedNullHandlingTest
   {
     DefaultObjectMapper mapper = new DefaultObjectMapper();
     String doubleSpecJson = "{\"type\":\"doubleLast\",\"name\":\"billy\",\"fieldName\":\"nilly\"}";
-    Assert.assertEquals(doubleLastAggFactory, mapper.readValue(doubleSpecJson, AggregatorFactory.class));
+    AggregatorFactory deserialized = mapper.readValue(doubleSpecJson, AggregatorFactory.class);
+    Assert.assertEquals(doubleLastAggFactory, deserialized);
+    Assert.assertArrayEquals(doubleLastAggFactory.getCacheKey(), deserialized.getCacheKey());
   }
 
   private void aggregate(
@@ -189,6 +235,7 @@ public class DoubleLastAggregationTest extends InitializedNullHandlingTest
   {
     agg.aggregate();
     timeSelector.increment();
+    customTimeSelector.increment();
     valueSelector.increment();
     objectSelector.increment();
   }
@@ -201,6 +248,7 @@ public class DoubleLastAggregationTest extends InitializedNullHandlingTest
   {
     agg.aggregate(buff, position);
     timeSelector.increment();
+    customTimeSelector.increment();
     valueSelector.increment();
     objectSelector.increment();
   }
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/last/FloatLastAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/last/FloatLastAggregationTest.java
index 86b6a99..c1e148a 100644
--- a/processing/src/test/java/org/apache/druid/query/aggregation/last/FloatLastAggregationTest.java
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/last/FloatLastAggregationTest.java
@@ -45,11 +45,13 @@ public class FloatLastAggregationTest extends InitializedNullHandlingTest
   private FloatLastAggregatorFactory combiningAggFactory;
   private ColumnSelectorFactory colSelectorFactory;
   private TestLongColumnSelector timeSelector;
+  private TestLongColumnSelector customTimeSelector;
   private TestFloatColumnSelector valueSelector;
   private TestObjectColumnSelector objectSelector;
 
   private float[] floats = {1.1897f, 0.001f, 86.23f, 166.228f};
   private long[] times = {8224, 6879, 2436, 7888};
+  private long[] customTimes = {1, 4, 3, 2};
   private SerializablePair[] pairs = {
       new SerializablePair<>(52782L, 134.3f),
       new SerializablePair<>(65492L, 1232.212f),
@@ -60,20 +62,22 @@ public class FloatLastAggregationTest extends InitializedNullHandlingTest
   @Before
   public void setup()
   {
-    floatLastAggregatorFactory = new FloatLastAggregatorFactory("billy", "nilly");
+    floatLastAggregatorFactory = new FloatLastAggregatorFactory("billy", "nilly", null);
     combiningAggFactory = (FloatLastAggregatorFactory) floatLastAggregatorFactory.getCombiningFactory();
     timeSelector = new TestLongColumnSelector(times);
+    customTimeSelector = new TestLongColumnSelector(customTimes);
     valueSelector = new TestFloatColumnSelector(floats);
     objectSelector = new TestObjectColumnSelector<>(pairs);
     colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
     EasyMock.expect(colSelectorFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME)).andReturn(timeSelector);
+    EasyMock.expect(colSelectorFactory.makeColumnValueSelector("customTime")).andReturn(customTimeSelector);
     EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector);
     EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector);
     EasyMock.replay(colSelectorFactory);
   }
 
   @Test
-  public void testDoubleLastAggregator()
+  public void testFloatLastAggregator()
   {
     Aggregator agg = floatLastAggregatorFactory.factorize(colSelectorFactory);
 
@@ -91,7 +95,25 @@ public class FloatLastAggregationTest extends InitializedNullHandlingTest
   }
 
   @Test
-  public void testDoubleLastBufferAggregator()
+  public void testFloatLastAggregatorWithTimeColumn()
+  {
+    Aggregator agg = new FloatLastAggregatorFactory("billy", "nilly", "customTime").factorize(colSelectorFactory);
+
+    aggregate(agg);
+    aggregate(agg);
+    aggregate(agg);
+    aggregate(agg);
+
+    Pair<Long, Float> result = (Pair<Long, Float>) agg.get();
+
+    Assert.assertEquals(customTimes[1], result.lhs.longValue());
+    Assert.assertEquals(floats[1], result.rhs, 0.0001);
+    Assert.assertEquals((long) floats[1], agg.getLong());
+    Assert.assertEquals(floats[1], agg.getFloat(), 0.0001);
+  }
+
+  @Test
+  public void testFloatLastBufferAggregator()
   {
     BufferAggregator agg = floatLastAggregatorFactory.factorizeBuffered(
         colSelectorFactory);
@@ -113,6 +135,28 @@ public class FloatLastAggregationTest extends InitializedNullHandlingTest
   }
 
   @Test
+  public void testFloatLastBufferAggregatorWithTimeColumn()
+  {
+    BufferAggregator agg = new FloatLastAggregatorFactory("billy", "nilly", "customTime").factorizeBuffered(
+        colSelectorFactory);
+
+    ByteBuffer buffer = ByteBuffer.wrap(new byte[floatLastAggregatorFactory.getMaxIntermediateSizeWithNulls()]);
+    agg.init(buffer, 0);
+
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+
+    Pair<Long, Float> result = (Pair<Long, Float>) agg.get(buffer, 0);
+
+    Assert.assertEquals(customTimes[1], result.lhs.longValue());
+    Assert.assertEquals(floats[1], result.rhs, 0.0001);
+    Assert.assertEquals((long) floats[1], agg.getLong(buffer, 0));
+    Assert.assertEquals(floats[1], agg.getFloat(buffer, 0), 0.0001);
+  }
+
+  @Test
   public void testCombine()
   {
     SerializablePair pair1 = new SerializablePair<>(1467225000L, 3.621f);
@@ -133,7 +177,7 @@ public class FloatLastAggregationTest extends InitializedNullHandlingTest
   }
 
   @Test
-  public void testDoubleLastCombiningAggregator()
+  public void testFloatLastCombiningAggregator()
   {
     Aggregator agg = combiningAggFactory.factorize(colSelectorFactory);
 
@@ -152,7 +196,7 @@ public class FloatLastAggregationTest extends InitializedNullHandlingTest
   }
 
   @Test
-  public void testDoubleLastCombiningBufferAggregator()
+  public void testFloatLastCombiningBufferAggregator()
   {
     BufferAggregator agg = combiningAggFactory.factorizeBuffered(
         colSelectorFactory);
@@ -179,8 +223,10 @@ public class FloatLastAggregationTest extends InitializedNullHandlingTest
   public void testSerde() throws Exception
   {
     DefaultObjectMapper mapper = new DefaultObjectMapper();
-    String doubleSpecJson = "{\"type\":\"floatLast\",\"name\":\"billy\",\"fieldName\":\"nilly\"}";
-    Assert.assertEquals(floatLastAggregatorFactory, mapper.readValue(doubleSpecJson, AggregatorFactory.class));
+    String floatSpecJson = "{\"type\":\"floatLast\",\"name\":\"billy\",\"fieldName\":\"nilly\"}";
+    AggregatorFactory deserialized = mapper.readValue(floatSpecJson, AggregatorFactory.class);
+    Assert.assertEquals(floatLastAggregatorFactory, deserialized);
+    Assert.assertArrayEquals(floatLastAggregatorFactory.getCacheKey(), deserialized.getCacheKey());
   }
 
   private void aggregate(
@@ -189,6 +235,7 @@ public class FloatLastAggregationTest extends InitializedNullHandlingTest
   {
     agg.aggregate();
     timeSelector.increment();
+    customTimeSelector.increment();
     valueSelector.increment();
     objectSelector.increment();
   }
@@ -201,6 +248,7 @@ public class FloatLastAggregationTest extends InitializedNullHandlingTest
   {
     agg.aggregate(buff, position);
     timeSelector.increment();
+    customTimeSelector.increment();
     valueSelector.increment();
     objectSelector.increment();
   }
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/last/LongLastAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/last/LongLastAggregationTest.java
index a9b2fad..cacd905 100644
--- a/processing/src/test/java/org/apache/druid/query/aggregation/last/LongLastAggregationTest.java
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/last/LongLastAggregationTest.java
@@ -44,11 +44,13 @@ public class LongLastAggregationTest extends InitializedNullHandlingTest
   private LongLastAggregatorFactory combiningAggFactory;
   private ColumnSelectorFactory colSelectorFactory;
   private TestLongColumnSelector timeSelector;
+  private TestLongColumnSelector customTimeSelector;
   private TestLongColumnSelector valueSelector;
   private TestObjectColumnSelector objectSelector;
 
   private long[] longValues = {23216, 8635, 1547123, Long.MAX_VALUE};
   private long[] times = {1467935723, 1467225653, 1601848932, 72515};
+  private long[] customTimes = {1, 4, 3, 2};
   private SerializablePair[] pairs = {
       new SerializablePair<>(12531L, 113267L),
       new SerializablePair<>(123L, 5437384L),
@@ -59,13 +61,15 @@ public class LongLastAggregationTest extends InitializedNullHandlingTest
   @Before
   public void setup()
   {
-    longLastAggFactory = new LongLastAggregatorFactory("billy", "nilly");
+    longLastAggFactory = new LongLastAggregatorFactory("billy", "nilly", null);
     combiningAggFactory = (LongLastAggregatorFactory) longLastAggFactory.getCombiningFactory();
     timeSelector = new TestLongColumnSelector(times);
+    customTimeSelector = new TestLongColumnSelector(customTimes);
     valueSelector = new TestLongColumnSelector(longValues);
     objectSelector = new TestObjectColumnSelector<>(pairs);
     colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
     EasyMock.expect(colSelectorFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME)).andReturn(timeSelector);
+    EasyMock.expect(colSelectorFactory.makeColumnValueSelector("customTime")).andReturn(customTimeSelector);
     EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector);
     EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector);
     EasyMock.replay(colSelectorFactory);
@@ -90,6 +94,24 @@ public class LongLastAggregationTest extends InitializedNullHandlingTest
   }
 
   @Test
+  public void testLongLastAggregatorWithTimeColumn()
+  {
+    Aggregator agg = new LongLastAggregatorFactory("billy", "nilly", "customTime").factorize(colSelectorFactory);
+
+    aggregate(agg);
+    aggregate(agg);
+    aggregate(agg);
+    aggregate(agg);
+
+    Pair<Long, Long> result = (Pair<Long, Long>) agg.get();
+
+    Assert.assertEquals(customTimes[1], result.lhs.longValue());
+    Assert.assertEquals(longValues[1], result.rhs.longValue());
+    Assert.assertEquals(longValues[1], agg.getLong());
+    Assert.assertEquals(longValues[1], agg.getFloat(), 1);
+  }
+
+  @Test
   public void testLongLastBufferAggregator()
   {
     BufferAggregator agg = longLastAggFactory.factorizeBuffered(
@@ -112,6 +134,28 @@ public class LongLastAggregationTest extends InitializedNullHandlingTest
   }
 
   @Test
+  public void testLongLastBufferAggregatorWithTimeColumn()
+  {
+    BufferAggregator agg = new LongLastAggregatorFactory("billy", "nilly", "customTime").factorizeBuffered(
+        colSelectorFactory);
+
+    ByteBuffer buffer = ByteBuffer.wrap(new byte[longLastAggFactory.getMaxIntermediateSizeWithNulls()]);
+    agg.init(buffer, 0);
+
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+
+    Pair<Long, Long> result = (Pair<Long, Long>) agg.get(buffer, 0);
+
+    Assert.assertEquals(customTimes[1], result.lhs.longValue());
+    Assert.assertEquals(longValues[1], result.rhs.longValue());
+    Assert.assertEquals(longValues[1], agg.getLong(buffer, 0));
+    Assert.assertEquals(longValues[1], agg.getFloat(buffer, 0), 1);
+  }
+
+  @Test
   public void testCombine()
   {
     SerializablePair pair1 = new SerializablePair<>(1467225000L, 64432L);
@@ -179,7 +223,9 @@ public class LongLastAggregationTest extends InitializedNullHandlingTest
   {
     DefaultObjectMapper mapper = new DefaultObjectMapper();
     String longSpecJson = "{\"type\":\"longLast\",\"name\":\"billy\",\"fieldName\":\"nilly\"}";
-    Assert.assertEquals(longLastAggFactory, mapper.readValue(longSpecJson, AggregatorFactory.class));
+    AggregatorFactory deserialized = mapper.readValue(longSpecJson, AggregatorFactory.class);
+    Assert.assertEquals(longLastAggFactory, deserialized);
+    Assert.assertArrayEquals(longLastAggFactory.getCacheKey(), deserialized.getCacheKey());
   }
 
   private void aggregate(
@@ -188,6 +234,7 @@ public class LongLastAggregationTest extends InitializedNullHandlingTest
   {
     agg.aggregate();
     timeSelector.increment();
+    customTimeSelector.increment();
     valueSelector.increment();
     objectSelector.increment();
   }
@@ -200,6 +247,7 @@ public class LongLastAggregationTest extends InitializedNullHandlingTest
   {
     agg.aggregate(buff, position);
     timeSelector.increment();
+    customTimeSelector.increment();
     valueSelector.increment();
     objectSelector.increment();
   }
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastAggregationTest.java
index a08ff4c..da402b5 100644
--- a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastAggregationTest.java
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastAggregationTest.java
@@ -46,11 +46,13 @@ public class StringLastAggregationTest
   private AggregatorFactory combiningAggFactory;
   private ColumnSelectorFactory colSelectorFactory;
   private TestLongColumnSelector timeSelector;
+  private TestLongColumnSelector customTimeSelector;
   private TestObjectColumnSelector<String> valueSelector;
   private TestObjectColumnSelector objectSelector;
 
   private String[] strings = {"1111", "2222", "3333", null, "4444"};
   private long[] times = {8224, 6879, 2436, 3546, 7888};
+  private long[] customTimes = {1, 5, 4, 2, 3};
   private SerializablePairLongString[] pairs = {
       new SerializablePairLongString(52782L, "AAAA"),
       new SerializablePairLongString(65492L, "BBBB"),
@@ -63,13 +65,15 @@ public class StringLastAggregationTest
   public void setup()
   {
     NullHandling.initializeForTests();
-    stringLastAggFactory = new StringLastAggregatorFactory("billy", "nilly", MAX_STRING_SIZE);
+    stringLastAggFactory = new StringLastAggregatorFactory("billy", "nilly", null, MAX_STRING_SIZE);
     combiningAggFactory = stringLastAggFactory.getCombiningFactory();
     timeSelector = new TestLongColumnSelector(times);
+    customTimeSelector = new TestLongColumnSelector(customTimes);
     valueSelector = new TestObjectColumnSelector<>(strings);
     objectSelector = new TestObjectColumnSelector<>(pairs);
     colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
     EasyMock.expect(colSelectorFactory.makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME)).andReturn(timeSelector);
+    EasyMock.expect(colSelectorFactory.makeColumnValueSelector("customTime")).andReturn(customTimeSelector);
     EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector);
     EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector);
     EasyMock.expect(colSelectorFactory.getColumnCapabilities("nilly"))
@@ -94,6 +98,21 @@ public class StringLastAggregationTest
   }
 
   @Test
+  public void testStringLastAggregatorWithTimeColumn()
+  {
+    Aggregator agg = new StringLastAggregatorFactory("billy", "nilly", "customTime", MAX_STRING_SIZE).factorize(colSelectorFactory);
+
+    aggregate(agg);
+    aggregate(agg);
+    aggregate(agg);
+    aggregate(agg);
+
+    Pair<Long, String> result = (Pair<Long, String>) agg.get();
+
+    Assert.assertEquals(strings[1], result.rhs);
+  }
+
+  @Test
   public void testStringLastBufferAggregator()
   {
     BufferAggregator agg = stringLastAggFactory.factorizeBuffered(
@@ -113,6 +132,25 @@ public class StringLastAggregationTest
   }
 
   @Test
+  public void testStringLastBufferAggregatorWithTimeColumn()
+  {
+    BufferAggregator agg = new StringLastAggregatorFactory("billy", "nilly", "customTime", MAX_STRING_SIZE).factorizeBuffered(
+        colSelectorFactory);
+
+    ByteBuffer buffer = ByteBuffer.wrap(new byte[stringLastAggFactory.getMaxIntermediateSize()]);
+    agg.init(buffer, 0);
+
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+
+    Pair<Long, String> result = (Pair<Long, String>) agg.get(buffer, 0);
+
+    Assert.assertEquals(strings[1], result.rhs);
+  }
+
+  @Test
   public void testCombine()
   {
     SerializablePairLongString pair1 = new SerializablePairLongString(1467225000L, "AAAA");
@@ -185,6 +223,7 @@ public class StringLastAggregationTest
   {
     agg.aggregate();
     timeSelector.increment();
+    customTimeSelector.increment();
     valueSelector.increment();
     objectSelector.increment();
   }
@@ -197,6 +236,7 @@ public class StringLastAggregationTest
   {
     agg.aggregate(buff, position);
     timeSelector.increment();
+    customTimeSelector.increment();
     valueSelector.increment();
     objectSelector.increment();
   }
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregatorTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregatorTest.java
index d4d17a3..37a1c41 100644
--- a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregatorTest.java
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastBufferAggregatorTest.java
@@ -55,7 +55,7 @@ public class StringLastBufferAggregatorTest
     TestObjectColumnSelector<String> objectColumnSelector = new TestObjectColumnSelector<>(strings);
 
     StringLastAggregatorFactory factory = new StringLastAggregatorFactory(
-        "billy", "billy", maxStringBytes
+        "billy", "billy", null, maxStringBytes
     );
 
     StringLastBufferAggregator agg = new StringLastBufferAggregator(
@@ -93,7 +93,7 @@ public class StringLastBufferAggregatorTest
     TestObjectColumnSelector<String> objectColumnSelector = new TestObjectColumnSelector<>(strings);
 
     StringLastAggregatorFactory factory = new StringLastAggregatorFactory(
-        "billy", "billy", maxStringBytes
+        "billy", "billy", null, maxStringBytes
     );
 
     StringLastBufferAggregator agg = new StringLastBufferAggregator(
@@ -131,7 +131,7 @@ public class StringLastBufferAggregatorTest
     TestObjectColumnSelector<String> objectColumnSelector = new TestObjectColumnSelector<>(strings);
 
     StringLastAggregatorFactory factory = new StringLastAggregatorFactory(
-        "billy", "billy", maxStringBytes
+        "billy", "billy", null, maxStringBytes
     );
 
     StringLastBufferAggregator agg = new StringLastBufferAggregator(
@@ -170,7 +170,7 @@ public class StringLastBufferAggregatorTest
     TestObjectColumnSelector<Double> objectColumnSelector = new TestObjectColumnSelector<>(doubles);
 
     StringLastAggregatorFactory factory = new StringLastAggregatorFactory(
-        "billy", "billy", maxStringBytes
+        "billy", "billy", null, maxStringBytes
     );
 
     StringLastBufferAggregator agg = new StringLastBufferAggregator(
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java
index ae79cb5..235dfdf 100644
--- a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java
@@ -74,7 +74,7 @@ public class StringLastTimeseriesQueryTest
             new IncrementalIndexSchema.Builder()
                 .withQueryGranularity(Granularities.SECOND)
                 .withMetrics(new CountAggregatorFactory("cnt"))
-                .withMetrics(new StringLastAggregatorFactory(LAST_CLIENT_TYPE, CLIENT_TYPE, 1024))
+                .withMetrics(new StringLastAggregatorFactory(LAST_CLIENT_TYPE, CLIENT_TYPE, null, 1024))
                 .build()
         )
         .setMaxRowCount(1000)
@@ -117,10 +117,10 @@ public class StringLastTimeseriesQueryTest
                                   .intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC)
                                   .aggregators(
                                       ImmutableList.of(
-                                          new StringLastAggregatorFactory("nonfolding", CLIENT_TYPE, 1024),
-                                          new StringLastAggregatorFactory("folding", LAST_CLIENT_TYPE, 1024),
-                                          new StringLastAggregatorFactory("nonexistent", "nonexistent", 1024),
-                                          new StringLastAggregatorFactory("numeric", "cnt", 1024)
+                                          new StringLastAggregatorFactory("nonfolding", CLIENT_TYPE, null, 1024),
+                                          new StringLastAggregatorFactory("folding", LAST_CLIENT_TYPE, null, 1024),
+                                          new StringLastAggregatorFactory("nonexistent", "nonexistent", null, 1024),
+                                          new StringLastAggregatorFactory("numeric", "cnt", null, 1024)
                                       )
                                   )
                                   .build();
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/post/FinalizingFieldAccessPostAggregatorTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/post/FinalizingFieldAccessPostAggregatorTest.java
index 637d6c1..ceea41d 100644
--- a/processing/src/test/java/org/apache/druid/query/aggregation/post/FinalizingFieldAccessPostAggregatorTest.java
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/post/FinalizingFieldAccessPostAggregatorTest.java
@@ -287,7 +287,7 @@ public class FinalizingFieldAccessPostAggregatorTest extends InitializedNullHand
               .granularity(Granularities.HOUR)
               .aggregators(
                   new CountAggregatorFactory("count"),
-                  new StringFirstAggregatorFactory("stringo", "col", 1024)
+                  new StringFirstAggregatorFactory("stringo", "col", null, 1024)
               )
               .postAggregators(
                   new FieldAccessPostAggregator("a", "stringo"),
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java
index 36b0c5d..b444218 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChestTest.java
@@ -837,13 +837,13 @@ public class GroupByQueryQueryToolChestTest extends InitializedNullHandlingTest
   {
     switch (valueType) {
       case LONG:
-        return new LongLastAggregatorFactory("complexMetric", "test");
+        return new LongLastAggregatorFactory("complexMetric", "test", null);
       case DOUBLE:
-        return new DoubleLastAggregatorFactory("complexMetric", "test");
+        return new DoubleLastAggregatorFactory("complexMetric", "test", null);
       case FLOAT:
-        return new FloatLastAggregatorFactory("complexMetric", "test");
+        return new FloatLastAggregatorFactory("complexMetric", "test", null);
       case STRING:
-        return new StringLastAggregatorFactory("complexMetric", "test", null);
+        return new StringLastAggregatorFactory("complexMetric", "test", null, null);
       default:
         throw new IllegalArgumentException("bad valueType: " + valueType);
     }
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
index f8b1793..ccfc87b 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
@@ -2598,8 +2598,8 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
         .setQuerySegmentSpec(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC)
         .setDimensions(new DefaultDimensionSpec("market", "market"))
         .setAggregatorSpecs(
-            new LongFirstAggregatorFactory("first", "index"),
-            new LongLastAggregatorFactory("last", "index")
+            new LongFirstAggregatorFactory("first", "index", null),
+            new LongLastAggregatorFactory("last", "index", null)
         )
         .setGranularity(QueryRunnerTestHelper.MONTH_GRAN)
         .build();
@@ -2691,8 +2691,8 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
             QueryRunnerTestHelper.ROWS_COUNT,
             QueryRunnerTestHelper.INDEX_LONG_SUM,
             QueryRunnerTestHelper.QUALITY_CARDINALITY,
-            new LongFirstAggregatorFactory("first", "index"),
-            new LongLastAggregatorFactory("last", "index")
+            new LongFirstAggregatorFactory("first", "index", null),
+            new LongLastAggregatorFactory("last", "index", null)
         )
         .setGranularity(QueryRunnerTestHelper.DAY_GRAN)
         .build();
@@ -6361,8 +6361,8 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
         .setDimensions(new DefaultDimensionSpec("market", "market"))
         .setAggregatorSpecs(
             QueryRunnerTestHelper.ROWS_COUNT,
-            new LongFirstAggregatorFactory("innerfirst", "index"),
-            new LongLastAggregatorFactory("innerlast", "index")
+            new LongFirstAggregatorFactory("innerfirst", "index", null),
+            new LongLastAggregatorFactory("innerlast", "index", null)
         )
         .setGranularity(QueryRunnerTestHelper.DAY_GRAN)
         .overrideContext(ImmutableMap.of("finalize", true))
@@ -6373,8 +6373,8 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
         .setQuerySegmentSpec(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC)
         .setDimensions(Collections.emptyList())
         .setAggregatorSpecs(
-            new LongFirstAggregatorFactory("first", "innerfirst"),
-            new LongLastAggregatorFactory("last", "innerlast")
+            new LongFirstAggregatorFactory("first", "innerfirst", null),
+            new LongLastAggregatorFactory("last", "innerlast", null)
         )
         .setGranularity(QueryRunnerTestHelper.MONTH_GRAN)
         .build();
diff --git a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java
index 2a877fb..b672798 100644
--- a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java
+++ b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java
@@ -93,7 +93,7 @@ public class TimeseriesQueryQueryToolChestTest
                 ImmutableList.of(
                     new CountAggregatorFactory("metric1"),
                     new LongSumAggregatorFactory("metric0", "metric0"),
-                    new StringLastAggregatorFactory("complexMetric", "test", null)
+                    new StringLastAggregatorFactory("complexMetric", "test", null, null)
                 ),
                 ImmutableList.of(new ConstantPostAggregator("post", 10)),
                 0,
diff --git a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java
index 2a8408c..1b86994 100644
--- a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerTest.java
@@ -178,7 +178,7 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
                                       Arrays.asList(
                                           QueryRunnerTestHelper.ROWS_COUNT,
                                           QueryRunnerTestHelper.INDEX_DOUBLE_SUM,
-                                          new DoubleFirstAggregatorFactory("first", "index")
+                                          new DoubleFirstAggregatorFactory("first", "index", null)
 
                                       )
                                   )
@@ -1957,8 +1957,8 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
                                   .intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC)
                                   .aggregators(
                                       ImmutableList.of(
-                                          new DoubleFirstAggregatorFactory("first", "index"),
-                                          new DoubleLastAggregatorFactory("last", "index")
+                                          new DoubleFirstAggregatorFactory("first", "index", null),
+                                          new DoubleLastAggregatorFactory("last", "index", null)
                                       )
                                   )
                                   .descending(descending)
diff --git a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java
index 790e426..f5b5111 100644
--- a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java
+++ b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryQueryToolChestTest.java
@@ -372,13 +372,13 @@ public class TopNQueryQueryToolChestTest extends InitializedNullHandlingTest
   {
     switch (valueType) {
       case LONG:
-        return new LongLastAggregatorFactory("complexMetric", "test");
+        return new LongLastAggregatorFactory("complexMetric", "test", null);
       case DOUBLE:
-        return new DoubleLastAggregatorFactory("complexMetric", "test");
+        return new DoubleLastAggregatorFactory("complexMetric", "test", null);
       case FLOAT:
-        return new FloatLastAggregatorFactory("complexMetric", "test");
+        return new FloatLastAggregatorFactory("complexMetric", "test", null);
       case STRING:
-        return new StringLastAggregatorFactory("complexMetric", "test", null);
+        return new StringLastAggregatorFactory("complexMetric", "test", null, null);
       default:
         throw new IllegalArgumentException("bad valueType: " + valueType);
     }
diff --git a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java
index 341fb17..23379db 100644
--- a/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerTest.java
@@ -290,7 +290,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
                     Lists.newArrayList(
                         new DoubleMaxAggregatorFactory("maxIndex", "index"),
                         new DoubleMinAggregatorFactory("minIndex", "index"),
-                        new DoubleFirstAggregatorFactory("first", "index")
+                        new DoubleFirstAggregatorFactory("first", "index", null)
                     )
                 )
             )
@@ -826,8 +826,8 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
         .threshold(3)
         .intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC)
         .aggregators(
-            new LongFirstAggregatorFactory("first", "index"),
-            new LongLastAggregatorFactory("last", "index")
+            new LongFirstAggregatorFactory("first", "index", null),
+            new LongLastAggregatorFactory("last", "index", null)
         )
         .build();
 
@@ -935,8 +935,8 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
         .threshold(3)
         .intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC)
         .aggregators(
-            new FloatFirstAggregatorFactory("first", "index"),
-            new FloatLastAggregatorFactory("last", "index")
+            new FloatFirstAggregatorFactory("first", "index", null),
+            new FloatLastAggregatorFactory("last", "index", null)
         )
         .build();
 
@@ -1044,8 +1044,8 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
         .threshold(3)
         .intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC)
         .aggregators(
-            new FloatFirstAggregatorFactory("first", "indexFloat"),
-            new FloatLastAggregatorFactory("last", "indexFloat")
+            new FloatFirstAggregatorFactory("first", "indexFloat", null),
+            new FloatLastAggregatorFactory("last", "indexFloat", null)
         )
         .build();
 
diff --git a/processing/src/test/java/org/apache/druid/segment/IndexMergerRollupTest.java b/processing/src/test/java/org/apache/druid/segment/IndexMergerRollupTest.java
index 3cd0e86..82d6a7f 100644
--- a/processing/src/test/java/org/apache/druid/segment/IndexMergerRollupTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/IndexMergerRollupTest.java
@@ -105,7 +105,7 @@ public class IndexMergerRollupTest extends InitializedNullHandlingTest
   public void testStringFirstRollup() throws Exception
   {
     AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{
-        new StringFirstAggregatorFactory("m", "m", 1024)
+        new StringFirstAggregatorFactory("m", "m", null, 1024)
     };
     testStringFirstLastRollup(aggregatorFactories);
   }
@@ -114,7 +114,7 @@ public class IndexMergerRollupTest extends InitializedNullHandlingTest
   public void testStringLastRollup() throws Exception
   {
     AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{
-        new StringLastAggregatorFactory("m", "m", 1024)
+        new StringLastAggregatorFactory("m", "m", null, 1024)
     };
     testStringFirstLastRollup(aggregatorFactories);
   }
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java
index 7bf9946..560708f 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java
@@ -35,6 +35,7 @@ import org.apache.calcite.sql.type.SqlReturnTypeInference;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.sql.type.SqlTypeUtil;
 import org.apache.calcite.util.Optionality;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.any.DoubleAnyAggregatorFactory;
@@ -76,18 +77,18 @@ public class EarliestLatestAnySqlAggregator implements SqlAggregator
   {
     EARLIEST {
       @Override
-      AggregatorFactory createAggregatorFactory(String name, String fieldName, ColumnType type, int maxStringBytes)
+      AggregatorFactory createAggregatorFactory(String name, String fieldName, String timeColumn, ColumnType type, int maxStringBytes)
       {
         switch (type.getType()) {
           case LONG:
-            return new LongFirstAggregatorFactory(name, fieldName);
+            return new LongFirstAggregatorFactory(name, fieldName, timeColumn);
           case FLOAT:
-            return new FloatFirstAggregatorFactory(name, fieldName);
+            return new FloatFirstAggregatorFactory(name, fieldName, timeColumn);
           case DOUBLE:
-            return new DoubleFirstAggregatorFactory(name, fieldName);
+            return new DoubleFirstAggregatorFactory(name, fieldName, timeColumn);
           case STRING:
           case COMPLEX:
-            return new StringFirstAggregatorFactory(name, fieldName, maxStringBytes);
+            return new StringFirstAggregatorFactory(name, fieldName, timeColumn, maxStringBytes);
           default:
             throw new ISE("Cannot build EARLIEST aggregatorFactory for type[%s]", type);
         }
@@ -96,18 +97,18 @@ public class EarliestLatestAnySqlAggregator implements SqlAggregator
 
     LATEST {
       @Override
-      AggregatorFactory createAggregatorFactory(String name, String fieldName, ColumnType type, int maxStringBytes)
+      AggregatorFactory createAggregatorFactory(String name, String fieldName, String timeColumn, ColumnType type, int maxStringBytes)
       {
         switch (type.getType()) {
           case LONG:
-            return new LongLastAggregatorFactory(name, fieldName);
+            return new LongLastAggregatorFactory(name, fieldName, timeColumn);
           case FLOAT:
-            return new FloatLastAggregatorFactory(name, fieldName);
+            return new FloatLastAggregatorFactory(name, fieldName, timeColumn);
           case DOUBLE:
-            return new DoubleLastAggregatorFactory(name, fieldName);
+            return new DoubleLastAggregatorFactory(name, fieldName, timeColumn);
           case STRING:
           case COMPLEX:
-            return new StringLastAggregatorFactory(name, fieldName, maxStringBytes);
+            return new StringLastAggregatorFactory(name, fieldName, timeColumn, maxStringBytes);
           default:
             throw new ISE("Cannot build LATEST aggregatorFactory for type[%s]", type);
         }
@@ -116,7 +117,7 @@ public class EarliestLatestAnySqlAggregator implements SqlAggregator
 
     ANY_VALUE {
       @Override
-      AggregatorFactory createAggregatorFactory(String name, String fieldName, ColumnType type, int maxStringBytes)
+      AggregatorFactory createAggregatorFactory(String name, String fieldName, String timeColumn, ColumnType type, int maxStringBytes)
       {
         switch (type.getType()) {
           case LONG:
@@ -136,6 +137,7 @@ public class EarliestLatestAnySqlAggregator implements SqlAggregator
     abstract AggregatorFactory createAggregatorFactory(
         String name,
         String fieldName,
+        String timeColumn,
         ColumnType outputType,
         int maxStringBytes
     );
@@ -183,20 +185,6 @@ public class EarliestLatestAnySqlAggregator implements SqlAggregator
     }
 
     final String aggregatorName = finalizeAggregations ? Calcites.makePrefixedName(name, "a") : name;
-    final String fieldName;
-
-    if (args.get(0).isDirectColumnAccess()) {
-      fieldName = args.get(0).getDirectColumn();
-    } else {
-      final RelDataType dataType = rexNodes.get(0).getType();
-      final VirtualColumn virtualColumn =
-          virtualColumnRegistry.getOrCreateVirtualColumnForExpression(plannerContext, args.get(0), dataType);
-      fieldName = virtualColumn.getOutputName();
-    }
-
-    // Second arg must be a literal, if it exists (the type signature below requires it).
-    final int maxBytes = rexNodes.size() > 1 ? RexLiteral.intValue(rexNodes.get(1)) : -1;
-
     final ColumnType outputType = Calcites.getColumnTypeForRelDataType(aggregateCall.getType());
     if (outputType == null) {
       throw new ISE(
@@ -206,19 +194,74 @@ public class EarliestLatestAnySqlAggregator implements SqlAggregator
       );
     }
 
+    final String fieldName = getColumnName(plannerContext, virtualColumnRegistry, args.get(0), rexNodes.get(0));
+
+    final AggregatorFactory theAggFactory;
+    switch (args.size()) {
+      case 1:
+        theAggFactory = aggregatorType.createAggregatorFactory(aggregatorName, fieldName, null, outputType, -1);
+        break;
+      case 2:
+        if (!outputType.isNumeric()) { // translates (expr, maxBytesPerString) signature
+          theAggFactory = aggregatorType.createAggregatorFactory(
+              aggregatorName,
+              fieldName,
+              null,
+              outputType,
+              RexLiteral.intValue(rexNodes.get(1))
+          );
+        } else { // translates (expr, timeColumn) signature
+          theAggFactory = aggregatorType.createAggregatorFactory(
+              aggregatorName,
+              fieldName,
+              getColumnName(plannerContext, virtualColumnRegistry, args.get(1), rexNodes.get(1)),
+              outputType,
+              -1
+          );
+        }
+        break;
+      case 3:
+        theAggFactory = aggregatorType.createAggregatorFactory(
+            aggregatorName,
+            fieldName,
+            getColumnName(plannerContext, virtualColumnRegistry, args.get(2), rexNodes.get(2)),
+            outputType,
+            RexLiteral.intValue(rexNodes.get(1))
+        );
+        break;
+      default:
+        throw new IAE(
+            "aggregation[%s], Invalid number of arguments[%,d] to Earliest/Latest/Any operator",
+            aggregatorName,
+            args.size()
+        );
+    }
+
     return Aggregation.create(
-        Collections.singletonList(
-            aggregatorType.createAggregatorFactory(
-                aggregatorName,
-                fieldName,
-                outputType,
-                maxBytes
-            )
-        ),
+        Collections.singletonList(theAggFactory),
         finalizeAggregations ? new FinalizingFieldAccessPostAggregator(name, aggregatorName) : null
     );
   }
 
+  private String getColumnName(
+      PlannerContext plannerContext,
+      VirtualColumnRegistry virtualColumnRegistry,
+      DruidExpression arg,
+      RexNode rexNode
+  )
+  {
+    String columnName;
+    if (arg.isDirectColumnAccess()) {
+      columnName = arg.getDirectColumn();
+    } else {
+      final RelDataType dataType = rexNode.getType();
+      final VirtualColumn virtualColumn =
+          virtualColumnRegistry.getOrCreateVirtualColumnForExpression(plannerContext, arg, dataType);
+      columnName = virtualColumn.getOutputName();
+    }
+    return columnName;
+  }
+
   static class EarliestLatestReturnTypeInference implements SqlReturnTypeInference
   {
     private final int ordinal;
@@ -262,6 +305,17 @@ public class EarliestLatestAnySqlAggregator implements SqlAggregator
                   "'" + aggregatorType.name() + "(expr, maxBytesPerString)'\n",
                   OperandTypes.ANY,
                   OperandTypes.and(OperandTypes.NUMERIC, OperandTypes.LITERAL)
+              ),
+              OperandTypes.sequence(
+                  "'" + aggregatorType.name() + "(expr, timeColumn)'\n",
+                  OperandTypes.ANY,
+                  OperandTypes.NUMERIC
+              ),
+              OperandTypes.sequence(
+                  "'" + aggregatorType.name() + "(expr, maxBytesPerString, timeColumn)'\n",
+                  OperandTypes.ANY,
+                  OperandTypes.and(OperandTypes.NUMERIC, OperandTypes.LITERAL),
+                  OperandTypes.NUMERIC
               )
           ),
           SqlFunctionCategory.STRING,
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java
index 2f143e1..dd2ac39 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java
@@ -1370,7 +1370,7 @@ public class CalciteJoinQueryTest extends BaseCalciteQueryTest
                                         new SubstringDimExtractionFn(0, 1)
                                     )
                                 )
-                                .setAggregatorSpecs(new StringLastAggregatorFactory("a0", "v", 10))
+                                .setAggregatorSpecs(new StringLastAggregatorFactory("a0", "v", null, 10))
                                 .build()
                         ),
                         "j0.",
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index 58a1200..2feda99 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -608,7 +608,9 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
     testQuery(
         "SELECT "
         + "EARLIEST(cnt), EARLIEST(m1), EARLIEST(dim1, 10), "
-        + "EARLIEST(cnt + 1), EARLIEST(m1 + 1), EARLIEST(dim1 || CAST(cnt AS VARCHAR), 10) "
+        + "EARLIEST(cnt + 1), EARLIEST(m1 + 1), EARLIEST(dim1 || CAST(cnt AS VARCHAR), 10), "
+        + "EARLIEST(cnt, m1), EARLIEST(m1, m1), EARLIEST(dim1, 10, m1), "
+        + "EARLIEST(cnt + 1, m1), EARLIEST(m1 + 1, m1), EARLIEST(dim1 || CAST(cnt AS VARCHAR), 10, m1) "
         + "FROM druid.foo",
         ImmutableList.of(
             Druids.newTimeseriesQueryBuilder()
@@ -622,19 +624,25 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                   )
                   .aggregators(
                       aggregators(
-                          new LongFirstAggregatorFactory("a0", "cnt"),
-                          new FloatFirstAggregatorFactory("a1", "m1"),
-                          new StringFirstAggregatorFactory("a2", "dim1", 10),
-                          new LongFirstAggregatorFactory("a3", "v0"),
-                          new FloatFirstAggregatorFactory("a4", "v1"),
-                          new StringFirstAggregatorFactory("a5", "v2", 10)
+                          new LongFirstAggregatorFactory("a0", "cnt", null),
+                          new FloatFirstAggregatorFactory("a1", "m1", null),
+                          new StringFirstAggregatorFactory("a2", "dim1", null, 10),
+                          new LongFirstAggregatorFactory("a3", "v0", null),
+                          new FloatFirstAggregatorFactory("a4", "v1", null),
+                          new StringFirstAggregatorFactory("a5", "v2", null, 10),
+                          new LongFirstAggregatorFactory("a6", "cnt", "m1"),
+                          new FloatFirstAggregatorFactory("a7", "m1", "m1"),
+                          new StringFirstAggregatorFactory("a8", "dim1", "m1", 10),
+                          new LongFirstAggregatorFactory("a9", "v0", "m1"),
+                          new FloatFirstAggregatorFactory("a10", "v1", "m1"),
+                          new StringFirstAggregatorFactory("a11", "v2", "m1", 10)
                       )
                   )
                   .context(QUERY_CONTEXT_DEFAULT)
                   .build()
         ),
         ImmutableList.of(
-            new Object[]{1L, 1.0f, "", 2L, 2.0f, "1"}
+            new Object[]{1L, 1.0f, "", 2L, 2.0f, "1", 1L, 1.0f, "", 2L, 2.0f, "1"}
         )
     );
   }
@@ -648,7 +656,9 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
     testQuery(
         "SELECT "
         + "LATEST(cnt), LATEST(m1), LATEST(dim1, 10), "
-        + "LATEST(cnt + 1), LATEST(m1 + 1), LATEST(dim1 || CAST(cnt AS VARCHAR), 10) "
+        + "LATEST(cnt + 1), LATEST(m1 + 1), LATEST(dim1 || CAST(cnt AS VARCHAR), 10), "
+        + "LATEST(cnt, m1), LATEST(m1, m1), LATEST(dim1, 10, m1), "
+        + "LATEST(cnt + 1, m1), LATEST(m1 + 1, m1), LATEST(dim1 || CAST(cnt AS VARCHAR), 10, m1) "
         + "FROM druid.foo",
         ImmutableList.of(
             Druids.newTimeseriesQueryBuilder()
@@ -662,19 +672,25 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                   )
                   .aggregators(
                       aggregators(
-                          new LongLastAggregatorFactory("a0", "cnt"),
-                          new FloatLastAggregatorFactory("a1", "m1"),
-                          new StringLastAggregatorFactory("a2", "dim1", 10),
-                          new LongLastAggregatorFactory("a3", "v0"),
-                          new FloatLastAggregatorFactory("a4", "v1"),
-                          new StringLastAggregatorFactory("a5", "v2", 10)
+                          new LongLastAggregatorFactory("a0", "cnt", null),
+                          new FloatLastAggregatorFactory("a1", "m1", null),
+                          new StringLastAggregatorFactory("a2", "dim1", null, 10),
+                          new LongLastAggregatorFactory("a3", "v0", null),
+                          new FloatLastAggregatorFactory("a4", "v1", null),
+                          new StringLastAggregatorFactory("a5", "v2", null, 10),
+                          new LongLastAggregatorFactory("a6", "cnt", "m1"),
+                          new FloatLastAggregatorFactory("a7", "m1", "m1"),
+                          new StringLastAggregatorFactory("a8", "dim1", "m1", 10),
+                          new LongLastAggregatorFactory("a9", "v0", "m1"),
+                          new FloatLastAggregatorFactory("a10", "v1", "m1"),
+                          new StringLastAggregatorFactory("a11", "v2", "m1", 10)
                       )
                   )
                   .context(QUERY_CONTEXT_DEFAULT)
                   .build()
         ),
         ImmutableList.of(
-            new Object[]{1L, 6.0f, "abc", 2L, 7.0f, "abc1"}
+            new Object[]{1L, 6.0f, "abc", 2L, 7.0f, "abc1", 1L, 6.0f, "abc", 2L, 7.0f, "abc1"}
         )
     );
   }
@@ -806,9 +822,9 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                                         .setGranularity(Granularities.ALL)
                                         .setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0")))
                                         .setAggregatorSpecs(aggregators(
-                                                                new FloatLastAggregatorFactory("a0:a", "m1"),
-                                                                new LongLastAggregatorFactory("a1:a", "cnt"),
-                                                                new DoubleLastAggregatorFactory("a2:a", "m2")
+                                                                new FloatLastAggregatorFactory("a0:a", "m1", null),
+                                                                new LongLastAggregatorFactory("a1:a", "cnt", null),
+                                                                new DoubleLastAggregatorFactory("a2:a", "m2", null)
                                                             )
                                         )
                                         .setPostAggregatorSpecs(
@@ -857,9 +873,9 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                                         .setGranularity(Granularities.ALL)
                                         .setDimensions(dimensions(new DefaultDimensionSpec("dim2", "d0")))
                                         .setAggregatorSpecs(aggregators(
-                                                                new FloatFirstAggregatorFactory("a0:a", "m1"),
-                                                                new LongFirstAggregatorFactory("a1:a", "cnt"),
-                                                                new DoubleFirstAggregatorFactory("a2:a", "m2")
+                                                                new FloatFirstAggregatorFactory("a0:a", "m1", null),
+                                                                new LongFirstAggregatorFactory("a1:a", "cnt", null),
+                                                                new DoubleFirstAggregatorFactory("a2:a", "m2", null)
                                                             )
                                         )
                                         .setPostAggregatorSpecs(
@@ -910,6 +926,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                                         .setAggregatorSpecs(aggregators(new StringLastAggregatorFactory(
                                             "a0:a",
                                             "dim1",
+                                            null,
                                             10
                                         )))
                                         .setPostAggregatorSpecs(
@@ -957,6 +974,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                                         .setAggregatorSpecs(aggregators(new StringFirstAggregatorFactory(
                                             "a0:a",
                                             "dim1",
+                                            null,
                                             10
                                         )))
                                         .setPostAggregatorSpecs(
@@ -1106,9 +1124,9 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                   .granularity(Granularities.ALL)
                   .aggregators(
                       aggregators(
-                          new LongFirstAggregatorFactory("a0", "l1"),
-                          new DoubleFirstAggregatorFactory("a1", "d1"),
-                          new FloatFirstAggregatorFactory("a2", "f1")
+                          new LongFirstAggregatorFactory("a0", "l1", null),
+                          new DoubleFirstAggregatorFactory("a1", "d1", null),
+                          new FloatFirstAggregatorFactory("a2", "f1", null)
                       )
                   )
                   .context(QUERY_CONTEXT_DEFAULT)
@@ -1135,9 +1153,9 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                   .granularity(Granularities.ALL)
                   .aggregators(
                       aggregators(
-                          new LongLastAggregatorFactory("a0", "l1"),
-                          new DoubleLastAggregatorFactory("a1", "d1"),
-                          new FloatLastAggregatorFactory("a2", "f1")
+                          new LongLastAggregatorFactory("a0", "l1", null),
+                          new DoubleLastAggregatorFactory("a1", "d1", null),
+                          new FloatLastAggregatorFactory("a2", "f1", null)
                       )
                   )
                   .context(QUERY_CONTEXT_DEFAULT)
@@ -1182,10 +1200,10 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                   .filters(filter)
                   .aggregators(
                       aggregators(
-                          new StringFirstAggregatorFactory("a0", "dim1", 32),
-                          new LongLastAggregatorFactory("a1", "l1"),
-                          new DoubleLastAggregatorFactory("a2", "d1"),
-                          new FloatLastAggregatorFactory("a3", "f1")
+                          new StringFirstAggregatorFactory("a0", "dim1", null, 32),
+                          new LongLastAggregatorFactory("a1", "l1", null),
+                          new DoubleLastAggregatorFactory("a2", "d1", null),
+                          new FloatLastAggregatorFactory("a3", "f1", null)
                       )
                   )
                   .context(QUERY_CONTEXT_DEFAULT)
@@ -1303,7 +1321,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                 .dimension(new DefaultDimensionSpec("dim1", "_d0"))
                 .aggregators(
                     aggregators(
-                        new FloatFirstAggregatorFactory("a0", "f1")
+                        new FloatFirstAggregatorFactory("a0", "f1", null)
                     )
                 )
                 .metric(new InvertedTopNMetricSpec(new NumericTopNMetricSpec("a0")))
@@ -1350,7 +1368,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                 .dimension(new DefaultDimensionSpec("dim1", "_d0"))
                 .aggregators(
                     aggregators(
-                        new DoubleFirstAggregatorFactory("a0", "d1")
+                        new DoubleFirstAggregatorFactory("a0", "d1", null)
                     )
                 )
                 .metric(new InvertedTopNMetricSpec(new NumericTopNMetricSpec("a0")))
@@ -1397,7 +1415,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                 .dimension(new DefaultDimensionSpec("dim1", "_d0"))
                 .aggregators(
                     aggregators(
-                        new LongFirstAggregatorFactory("a0", "l1")
+                        new LongFirstAggregatorFactory("a0", "l1", null)
                     )
                 )
                 .metric(new InvertedTopNMetricSpec(new NumericTopNMetricSpec("a0")))
@@ -1445,7 +1463,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                 .dimension(new DefaultDimensionSpec("dim1", "_d0"))
                 .aggregators(
                     aggregators(
-                        new FloatLastAggregatorFactory("a0", "f1")
+                        new FloatLastAggregatorFactory("a0", "f1", null)
                     )
                 )
                 .metric(new InvertedTopNMetricSpec(new NumericTopNMetricSpec("a0")))
@@ -1492,7 +1510,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                 .dimension(new DefaultDimensionSpec("dim1", "_d0"))
                 .aggregators(
                     aggregators(
-                        new DoubleLastAggregatorFactory("a0", "d1")
+                        new DoubleLastAggregatorFactory("a0", "d1", null)
                     )
                 )
                 .metric(new InvertedTopNMetricSpec(new NumericTopNMetricSpec("a0")))
@@ -1539,7 +1557,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                 .dimension(new DefaultDimensionSpec("dim1", "_d0"))
                 .aggregators(
                     aggregators(
-                        new LongLastAggregatorFactory("a0", "l1")
+                        new LongLastAggregatorFactory("a0", "l1", null)
                     )
                 )
                 .metric(new InvertedTopNMetricSpec(new NumericTopNMetricSpec("a0")))
@@ -8765,10 +8783,10 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                       aggregators(
                           new StringAnyAggregatorFactory("a0", "dim1", 1024),
                           new LongAnyAggregatorFactory("a1", "l1"),
-                          new StringFirstAggregatorFactory("a2", "dim1", 1024),
-                          new LongFirstAggregatorFactory("a3", "l1"),
-                          new StringLastAggregatorFactory("a4", "dim1", 1024),
-                          new LongLastAggregatorFactory("a5", "l1"),
+                          new StringFirstAggregatorFactory("a2", "dim1", null, 1024),
+                          new LongFirstAggregatorFactory("a3", "l1", null),
+                          new StringLastAggregatorFactory("a4", "dim1", null, 1024),
+                          new LongLastAggregatorFactory("a5", "l1", null),
                           new ExpressionLambdaAggregatorFactory(
                               "a6",
                               ImmutableSet.of("dim3"),
@@ -9050,19 +9068,19 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                                     selector("dim1", "nonexistent", null)
                                 ),
                                 new FilteredAggregatorFactory(
-                                    new StringFirstAggregatorFactory("a2", "dim1", 1024),
+                                    new StringFirstAggregatorFactory("a2", "dim1", null, 1024),
                                     selector("dim1", "nonexistent", null)
                                 ),
                                 new FilteredAggregatorFactory(
-                                    new LongFirstAggregatorFactory("a3", "l1"),
+                                    new LongFirstAggregatorFactory("a3", "l1", null),
                                     selector("dim1", "nonexistent", null)
                                 ),
                                 new FilteredAggregatorFactory(
-                                    new StringLastAggregatorFactory("a4", "dim1", 1024),
+                                    new StringLastAggregatorFactory("a4", "dim1", null, 1024),
                                     selector("dim1", "nonexistent", null)
                                 ),
                                 new FilteredAggregatorFactory(
-                                    new LongLastAggregatorFactory("a5", "l1"),
+                                    new LongLastAggregatorFactory("a5", "l1", null),
                                     selector("dim1", "nonexistent", null)
                                 ),
                                 new FilteredAggregatorFactory(

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org