You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/10/26 05:14:21 UTC

[pinot] branch master updated: Add LASTWITHTIME aggregate function support #7315 (#7584)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new da3bce5  Add LASTWITHTIME aggregate function support #7315 (#7584)
da3bce5 is described below

commit da3bce50eb79656d967da7636a741653735abd63
Author: weixiangsun <91...@users.noreply.github.com>
AuthorDate: Mon Oct 25 22:13:59 2021 -0700

    Add LASTWITHTIME aggregate function support #7315 (#7584)
    
    Adding aggregate function to return last value of time-based data set.
---
 .../function/AggregationFunctionTypeTest.java      |   2 +
 .../apache/pinot/core/common/ObjectSerDeUtils.java | 122 ++++-
 .../function/AggregationFunctionFactory.java       |  31 ++
 ...LastDoubleValueWithTimeAggregationFunction.java | 126 +++++
 .../LastFloatValueWithTimeAggregationFunction.java | 127 +++++
 .../LastIntValueWithTimeAggregationFunction.java   | 142 ++++++
 .../LastLongValueWithTimeAggregationFunction.java  | 126 +++++
 ...LastStringValueWithTimeAggregationFunction.java | 124 +++++
 .../function/LastWithTimeAggregationFunction.java  | 227 +++++++++
 .../BrokerRequestToQueryContextConverter.java      |   7 +
 .../pinot/core/common/ObjectSerDeUtilsTest.java    |  76 +++
 .../function/AggregationFunctionFactoryTest.java   |  42 ++
 .../pinot/queries/LastWithTimeQueriesTest.java     | 548 +++++++++++++++++++++
 .../segment/local/customobject/DoubleLongPair.java |  45 ++
 .../segment/local/customobject/FloatLongPair.java  |  45 ++
 .../segment/local/customobject/IntLongPair.java    |  45 ++
 .../segment/local/customobject/LongLongPair.java   |  45 ++
 .../segment/local/customobject/StringLongPair.java |  50 ++
 .../segment/local/customobject/ValueLongPair.java  |  50 ++
 .../pinot/segment/spi/AggregationFunctionType.java |   1 +
 20 files changed, 1979 insertions(+), 2 deletions(-)

diff --git a/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java b/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java
index 042381b..ea9f69a 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java
@@ -33,6 +33,8 @@ public class AggregationFunctionTypeTest {
     Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("SuM"), AggregationFunctionType.SUM);
     Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("AvG"), AggregationFunctionType.AVG);
     Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("MoDe"), AggregationFunctionType.MODE);
+    Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("LaStWiThTiMe"),
+            AggregationFunctionType.LASTWITHTIME);
     Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("MiNmAxRaNgE"),
         AggregationFunctionType.MINMAXRANGE);
     Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("DiStInCtCoUnT"),
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index 3341a7c..e6ed491 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -62,8 +62,13 @@ import org.apache.pinot.core.query.distinct.DistinctTable;
 import org.apache.pinot.core.query.utils.idset.IdSet;
 import org.apache.pinot.core.query.utils.idset.IdSets;
 import org.apache.pinot.segment.local.customobject.AvgPair;
+import org.apache.pinot.segment.local.customobject.DoubleLongPair;
+import org.apache.pinot.segment.local.customobject.FloatLongPair;
+import org.apache.pinot.segment.local.customobject.IntLongPair;
+import org.apache.pinot.segment.local.customobject.LongLongPair;
 import org.apache.pinot.segment.local.customobject.MinMaxRangePair;
 import org.apache.pinot.segment.local.customobject.QuantileDigest;
+import org.apache.pinot.segment.local.customobject.StringLongPair;
 import org.apache.pinot.segment.local.utils.GeometrySerializer;
 import org.apache.pinot.spi.utils.BigDecimalUtils;
 import org.apache.pinot.spi.utils.ByteArray;
@@ -109,7 +114,12 @@ public class ObjectSerDeUtils {
     Int2LongMap(23),
     Long2LongMap(24),
     Float2LongMap(25),
-    Double2LongMap(26);
+    Double2LongMap(26),
+    IntLongPair(27),
+    LongLongPair(28),
+    FloatLongPair(29),
+    DoubleLongPair(30),
+    StringLongPair(31);
     private final int _value;
 
     ObjectType(int value) {
@@ -178,6 +188,16 @@ public class ObjectSerDeUtils {
         return ObjectType.IdSet;
       } else if (value instanceof List) {
         return ObjectType.List;
+      } else if (value instanceof IntLongPair) {
+        return ObjectType.IntLongPair;
+      } else if (value instanceof LongLongPair) {
+        return ObjectType.LongLongPair;
+      } else if (value instanceof FloatLongPair) {
+        return ObjectType.FloatLongPair;
+      } else if (value instanceof DoubleLongPair) {
+        return ObjectType.DoubleLongPair;
+      } else if (value instanceof StringLongPair) {
+        return ObjectType.StringLongPair;
       } else {
         throw new IllegalArgumentException("Unsupported type of value: " + value.getClass().getSimpleName());
       }
@@ -330,6 +350,99 @@ public class ObjectSerDeUtils {
     }
   };
 
+  public static final ObjectSerDe<IntLongPair> INT_LONG_PAIR_SER_DE
+      = new ObjectSerDe<IntLongPair>() {
+
+    @Override
+    public byte[] serialize(IntLongPair intLongPair) {
+      return intLongPair.toBytes();
+    }
+
+    @Override
+    public IntLongPair deserialize(byte[] bytes) {
+      return IntLongPair.fromBytes(bytes);
+    }
+
+    @Override
+    public IntLongPair deserialize(ByteBuffer byteBuffer) {
+      return IntLongPair.fromByteBuffer(byteBuffer);
+    }
+  };
+
+  public static final ObjectSerDe<LongLongPair> LONG_LONG_PAIR_SER_DE
+      = new ObjectSerDe<LongLongPair>() {
+
+    @Override
+    public byte[] serialize(LongLongPair longLongPair) {
+      return longLongPair.toBytes();
+    }
+
+    @Override
+    public LongLongPair deserialize(byte[] bytes) {
+      return LongLongPair.fromBytes(bytes);
+    }
+
+    @Override
+    public LongLongPair deserialize(ByteBuffer byteBuffer) {
+      return LongLongPair.fromByteBuffer(byteBuffer);
+    }
+  };
+
+  public static final ObjectSerDe<FloatLongPair> FLOAT_LONG_PAIR_SER_DE
+      = new ObjectSerDe<FloatLongPair>() {
+
+    @Override
+    public byte[] serialize(FloatLongPair floatLongPair) {
+      return floatLongPair.toBytes();
+    }
+
+    @Override
+    public FloatLongPair deserialize(byte[] bytes) {
+      return FloatLongPair.fromBytes(bytes);
+    }
+
+    @Override
+    public FloatLongPair deserialize(ByteBuffer byteBuffer) {
+      return FloatLongPair.fromByteBuffer(byteBuffer);
+    }
+  };
+  public static final ObjectSerDe<DoubleLongPair> DOUBLE_LONG_PAIR_SER_DE
+      = new ObjectSerDe<DoubleLongPair>() {
+
+    @Override
+    public byte[] serialize(DoubleLongPair doubleLongPair) {
+      return doubleLongPair.toBytes();
+    }
+
+    @Override
+    public DoubleLongPair deserialize(byte[] bytes) {
+      return DoubleLongPair.fromBytes(bytes);
+    }
+
+    @Override
+    public DoubleLongPair deserialize(ByteBuffer byteBuffer) {
+      return DoubleLongPair.fromByteBuffer(byteBuffer);
+    }
+  };
+  public static final ObjectSerDe<StringLongPair> STRING_LONG_PAIR_SER_DE
+      = new ObjectSerDe<StringLongPair>() {
+
+    @Override
+    public byte[] serialize(StringLongPair stringLongPair) {
+      return stringLongPair.toBytes();
+    }
+
+    @Override
+    public StringLongPair deserialize(byte[] bytes) {
+      return StringLongPair.fromBytes(bytes);
+    }
+
+    @Override
+    public StringLongPair deserialize(ByteBuffer byteBuffer) {
+      return StringLongPair.fromByteBuffer(byteBuffer);
+    }
+  };
+
   public static final ObjectSerDe<HyperLogLog> HYPER_LOG_LOG_SER_DE = new ObjectSerDe<HyperLogLog>() {
 
     @Override
@@ -1047,7 +1160,12 @@ public class ObjectSerDeUtils {
       INT_2_LONG_MAP_SER_DE,
       LONG_2_LONG_MAP_SER_DE,
       FLOAT_2_LONG_MAP_SER_DE,
-      DOUBLE_2_LONG_MAP_SER_DE
+      DOUBLE_2_LONG_MAP_SER_DE,
+      INT_LONG_PAIR_SER_DE,
+      LONG_LONG_PAIR_SER_DE,
+      FLOAT_LONG_PAIR_SER_DE,
+      DOUBLE_LONG_PAIR_SER_DE,
+      STRING_LONG_PAIR_SER_DE
   };
   //@formatter:on
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index 3285607..df26e16 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -25,6 +25,7 @@ import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.FunctionContext;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.exception.BadQueryRequestException;
 
 
@@ -156,6 +157,36 @@ public class AggregationFunctionFactory {
             return new AvgAggregationFunction(firstArgument);
           case MODE:
             return new ModeAggregationFunction(arguments);
+          case LASTWITHTIME:
+            if (arguments.size() == 3) {
+              ExpressionContext timeCol = arguments.get(1);
+              ExpressionContext dataType = arguments.get(2);
+              if (dataType.getType() != ExpressionContext.Type.LITERAL) {
+                throw new IllegalArgumentException("Third argument of lastWithTime Function should be literal."
+                    + " The function can be used as lastWithTime(dataColumn, timeColumn, 'dataType')");
+              }
+              FieldSpec.DataType fieldDataType
+                  = FieldSpec.DataType.valueOf(dataType.getLiteral().toUpperCase());
+              switch (fieldDataType) {
+                case BOOLEAN:
+                case INT:
+                  return new LastIntValueWithTimeAggregationFunction(
+                      firstArgument, timeCol, fieldDataType == FieldSpec.DataType.BOOLEAN);
+                case LONG:
+                  return new LastLongValueWithTimeAggregationFunction(firstArgument, timeCol);
+                case FLOAT:
+                  return new LastFloatValueWithTimeAggregationFunction(firstArgument, timeCol);
+                case DOUBLE:
+                  return new LastDoubleValueWithTimeAggregationFunction(firstArgument, timeCol);
+                case STRING:
+                  return new LastStringValueWithTimeAggregationFunction(firstArgument, timeCol);
+                default:
+                  throw new IllegalArgumentException("Unsupported Value Type for lastWithTime Function:" + dataType);
+              }
+            } else {
+              throw new IllegalArgumentException("Three arguments are required for lastWithTime Function."
+                  + " The function can be used as lastWithTime(dataColumn, timeColumn, 'dataType')");
+            }
           case MINMAXRANGE:
             return new MinMaxRangeAggregationFunction(firstArgument);
           case DISTINCTCOUNT:
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastDoubleValueWithTimeAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastDoubleValueWithTimeAggregationFunction.java
new file mode 100644
index 0000000..796a0cb
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastDoubleValueWithTimeAggregationFunction.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.DoubleLongPair;
+import org.apache.pinot.segment.local.customobject.ValueLongPair;
+
+
+/**
+ * This function is used for LastWithTime calculations for data column with double type.
+ * <p>The function can be used as LastWithTime(dataExpression, timeExpression, 'double')
+ * <p>Following arguments are supported:
+ * <ul>
+ *   <li>dataExpression: expression that contains the double data column to be calculated last on</li>
+ *   <li>timeExpression: expression that contains the column to be used to decide which data is last, can be any
+ *   Numeric column</li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class LastDoubleValueWithTimeAggregationFunction extends LastWithTimeAggregationFunction<Double> {
+
+  private final static ValueLongPair<Double> DEFAULT_VALUE_TIME_PAIR
+      = new DoubleLongPair(Double.NaN, Long.MIN_VALUE);
+
+  public LastDoubleValueWithTimeAggregationFunction(
+      ExpressionContext dataCol,
+      ExpressionContext timeCol) {
+    super(dataCol, timeCol, ObjectSerDeUtils.DOUBLE_LONG_PAIR_SER_DE);
+  }
+
+  @Override
+  public ValueLongPair<Double> constructValueLongPair(Double value, long time) {
+    return new DoubleLongPair(value, time);
+  }
+
+  @Override
+  public ValueLongPair<Double> getDefaultValueTimePair() {
+    return DEFAULT_VALUE_TIME_PAIR;
+  }
+
+  @Override
+  public void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet, BlockValSet timeValSet) {
+    ValueLongPair<Double> defaultValueLongPair = getDefaultValueTimePair();
+    Double lastData = defaultValueLongPair.getValue();
+    long lastTime = defaultValueLongPair.getTime();
+    double[] doubleValues = blockValSet.getDoubleValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      double data = doubleValues[i];
+      long time = timeValues[i];
+      if (time >= lastTime) {
+        lastTime = time;
+        lastData = data;
+      }
+    }
+    setAggregationResult(aggregationResultHolder, lastData, lastTime);
+  }
+
+  @Override
+  public void aggregateGroupResultWithRawDataSv(int length, int[] groupKeyArray,
+      GroupByResultHolder groupByResultHolder,
+      BlockValSet blockValSet, BlockValSet timeValSet) {
+    double[] doubleValues = blockValSet.getDoubleValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      double data = doubleValues[i];
+      long time = timeValues[i];
+      setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+    }
+  }
+
+  @Override
+  public void aggregateGroupResultWithRawDataMv(int length,
+      int[][] groupKeysArray,
+      GroupByResultHolder groupByResultHolder,
+      BlockValSet blockValSet,
+      BlockValSet timeValSet) {
+    double[] doubleValues = blockValSet.getDoubleValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      double value = doubleValues[i];
+      long time = timeValues[i];
+      for (int groupKey : groupKeysArray[i]) {
+        setGroupByResult(groupKey, groupByResultHolder, value, time);
+      }
+    }
+  }
+
+  @Override
+  public String getResultColumnName() {
+    return getType().getName().toLowerCase() + "(" + _expression + "," + _timeCol + ",'DOUBLE')";
+  }
+
+  @Override
+  public String getColumnName() {
+    return getType().getName() + "_" + _expression + "_" + _timeCol + "_DOUBLE";
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.DOUBLE;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastFloatValueWithTimeAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastFloatValueWithTimeAggregationFunction.java
new file mode 100644
index 0000000..6061a83
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastFloatValueWithTimeAggregationFunction.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.FloatLongPair;
+import org.apache.pinot.segment.local.customobject.ValueLongPair;
+
+
+/**
+ * This function is used for LastWithTime calculations for data column with float type.
+ * <p>The function can be used as LastWithTime(dataExpression, timeExpression, 'float')
+ * <p>Following arguments are supported:
+ * <ul>
+ *   <li>dataExpression: expression that contains the float data column to be calculated last on</li>
+ *   <li>timeExpression: expression that contains the column to be used to decide which data is last, can be any
+ *   Numeric column</li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class LastFloatValueWithTimeAggregationFunction extends LastWithTimeAggregationFunction<Float> {
+
+  private final static ValueLongPair<Float> DEFAULT_VALUE_TIME_PAIR = new FloatLongPair(Float.NaN, Long.MIN_VALUE);
+
+  public LastFloatValueWithTimeAggregationFunction(
+      ExpressionContext dataCol,
+      ExpressionContext timeCol) {
+    super(dataCol, timeCol, ObjectSerDeUtils.FLOAT_LONG_PAIR_SER_DE);
+  }
+
+  @Override
+  public ValueLongPair<Float> constructValueLongPair(Float value, long time) {
+    return new FloatLongPair(value, time);
+  }
+
+  @Override
+  public ValueLongPair<Float> getDefaultValueTimePair() {
+    return DEFAULT_VALUE_TIME_PAIR;
+  }
+
+  @Override
+  public void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet, BlockValSet timeValSet) {
+    ValueLongPair<Float> defaultValueLongPair = getDefaultValueTimePair();
+    Float lastData = defaultValueLongPair.getValue();
+    long lastTime = defaultValueLongPair.getTime();
+    float[] floatValues = blockValSet.getFloatValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      float data = floatValues[i];
+      long time = timeValues[i];
+      if (time >= lastTime) {
+        lastTime = time;
+        lastData = data;
+      }
+    }
+    setAggregationResult(aggregationResultHolder, lastData, lastTime);
+  }
+
+  @Override
+  public void aggregateGroupResultWithRawDataSv(int length,
+      int[] groupKeyArray,
+      GroupByResultHolder groupByResultHolder,
+      BlockValSet blockValSet,
+      BlockValSet timeValSet) {
+    float[] floatValues = blockValSet.getFloatValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      float data = floatValues[i];
+      long time = timeValues[i];
+      setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+    }
+  }
+
+  @Override
+  public void aggregateGroupResultWithRawDataMv(int length,
+      int[][] groupKeysArray,
+      GroupByResultHolder groupByResultHolder,
+      BlockValSet blockValSet,
+      BlockValSet timeValSet) {
+    float[] floatValues = blockValSet.getFloatValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      float value = floatValues[i];
+      long time = timeValues[i];
+      for (int groupKey : groupKeysArray[i]) {
+        setGroupByResult(groupKey, groupByResultHolder, value, time);
+      }
+    }
+  }
+
+  @Override
+  public String getResultColumnName() {
+    return getType().getName().toLowerCase() + "(" + _expression + "," + _timeCol + ",'FLOAT')";
+  }
+
+  @Override
+  public String getColumnName() {
+    return getType().getName() + "_" + _expression + "_" + _timeCol + "_FLOAT";
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.FLOAT;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastIntValueWithTimeAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastIntValueWithTimeAggregationFunction.java
new file mode 100644
index 0000000..ff9fdee
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastIntValueWithTimeAggregationFunction.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.IntLongPair;
+import org.apache.pinot.segment.local.customobject.ValueLongPair;
+
+
+/**
+ * This function is used for LastWithTime calculations for data column with int/boolean type.
+ * <p>The function can be used as LastWithTime(dataExpression, timeExpression, 'int')
+ * or LastWithTime(dataExpression, timeExpression, 'boolean')
+ * <p>Following arguments are supported:
+ * <ul>
+ *   <li>dataExpression: expression that contains the int/boolean data column to be calculated last on</li>
+ *   <li>timeExpression: expression that contains the column to be used to decide which data is last, can be any
+ *   Numeric column</li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class LastIntValueWithTimeAggregationFunction extends LastWithTimeAggregationFunction<Integer> {
+
+  private final static ValueLongPair<Integer> DEFAULT_VALUE_TIME_PAIR
+      = new IntLongPair(Integer.MIN_VALUE, Long.MIN_VALUE);
+  private final boolean _isBoolean;
+
+  public LastIntValueWithTimeAggregationFunction(
+      ExpressionContext dataCol,
+      ExpressionContext timeCol,
+      boolean isBoolean) {
+    super(dataCol, timeCol, ObjectSerDeUtils.INT_LONG_PAIR_SER_DE);
+    _isBoolean = isBoolean;
+  }
+
+  @Override
+  public ValueLongPair<Integer> constructValueLongPair(Integer value, long time) {
+    return new IntLongPair(value, time);
+  }
+
+  @Override
+  public ValueLongPair<Integer> getDefaultValueTimePair() {
+    return DEFAULT_VALUE_TIME_PAIR;
+  }
+
+  @Override
+  public void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet, BlockValSet timeValSet) {
+    ValueLongPair<Integer> defaultValueLongPair = getDefaultValueTimePair();
+    Integer lastData = defaultValueLongPair.getValue();
+    long lastTime = defaultValueLongPair.getTime();
+    int[] intValues = blockValSet.getIntValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      int data = intValues[i];
+      long time = timeValues[i];
+      if (time >= lastTime) {
+        lastTime = time;
+        lastData = data;
+      }
+    }
+    setAggregationResult(aggregationResultHolder, lastData, lastTime);
+  }
+
+  @Override
+  public void aggregateGroupResultWithRawDataSv(int length, int[] groupKeyArray,
+      GroupByResultHolder groupByResultHolder,
+      BlockValSet blockValSet, BlockValSet timeValSet) {
+    int[] intValues = blockValSet.getIntValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      int data = intValues[i];
+      long time = timeValues[i];
+      setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+    }
+  }
+
+  @Override
+  public void aggregateGroupResultWithRawDataMv(int length,
+      int[][] groupKeysArray,
+      GroupByResultHolder groupByResultHolder,
+      BlockValSet blockValSet,
+      BlockValSet timeValSet) {
+    int[] intValues = blockValSet.getIntValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      int value = intValues[i];
+      long time = timeValues[i];
+      for (int groupKey : groupKeysArray[i]) {
+        setGroupByResult(groupKey, groupByResultHolder, value, time);
+      }
+    }
+  }
+
+  @Override
+  public String getResultColumnName() {
+    if (_isBoolean) {
+      return getType().getName().toLowerCase() + "(" + _expression + "," + _timeCol + ",'BOOLEAN')";
+    } else {
+      return getType().getName().toLowerCase() + "(" + _expression + "," + _timeCol + ",'INT')";
+    }
+  }
+
+  @Override
+  public String getColumnName() {
+    if (_isBoolean) {
+      return getType().getName() + "_" + _expression + "_" + _timeCol + "_BOOLEAN";
+    } else {
+      return getType().getName() + "_" + _expression + "_" + _timeCol + "_INT";
+    }
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    if (_isBoolean) {
+      return ColumnDataType.BOOLEAN;
+    } else {
+      return ColumnDataType.INT;
+    }
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastLongValueWithTimeAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastLongValueWithTimeAggregationFunction.java
new file mode 100644
index 0000000..fa3c15f
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastLongValueWithTimeAggregationFunction.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.LongLongPair;
+import org.apache.pinot.segment.local.customobject.ValueLongPair;
+
+
+/**
+ * This function is used for LastWithTime calculations for data column with long type.
+ * <p>The function can be used as LastWithTime(dataExpression, timeExpression, 'long')
+ * <p>Following arguments are supported:
+ * <ul>
+ *   <li>dataExpression: expression that contains the long data column to be calculated last on</li>
+ *   <li>timeExpression: expression that contains the column to be used to decide which data is last, can be any
+ *   Numeric column</li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class LastLongValueWithTimeAggregationFunction extends LastWithTimeAggregationFunction<Long> {
+
+  private final static ValueLongPair<Long> DEFAULT_VALUE_TIME_PAIR
+      = new LongLongPair(Long.MIN_VALUE, Long.MIN_VALUE);
+
+  public LastLongValueWithTimeAggregationFunction(
+      ExpressionContext dataCol,
+      ExpressionContext timeCol) {
+    super(dataCol, timeCol, ObjectSerDeUtils.LONG_LONG_PAIR_SER_DE);
+  }
+
+  @Override
+  public ValueLongPair<Long> constructValueLongPair(Long value, long time) {
+    return new LongLongPair(value, time);
+  }
+
+  @Override
+  public ValueLongPair<Long> getDefaultValueTimePair() {
+    return DEFAULT_VALUE_TIME_PAIR;
+  }
+
+  @Override
+  public void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet, BlockValSet timeValSet) {
+    ValueLongPair<Long> defaultValueLongPair = getDefaultValueTimePair();
+    Long lastData = defaultValueLongPair.getValue();
+    long lastTime = defaultValueLongPair.getTime();
+    long[] longValues = blockValSet.getLongValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      long data = longValues[i];
+      long time = timeValues[i];
+      if (time >= lastTime) {
+        lastTime = time;
+        lastData = data;
+      }
+    }
+    setAggregationResult(aggregationResultHolder, lastData, lastTime);
+  }
+
+  @Override
+  public void aggregateGroupResultWithRawDataSv(int length, int[] groupKeyArray,
+      GroupByResultHolder groupByResultHolder,
+      BlockValSet blockValSet, BlockValSet timeValSet) {
+    long[] longValues = blockValSet.getLongValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      long data = longValues[i];
+      long time = timeValues[i];
+      setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+    }
+  }
+
+  @Override
+  public void aggregateGroupResultWithRawDataMv(int length,
+      int[][] groupKeysArray,
+      GroupByResultHolder groupByResultHolder,
+      BlockValSet blockValSet,
+      BlockValSet timeValSet) {
+    long[] longValues = blockValSet.getLongValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      long value = longValues[i];
+      long time = timeValues[i];
+      for (int groupKey : groupKeysArray[i]) {
+        setGroupByResult(groupKey, groupByResultHolder, value, time);
+      }
+    }
+  }
+
+  @Override
+  public String getResultColumnName() {
+    return getType().getName().toLowerCase() + "(" + _expression + "," + _timeCol + ",'LONG')";
+  }
+
+  @Override
+  public String getColumnName() {
+    return getType().getName() + "_" + _expression + "_" + _timeCol + "_LONG";
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.LONG;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastStringValueWithTimeAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastStringValueWithTimeAggregationFunction.java
new file mode 100644
index 0000000..cb3caa8
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastStringValueWithTimeAggregationFunction.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.StringLongPair;
+import org.apache.pinot.segment.local.customobject.ValueLongPair;
+
+
+/**
+ * This function is used for LastWithTime calculations for data column with string type.
+ * <p>The function can be used as LastWithTime(dataExpression, timeExpression, 'string')
+ * <p>Following arguments are supported:
+ * <ul>
+ *   <li>dataExpression: expression that contains the string data column to be calculated last on</li>
+ *   <li>timeExpression: expression that contains the column to be used to decide which data is last, can be any
+ *   Numeric column</li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class LastStringValueWithTimeAggregationFunction extends LastWithTimeAggregationFunction<String> {
+  private final static ValueLongPair<String> DEFAULT_VALUE_TIME_PAIR = new StringLongPair("", Long.MIN_VALUE);
+
+  public LastStringValueWithTimeAggregationFunction(
+      ExpressionContext dataCol,
+      ExpressionContext timeCol) {
+    super(dataCol, timeCol, ObjectSerDeUtils.STRING_LONG_PAIR_SER_DE);
+  }
+
+  @Override
+  public ValueLongPair<String> constructValueLongPair(String value, long time) {
+    return new StringLongPair(value, time);
+  }
+
+  @Override
+  public ValueLongPair<String> getDefaultValueTimePair() {
+    return DEFAULT_VALUE_TIME_PAIR;
+  }
+
+  @Override
+  public void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet, BlockValSet timeValSet) {
+    ValueLongPair<String> defaultValueLongPair = getDefaultValueTimePair();
+    String lastData = defaultValueLongPair.getValue();
+    long lastTime = defaultValueLongPair.getTime();
+    String[] stringValues = blockValSet.getStringValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      String data = stringValues[i];
+      long time = timeValues[i];
+      if (time >= lastTime) {
+        lastTime = time;
+        lastData = data;
+      }
+    }
+    setAggregationResult(aggregationResultHolder, lastData, lastTime);
+  }
+
+  @Override
+  public void aggregateGroupResultWithRawDataSv(int length, int[] groupKeyArray,
+      GroupByResultHolder groupByResultHolder,
+      BlockValSet blockValSet, BlockValSet timeValSet) {
+    String[] stringValues = blockValSet.getStringValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      String data = stringValues[i];
+      long time = timeValues[i];
+      setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+    }
+  }
+
+  @Override
+  public void aggregateGroupResultWithRawDataMv(int length,
+      int[][] groupKeysArray,
+      GroupByResultHolder groupByResultHolder,
+      BlockValSet blockValSet,
+      BlockValSet timeValSet) {
+    String[] stringValues = blockValSet.getStringValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      String value = stringValues[i];
+      long time = timeValues[i];
+      for (int groupKey : groupKeysArray[i]) {
+        setGroupByResult(groupKey, groupByResultHolder, value, time);
+      }
+    }
+  }
+
+  @Override
+  public String getResultColumnName() {
+    return getType().getName().toLowerCase() + "(" + _expression + "," + _timeCol + ",'STRING')";
+  }
+
+  @Override
+  public String getColumnName() {
+    return getType().getName() + "_" + _expression + "_" + _timeCol + "_STRING";
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.STRING;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunction.java
new file mode 100644
index 0000000..adc91da
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunction.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.ValueLongPair;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+/**
+ * This function is used for LastWithTime calculations.
+ * <p>The function can be used as LastWithTime(dataExpression, timeExpression, 'dataType')
+ * <p>Following arguments are supported:
+ * <ul>
+ *   <li>dataExpression: expression that contains the column to be calculated last on</li>
+ *   <li>timeExpression: expression that contains the column to be used to decide which data is last, can be any
+ *   Numeric column</li>
+ *   <li>dataType: the data type of data column</li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public abstract class LastWithTimeAggregationFunction<V extends Comparable<V>>
+    extends BaseSingleInputAggregationFunction<ValueLongPair<V>, V> {
+  protected final ExpressionContext _timeCol;
+  private final ObjectSerDeUtils.ObjectSerDe<? extends ValueLongPair<V>> _objectSerDe;
+
+  public LastWithTimeAggregationFunction(ExpressionContext dataCol,
+      ExpressionContext timeCol,
+      ObjectSerDeUtils.ObjectSerDe<? extends ValueLongPair<V>> objectSerDe) {
+    super(dataCol);
+    _timeCol = timeCol;
+    _objectSerDe = objectSerDe;
+  }
+
+  public abstract ValueLongPair<V> constructValueLongPair(V value, long time);
+
+  public abstract ValueLongPair<V> getDefaultValueTimePair();
+
+  public abstract void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet, BlockValSet timeValSet);
+
+  public abstract void aggregateGroupResultWithRawDataSv(int length,
+      int[] groupKeyArray,
+      GroupByResultHolder groupByResultHolder,
+      BlockValSet blockValSet,
+      BlockValSet timeValSet);
+
+  public abstract void aggregateGroupResultWithRawDataMv(int length,
+      int[][] groupKeysArray,
+      GroupByResultHolder groupByResultHolder,
+      BlockValSet blockValSet,
+      BlockValSet timeValSet);
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.LASTWITHTIME;
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    BlockValSet blockTimeSet = blockValSetMap.get(_timeCol);
+    if (blockValSet.getValueType() != DataType.BYTES) {
+      aggregateResultWithRawData(length, aggregationResultHolder, blockValSet, blockTimeSet);
+    } else {
+      ValueLongPair<V> defaultValueLongPair = getDefaultValueTimePair();
+      V lastData = defaultValueLongPair.getValue();
+      long lastTime = defaultValueLongPair.getTime();
+      // Serialized LastPair
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      for (int i = 0; i < length; i++) {
+        ValueLongPair<V> lastWithTimePair = _objectSerDe.deserialize(bytesValues[i]);
+        V data = lastWithTimePair.getValue();
+        long time = lastWithTimePair.getTime();
+        if (time >= lastTime) {
+          lastTime = time;
+          lastData = data;
+        }
+      }
+      setAggregationResult(aggregationResultHolder, lastData, lastTime);
+    }
+  }
+
+  protected void setAggregationResult(AggregationResultHolder aggregationResultHolder, V data, long time) {
+    ValueLongPair lastWithTimePair = aggregationResultHolder.getResult();
+    if (lastWithTimePair == null || time >= lastWithTimePair.getTime()) {
+      aggregationResultHolder.setValue(constructValueLongPair(data, time));
+    }
+  }
+
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    BlockValSet timeValSet = blockValSetMap.get(_timeCol);
+    if (blockValSet.getValueType() != DataType.BYTES) {
+      aggregateGroupResultWithRawDataSv(length, groupKeyArray, groupByResultHolder,
+          blockValSet, timeValSet);
+    } else {
+      // Serialized LastPair
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      for (int i = 0; i < length; i++) {
+        ValueLongPair<V> lastWithTimePair = _objectSerDe.deserialize(bytesValues[i]);
+        setGroupByResult(groupKeyArray[i],
+            groupByResultHolder,
+            lastWithTimePair.getValue(),
+            lastWithTimePair.getTime());
+      }
+    }
+  }
+
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    BlockValSet timeValSet = blockValSetMap.get(_timeCol);
+    if (blockValSet.getValueType() != DataType.BYTES) {
+      aggregateGroupResultWithRawDataMv(length, groupKeysArray, groupByResultHolder, blockValSet, timeValSet);
+    } else {
+      // Serialized ValueTimePair
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      for (int i = 0; i < length; i++) {
+        ValueLongPair<V> lastWithTimePair = _objectSerDe.deserialize(bytesValues[i]);
+        V data = lastWithTimePair.getValue();
+        long time = lastWithTimePair.getTime();
+        for (int groupKey : groupKeysArray[i]) {
+          setGroupByResult(groupKey, groupByResultHolder, data, time);
+        }
+      }
+    }
+  }
+
+  protected void setGroupByResult(int groupKey, GroupByResultHolder groupByResultHolder, V data, long time) {
+    ValueLongPair lastWithTimePair = groupByResultHolder.getResult(groupKey);
+    if (lastWithTimePair == null || time >= lastWithTimePair.getTime()) {
+      groupByResultHolder.setValueForKey(groupKey, constructValueLongPair(data, time));
+    }
+  }
+
+  @Override
+  public ValueLongPair<V> extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    ValueLongPair lastWithTimePair = aggregationResultHolder.getResult();
+    if (lastWithTimePair == null) {
+      return getDefaultValueTimePair();
+    } else {
+      return lastWithTimePair;
+    }
+  }
+
+  @Override
+  public ValueLongPair<V> extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+    ValueLongPair<V> lastWithTimePair = groupByResultHolder.getResult(groupKey);
+    if (lastWithTimePair == null) {
+      return getDefaultValueTimePair();
+    } else {
+      return lastWithTimePair;
+    }
+  }
+
+  @Override
+  public ValueLongPair<V> merge(ValueLongPair<V> intermediateResult1, ValueLongPair<V> intermediateResult2) {
+    if (intermediateResult1.getTime() >= intermediateResult2.getTime()) {
+      return intermediateResult1;
+    } else {
+      return intermediateResult2;
+    }
+  }
+
+  @Override
+  public List<ExpressionContext> getInputExpressions() {
+    return Arrays.asList(_expression, _timeCol);
+  }
+
+  @Override
+  public boolean isIntermediateResultComparable() {
+    return false;
+  }
+
+  @Override
+  public ColumnDataType getIntermediateResultColumnType() {
+    return ColumnDataType.OBJECT;
+  }
+
+  @Override
+  public V extractFinalResult(ValueLongPair<V> intermediateResult) {
+    return intermediateResult.getValue();
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverter.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverter.java
index 8eea09b..e030f4c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverter.java
@@ -164,6 +164,13 @@ public class BrokerRequestToQueryContextConverter {
           for (String expression : stringExpressions) {
             arguments.add(RequestContextUtils.getExpressionFromPQL(expression));
           }
+        } else if (functionName.equalsIgnoreCase(AggregationFunctionType.LASTWITHTIME.getName())) {
+          // For LASTWITHTIME query, only the first two arguments are expression, third one is literal if available
+          arguments.add(RequestContextUtils.getExpressionFromPQL(stringExpressions.get(0)));
+          arguments.add(RequestContextUtils.getExpressionFromPQL(stringExpressions.get(1)));
+          for (int i = 2; i < numArguments; i++) {
+            arguments.add(ExpressionContext.forLiteral(stringExpressions.get(i)));
+          }
         } else {
           // For non-DISTINCT query, only the first argument is expression, others are literals
           // NOTE: We directly use the string as the literal value because of the legacy behavior of PQL compiler
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
index 8e4c6df..3b8bfc4 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
@@ -34,8 +34,14 @@ import org.apache.commons.lang.RandomStringUtils;
 import org.apache.pinot.core.query.aggregation.function.PercentileEstAggregationFunction;
 import org.apache.pinot.core.query.aggregation.function.PercentileTDigestAggregationFunction;
 import org.apache.pinot.segment.local.customobject.AvgPair;
+import org.apache.pinot.segment.local.customobject.DoubleLongPair;
+import org.apache.pinot.segment.local.customobject.FloatLongPair;
+import org.apache.pinot.segment.local.customobject.IntLongPair;
+import org.apache.pinot.segment.local.customobject.LongLongPair;
 import org.apache.pinot.segment.local.customobject.MinMaxRangePair;
 import org.apache.pinot.segment.local.customobject.QuantileDigest;
+import org.apache.pinot.segment.local.customobject.StringLongPair;
+import org.apache.pinot.segment.local.customobject.ValueLongPair;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
@@ -127,6 +133,76 @@ public class ObjectSerDeUtilsTest {
   }
 
   @Test
+  public void testIntValueTimePair() {
+    for (int i = 0; i < NUM_ITERATIONS; i++) {
+      ValueLongPair<Integer> expected = new IntLongPair(RANDOM.nextInt(), RANDOM.nextLong());
+
+      byte[] bytes = ObjectSerDeUtils.serialize(expected);
+      ValueLongPair<Integer> actual = ObjectSerDeUtils.deserialize(bytes,
+          ObjectSerDeUtils.ObjectType.IntLongPair);
+
+      assertEquals(actual.getValue(), expected.getValue(), ERROR_MESSAGE);
+      assertEquals(actual.getTime(), expected.getTime(), ERROR_MESSAGE);
+    }
+  }
+
+  @Test
+  public void testLongValueTimePair() {
+    for (int i = 0; i < NUM_ITERATIONS; i++) {
+      ValueLongPair<Long> expected = new LongLongPair(RANDOM.nextLong(), RANDOM.nextLong());
+
+      byte[] bytes = ObjectSerDeUtils.serialize(expected);
+      ValueLongPair<Long> actual = ObjectSerDeUtils.deserialize(bytes,
+          ObjectSerDeUtils.ObjectType.LongLongPair);
+
+      assertEquals(actual.getValue(), expected.getValue(), ERROR_MESSAGE);
+      assertEquals(actual.getTime(), expected.getTime(), ERROR_MESSAGE);
+    }
+  }
+
+  @Test
+  public void testFloatValueTimePair() {
+    for (int i = 0; i < NUM_ITERATIONS; i++) {
+      ValueLongPair<Float> expected = new FloatLongPair(RANDOM.nextFloat(), RANDOM.nextLong());
+
+      byte[] bytes = ObjectSerDeUtils.serialize(expected);
+      ValueLongPair<Float> actual = ObjectSerDeUtils.deserialize(bytes, ObjectSerDeUtils.ObjectType.FloatLongPair);
+
+      assertEquals(actual.getValue(), expected.getValue(), ERROR_MESSAGE);
+      assertEquals(actual.getTime(), expected.getTime(), ERROR_MESSAGE);
+    }
+  }
+
+  @Test
+  public void testDoubleValueTimePair() {
+    for (int i = 0; i < NUM_ITERATIONS; i++) {
+      ValueLongPair<Double> expected = new DoubleLongPair(RANDOM.nextDouble(), RANDOM.nextLong());
+
+      byte[] bytes = ObjectSerDeUtils.serialize(expected);
+      ValueLongPair<Double> actual = ObjectSerDeUtils.deserialize(bytes,
+          ObjectSerDeUtils.ObjectType.DoubleLongPair);
+
+      assertEquals(actual.getValue(), expected.getValue(), ERROR_MESSAGE);
+      assertEquals(actual.getTime(), expected.getTime(), ERROR_MESSAGE);
+    }
+  }
+
+  @Test
+  public void testStringValueTimePair() {
+    for (int i = 0; i < NUM_ITERATIONS; i++) {
+      ValueLongPair<String> expected = new StringLongPair(String.valueOf(RANDOM.nextDouble()), RANDOM.nextLong());
+
+      String temp = new String(expected.getValue().getBytes());
+      byte[] bytes = ObjectSerDeUtils.serialize(expected);
+      ValueLongPair<String> actual = ObjectSerDeUtils.deserialize(bytes,
+          ObjectSerDeUtils.ObjectType.StringLongPair);
+
+      assertEquals(actual.getValue(), expected.getValue(), ERROR_MESSAGE);
+      assertEquals(actual.getTime(), expected.getTime(), ERROR_MESSAGE);
+    }
+  }
+
+  @Test
   public void testHyperLogLog() {
     for (int i = 0; i < NUM_ITERATIONS; i++) {
       HyperLogLog expected = new HyperLogLog(7);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
index e5e0e22..9c730f1 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
@@ -87,6 +87,48 @@ public class AggregationFunctionFactoryTest {
     assertEquals(aggregationFunction.getColumnName(), "mode_column");
     assertEquals(aggregationFunction.getResultColumnName(), function.toString());
 
+    function = getFunction("LaStWiThTiMe", "(column,timeColumn,'BOOLEAN')");
+    aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT);
+    assertTrue(aggregationFunction instanceof LastIntValueWithTimeAggregationFunction);
+    assertEquals(aggregationFunction.getType(), AggregationFunctionType.LASTWITHTIME);
+    assertEquals(aggregationFunction.getColumnName(), "lastWithTime_column_timeColumn_BOOLEAN");
+    assertEquals(aggregationFunction.getResultColumnName(), function.toString());
+
+    function = getFunction("LaStWiThTiMe", "(column,timeColumn,'INT')");
+    aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT);
+    assertTrue(aggregationFunction instanceof LastIntValueWithTimeAggregationFunction);
+    assertEquals(aggregationFunction.getType(), AggregationFunctionType.LASTWITHTIME);
+    assertEquals(aggregationFunction.getColumnName(), "lastWithTime_column_timeColumn_INT");
+    assertEquals(aggregationFunction.getResultColumnName(), function.toString());
+
+    function = getFunction("LaStWiThTiMe", "(column,timeColumn,'LONG')");
+    aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT);
+    assertTrue(aggregationFunction instanceof LastLongValueWithTimeAggregationFunction);
+    assertEquals(aggregationFunction.getType(), AggregationFunctionType.LASTWITHTIME);
+    assertEquals(aggregationFunction.getColumnName(), "lastWithTime_column_timeColumn_LONG");
+    assertEquals(aggregationFunction.getResultColumnName(), function.toString());
+
+    function = getFunction("LaStWiThTiMe", "(column,timeColumn,'FLOAT')");
+    aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT);
+    assertTrue(aggregationFunction instanceof LastFloatValueWithTimeAggregationFunction);
+    assertEquals(aggregationFunction.getType(), AggregationFunctionType.LASTWITHTIME);
+    assertEquals(aggregationFunction.getColumnName(), "lastWithTime_column_timeColumn_FLOAT");
+    assertEquals(aggregationFunction.getResultColumnName(), function.toString());
+
+    function = getFunction("LaStWiThTiMe", "(column,timeColumn,'DOUBLE')");
+    aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT);
+    assertTrue(aggregationFunction instanceof LastDoubleValueWithTimeAggregationFunction);
+    assertEquals(aggregationFunction.getType(), AggregationFunctionType.LASTWITHTIME);
+    assertEquals(aggregationFunction.getColumnName(), "lastWithTime_column_timeColumn_DOUBLE");
+    assertEquals(aggregationFunction.getResultColumnName(), function.toString());
+
+    function = getFunction("LaStWiThTiMe", "(column,timeColumn,'STRING')");
+    aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT);
+    assertTrue(aggregationFunction instanceof LastStringValueWithTimeAggregationFunction);
+    assertEquals(aggregationFunction.getType(), AggregationFunctionType.LASTWITHTIME);
+    assertEquals(aggregationFunction.getColumnName(), "lastWithTime_column_timeColumn_STRING");
+    assertEquals(aggregationFunction.getResultColumnName(), function.toString());
+
     function = getFunction("MiNmAxRaNgE");
     aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT);
     assertTrue(aggregationFunction instanceof MinMaxRangeAggregationFunction);
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/LastWithTimeQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/LastWithTimeQueriesTest.java
new file mode 100644
index 0000000..75d78cc
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/LastWithTimeQueriesTest.java
@@ -0,0 +1,548 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.response.broker.AggregationResult;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.GroupByResult;
+import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.query.AggregationGroupByOperator;
+import org.apache.pinot.core.operator.query.AggregationOperator;
+import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
+import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
+import org.apache.pinot.segment.local.customobject.ValueLongPair;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Queries test for LASTWITHTIME queries.
+ */
+@SuppressWarnings("rawtypes")
+public class LastWithTimeQueriesTest extends BaseQueriesTest {
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "LastQueriesTest");
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String SEGMENT_NAME = "testSegment";
+  private static final Random RANDOM = new Random();
+
+  private static final int NUM_RECORDS = 2000;
+  private static final int MAX_VALUE = 1000;
+
+  private static final String BOOL_COLUMN = "boolColumn";
+  private static final String BOOL_NO_DICT_COLUMN = "boolNoDictColumn";
+  private static final String INT_COLUMN = "intColumn";
+  private static final String INT_MV_COLUMN = "intMvColumn";
+  private static final String INT_NO_DICT_COLUMN = "intNoDictColumn";
+  private static final String LONG_COLUMN = "longColumn";
+  private static final String LONG_MV_COLUMN = "longMvColumn";
+  private static final String LONG_NO_DICT_COLUMN = "longNoDictColumn";
+  private static final String FLOAT_COLUMN = "floatColumn";
+  private static final String FLOAT_MV_COLUMN = "floatMvColumn";
+  private static final String FLOAT_NO_DICT_COLUMN = "floatNoDictColumn";
+  private static final String DOUBLE_COLUMN = "doubleColumn";
+  private static final String DOUBLE_MV_COLUMN = "doubleMvColumn";
+  private static final String DOUBLE_NO_DICT_COLUMN = "doubleNoDictColumn";
+  private static final String STRING_COLUMN = "stringColumn";
+  private static final String STRING_MV_COLUMN = "stringMvColumn";
+  private static final String STRING_NO_DICT_COLUMN = "stringNoDictColumn";
+  private static final String TIME_COLUMN = "timestampColumn";
+  private static final Schema SCHEMA = new Schema.SchemaBuilder()
+      .addSingleValueDimension(BOOL_COLUMN, DataType.BOOLEAN)
+      .addSingleValueDimension(BOOL_NO_DICT_COLUMN, DataType.BOOLEAN)
+      .addSingleValueDimension(INT_COLUMN, DataType.INT)
+      .addMultiValueDimension(INT_MV_COLUMN, DataType.INT)
+      .addSingleValueDimension(INT_NO_DICT_COLUMN, DataType.INT)
+      .addSingleValueDimension(LONG_COLUMN, DataType.LONG)
+      .addMultiValueDimension(LONG_MV_COLUMN, DataType.LONG)
+      .addSingleValueDimension(LONG_NO_DICT_COLUMN, DataType.LONG)
+      .addSingleValueDimension(FLOAT_COLUMN, DataType.FLOAT)
+      .addMultiValueDimension(FLOAT_MV_COLUMN, DataType.FLOAT)
+      .addSingleValueDimension(FLOAT_NO_DICT_COLUMN, DataType.FLOAT)
+      .addSingleValueDimension(DOUBLE_COLUMN, DataType.DOUBLE)
+      .addMultiValueDimension(DOUBLE_MV_COLUMN, DataType.DOUBLE)
+      .addSingleValueDimension(DOUBLE_NO_DICT_COLUMN, DataType.DOUBLE)
+      .addSingleValueDimension(STRING_COLUMN, DataType.STRING)
+      .addMultiValueDimension(STRING_MV_COLUMN, DataType.STRING)
+      .addSingleValueDimension(STRING_NO_DICT_COLUMN, DataType.STRING)
+      .addSingleValueDimension(TIME_COLUMN, DataType.LONG).build();
+  private static final TableConfig TABLE_CONFIG = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+      .setNoDictionaryColumns(
+          Lists.newArrayList(INT_NO_DICT_COLUMN, LONG_NO_DICT_COLUMN, FLOAT_NO_DICT_COLUMN, DOUBLE_NO_DICT_COLUMN))
+      .build();
+  private static final double DELTA = 0.00001;
+
+  private Boolean _expectedResultLastBoolean;
+  private Integer _expectedResultLastInt;
+  private Long _expectedResultLastLong;
+  private Float _expectedResultLastFloat;
+  private Double _expectedResultLastDouble;
+  private String _expectedResultLastString;
+  private Map<Integer, Boolean> _boolGroupValues;
+  private Map<Integer, Integer> _intGroupValues;
+  private Map<Integer, Long> _longGroupValues;
+  private Map<Integer, Float> _floatGroupValues;
+  private Map<Integer, Double> _doubleGroupValues;
+  private Map<Integer, String> _stringGroupValues;
+  private IndexSegment _indexSegment;
+  private List<IndexSegment> _indexSegments;
+
+  @Override
+  protected String getFilter() {
+    // NOTE: Use a match all filter to switch between DictionaryBasedAggregationOperator and AggregationOperator
+    return " WHERE intColumn >= 0";
+  }
+
+  @Override
+  protected IndexSegment getIndexSegment() {
+    return _indexSegment;
+  }
+
+  @Override
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
+  }
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteDirectory(INDEX_DIR);
+
+    List<GenericRow> records = new ArrayList<>(NUM_RECORDS);
+    int hashMapCapacity = HashUtil.getHashMapCapacity(MAX_VALUE);
+    _boolGroupValues = new HashMap<>();
+    _intGroupValues = new HashMap<>();
+    _longGroupValues = new HashMap<>();
+    _floatGroupValues = new HashMap<>();
+    _doubleGroupValues = new HashMap<>();
+    _stringGroupValues = new HashMap<>();
+    for (int i = 0; i < NUM_RECORDS; i++) {
+      boolean boolValue = RANDOM.nextBoolean();
+      int intValue = RANDOM.nextInt(MAX_VALUE);
+      long longValue = RANDOM.nextLong();
+      float floatValue = RANDOM.nextFloat();
+      double doubleValue = RANDOM.nextDouble();
+      String strValue = String.valueOf(RANDOM.nextDouble());
+      GenericRow record = new GenericRow();
+      record.putValue(BOOL_COLUMN, boolValue);
+      record.putValue(BOOL_NO_DICT_COLUMN, boolValue);
+      record.putValue(INT_COLUMN, intValue);
+      record.putValue(INT_MV_COLUMN, new Integer[]{intValue, intValue});
+      record.putValue(INT_NO_DICT_COLUMN, intValue);
+      record.putValue(LONG_COLUMN, longValue);
+      record.putValue(LONG_MV_COLUMN, new Long[]{longValue, longValue});
+      record.putValue(LONG_NO_DICT_COLUMN, longValue);
+      record.putValue(FLOAT_COLUMN, floatValue);
+      record.putValue(FLOAT_MV_COLUMN, new Float[]{floatValue, floatValue});
+      record.putValue(FLOAT_NO_DICT_COLUMN, floatValue);
+      record.putValue(DOUBLE_COLUMN, doubleValue);
+      record.putValue(DOUBLE_MV_COLUMN, new Double[]{doubleValue, doubleValue});
+      record.putValue(DOUBLE_NO_DICT_COLUMN, doubleValue);
+      record.putValue(STRING_COLUMN, strValue);
+      record.putValue(STRING_MV_COLUMN, new String[]{strValue, strValue});
+      record.putValue(STRING_NO_DICT_COLUMN, strValue);
+      record.putValue(TIME_COLUMN, (long) i);
+      if (i == NUM_RECORDS - 1) {
+        _expectedResultLastBoolean = boolValue;
+        _expectedResultLastInt = intValue;
+        _expectedResultLastLong = longValue;
+        _expectedResultLastFloat = floatValue;
+        _expectedResultLastDouble = doubleValue;
+        _expectedResultLastString = strValue;
+      }
+      _boolGroupValues.put(intValue, boolValue);
+      _intGroupValues.put(intValue, intValue);
+      _longGroupValues.put(intValue, longValue);
+      _floatGroupValues.put(intValue, floatValue);
+      _doubleGroupValues.put(intValue, doubleValue);
+      _stringGroupValues.put(intValue, strValue);
+      records.add(record);
+    }
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
+    segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
+    segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
+    segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath());
+
+    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records));
+    driver.build();
+
+    ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap);
+    _indexSegment = immutableSegment;
+    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
+  }
+
+  @Test
+  public void testAggregationOnly() {
+    String query = "SELECT LASTWITHTIME(boolColumn,timestampColumn, BOOLEAN),"
+        + " LASTWITHTIME(intColumn,timestampColumn, Int),"
+        + " LASTWITHTIME(longColumn,timestampColumn, Long),"
+        + " LASTWITHTIME(floatColumn,timestampColumn, Float),"
+        + " LASTWITHTIME(doubleColumn,timestampColumn, Double),"
+        + " LASTWITHTIME(stringColumn,timestampColumn, String) FROM testTable";
+
+    // Inner segment
+    Operator operator = getOperatorForPqlQuery(query);
+    assertTrue(operator instanceof AggregationOperator);
+    IntermediateResultsBlock resultsBlock = ((AggregationOperator) operator).nextBlock();
+    QueriesTestUtils
+        .testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(), NUM_RECORDS, 0, 7 * NUM_RECORDS,
+            NUM_RECORDS);
+    List<Object> aggregationResultsWithoutFilter = resultsBlock.getAggregationResult();
+
+    operator = getOperatorForPqlQueryWithFilter(query);
+    assertTrue(operator instanceof AggregationOperator);
+    IntermediateResultsBlock resultsBlockWithFilter = ((AggregationOperator) operator).nextBlock();
+    QueriesTestUtils
+        .testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(), NUM_RECORDS, 0, 7 * NUM_RECORDS,
+            NUM_RECORDS);
+    List<Object> aggregationResultWithFilter = resultsBlockWithFilter.getAggregationResult();
+
+    assertNotNull(aggregationResultsWithoutFilter);
+    assertNotNull(aggregationResultWithFilter);
+    assertEquals(aggregationResultsWithoutFilter.size(), aggregationResultWithFilter.size());
+    for (int i = 0; i < aggregationResultsWithoutFilter.size(); i++) {
+      assertTrue(((ValueLongPair<Integer>) aggregationResultsWithoutFilter.get(i)).compareTo(
+          (ValueLongPair<Integer>) aggregationResultWithFilter.get(i)) == 0);
+    }
+    assertEquals((((ValueLongPair<Integer>) aggregationResultsWithoutFilter.get(0))).getValue() != 0,
+        _expectedResultLastBoolean.booleanValue());
+    assertEquals(((ValueLongPair<Integer>) aggregationResultsWithoutFilter.get(1)).getValue(), _expectedResultLastInt);
+    assertEquals(((ValueLongPair<Long>) aggregationResultsWithoutFilter.get(2)).getValue(), _expectedResultLastLong);
+    assertEquals(((ValueLongPair<Float>) aggregationResultsWithoutFilter.get(3)).getValue(), _expectedResultLastFloat);
+    assertEquals(((ValueLongPair<Double>) aggregationResultsWithoutFilter.get(4)).getValue(),
+        _expectedResultLastDouble);
+    assertEquals(((ValueLongPair<String>) aggregationResultsWithoutFilter.get(5)).getValue(),
+        _expectedResultLastString);
+
+    BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
+
+    Assert.assertEquals(brokerResponse.getNumDocsScanned(), 4 * NUM_RECORDS);
+    Assert.assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
+    Assert.assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 4 * 7 * NUM_RECORDS);
+    Assert.assertEquals(brokerResponse.getTotalDocs(), 4 * NUM_RECORDS);
+    List<AggregationResult> aggregationResults = brokerResponse.getAggregationResults();
+    Assert.assertEquals(aggregationResults.size(), 6);
+    Assert.assertEquals(Boolean.parseBoolean(aggregationResults.get(0).getValue().toString()),
+        _expectedResultLastBoolean.booleanValue());
+    Assert.assertEquals(Integer.parseInt(aggregationResults.get(1).getValue().toString()),
+        _expectedResultLastInt.intValue());
+    Assert.assertEquals(Long.parseLong(aggregationResults.get(2).getValue().toString()),
+        _expectedResultLastLong.longValue());
+    Assert.assertEquals(Float.parseFloat(aggregationResults.get(3).getValue().toString()),
+        _expectedResultLastFloat.floatValue(), DELTA);
+    Assert.assertEquals(Double.parseDouble(aggregationResults.get(4).getValue().toString()),
+        _expectedResultLastDouble.doubleValue(), DELTA);
+    Assert.assertEquals(aggregationResults.get(5).getValue().toString(),
+        _expectedResultLastString);
+
+    brokerResponse = getBrokerResponseForPqlQueryWithFilter(query);
+    Assert.assertEquals(brokerResponse.getNumDocsScanned(), 4 * NUM_RECORDS);
+    Assert.assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
+    Assert.assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 4 * 7 * NUM_RECORDS);
+    Assert.assertEquals(brokerResponse.getTotalDocs(), 4 * NUM_RECORDS);
+    aggregationResults = brokerResponse.getAggregationResults();
+    Assert.assertEquals(aggregationResults.size(), 6);
+    Assert.assertEquals(Boolean.parseBoolean(aggregationResults.get(0).getValue().toString()),
+        _expectedResultLastBoolean.booleanValue());
+    Assert.assertEquals(Integer.parseInt(aggregationResults.get(1).getValue().toString()),
+        _expectedResultLastInt.intValue());
+    Assert.assertEquals(Long.parseLong(aggregationResults.get(2).getValue().toString()),
+        _expectedResultLastLong.longValue());
+    Assert.assertEquals(Float.parseFloat(aggregationResults.get(3).getValue().toString()),
+        _expectedResultLastFloat.floatValue(), DELTA);
+    Assert.assertEquals(Double.parseDouble(aggregationResults.get(4).getValue().toString()),
+        _expectedResultLastDouble.doubleValue(), DELTA);
+    Assert.assertEquals(aggregationResults.get(5).getValue().toString(),
+        _expectedResultLastString);
+  }
+
+  @Test
+  public void testAggregationOnlyNoDictionary() {
+    String query =
+        "SELECT LASTWITHTIME(boolNoDictColumn,timestampColumn,boolean),"
+            + " LASTWITHTIME(intNoDictColumn,timestampColumn,int),"
+            + " LASTWITHTIME(longNoDictColumn,timestampColumn,long),"
+            + " LASTWITHTIME(floatNoDictColumn,timestampColumn,float),"
+            + " LASTWITHTIME(doubleNoDictColumn,timestampColumn,double),"
+            + " LASTWITHTIME(stringNoDictColumn,timestampColumn,string) FROM testTable";
+
+    // Inner segment
+    Operator operator = getOperatorForPqlQuery(query);
+    assertTrue(operator instanceof AggregationOperator);
+    IntermediateResultsBlock resultsBlock = ((AggregationOperator) operator).nextBlock();
+    QueriesTestUtils
+        .testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(), NUM_RECORDS, 0, 7 * NUM_RECORDS,
+            NUM_RECORDS);
+    List<Object> aggregationResultsWithoutFilter = resultsBlock.getAggregationResult();
+
+    operator = getOperatorForPqlQueryWithFilter(query);
+    assertTrue(operator instanceof AggregationOperator);
+    IntermediateResultsBlock resultsBlockWithFilter = ((AggregationOperator) operator).nextBlock();
+    QueriesTestUtils
+        .testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(), NUM_RECORDS, 0, 7 * NUM_RECORDS,
+            NUM_RECORDS);
+    List<Object> aggregationResultWithFilter = resultsBlockWithFilter.getAggregationResult();
+
+    assertNotNull(aggregationResultsWithoutFilter);
+    assertNotNull(aggregationResultWithFilter);
+    assertEquals(aggregationResultsWithoutFilter.size(), aggregationResultWithFilter.size());
+    for (int i = 0; i < aggregationResultsWithoutFilter.size(); i++) {
+      assertTrue(((ValueLongPair<Integer>) aggregationResultsWithoutFilter.get(i)).compareTo(
+          (ValueLongPair<Integer>) aggregationResultWithFilter.get(i)) == 0);
+    }
+
+    assertEquals(((ValueLongPair<Integer>) aggregationResultsWithoutFilter.get(0)).getValue() != 0,
+        _expectedResultLastBoolean.booleanValue());
+    assertEquals(((ValueLongPair<Integer>) aggregationResultsWithoutFilter.get(1)).getValue(), _expectedResultLastInt);
+    assertEquals(((ValueLongPair<Long>) aggregationResultsWithoutFilter.get(2)).getValue(), _expectedResultLastLong);
+    assertEquals(((ValueLongPair<Float>) aggregationResultsWithoutFilter.get(3)).getValue(), _expectedResultLastFloat);
+    assertEquals(((ValueLongPair<Double>) aggregationResultsWithoutFilter.get(4)).getValue(),
+        _expectedResultLastDouble);
+    assertEquals(((ValueLongPair<String>) aggregationResultsWithoutFilter.get(5)).getValue(),
+        _expectedResultLastString);
+
+    BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
+
+    Assert.assertEquals(brokerResponse.getNumDocsScanned(), 4 * NUM_RECORDS);
+    Assert.assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
+    Assert.assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 4 * 7 * NUM_RECORDS);
+    Assert.assertEquals(brokerResponse.getTotalDocs(), 4 * NUM_RECORDS);
+    List<AggregationResult> aggregationResults = brokerResponse.getAggregationResults();
+    Assert.assertEquals(aggregationResults.size(), 6);
+    Assert.assertEquals(Boolean.parseBoolean(aggregationResults.get(0).getValue().toString()),
+        _expectedResultLastBoolean.booleanValue());
+    Assert.assertEquals(Integer.parseInt(aggregationResults.get(1).getValue().toString()),
+        _expectedResultLastInt.intValue());
+    Assert.assertEquals(Long.parseLong(aggregationResults.get(2).getValue().toString()),
+        _expectedResultLastLong.longValue());
+    Assert.assertEquals(Float.parseFloat(aggregationResults.get(3).getValue().toString()),
+        _expectedResultLastFloat, DELTA);
+    Assert.assertEquals(Double.parseDouble(aggregationResults.get(4).getValue().toString()),
+        _expectedResultLastDouble, DELTA);
+    Assert.assertEquals(aggregationResults.get(5).getValue().toString(),
+        _expectedResultLastString);
+
+    brokerResponse = getBrokerResponseForPqlQueryWithFilter(query);
+    Assert.assertEquals(brokerResponse.getNumDocsScanned(), 4 * NUM_RECORDS);
+    Assert.assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
+    Assert.assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 4 * 7 * NUM_RECORDS);
+    Assert.assertEquals(brokerResponse.getTotalDocs(), 4 * NUM_RECORDS);
+    aggregationResults = brokerResponse.getAggregationResults();
+    Assert.assertEquals(aggregationResults.size(), 6);
+    Assert.assertEquals(Boolean.parseBoolean(aggregationResults.get(0).getValue().toString()),
+        _expectedResultLastBoolean.booleanValue());
+    Assert.assertEquals(Integer.parseInt(aggregationResults.get(1).getValue().toString()),
+        _expectedResultLastInt.intValue());
+    Assert.assertEquals(Long.parseLong(aggregationResults.get(2).getValue().toString()),
+        _expectedResultLastLong.longValue());
+    Assert.assertEquals(Float.parseFloat(aggregationResults.get(3).getValue().toString()),
+        _expectedResultLastFloat, DELTA);
+    Assert.assertEquals(Double.parseDouble(aggregationResults.get(4).getValue().toString()),
+        _expectedResultLastDouble, DELTA);
+    Assert.assertEquals(aggregationResults.get(5).getValue().toString(),
+        _expectedResultLastString);
+  }
+
+  @Test
+  public void testAggregationGroupBySv() {
+    String query =
+        "SELECT LASTWITHTIME(boolColumn,timestampColumn,boolean),"
+            + " LASTWITHTIME(intColumn,timestampColumn,int),"
+            + " LASTWITHTIME(longColumn,timestampColumn,long),"
+            + " LASTWITHTIME(floatColumn,timestampColumn,float),"
+            + " LASTWITHTIME(doubleColumn,timestampColumn,double),"
+            + " LASTWITHTIME(stringColumn,timestampColumn,string) FROM testTable GROUP BY intColumn";
+
+    verifyAggregationResultsFromInnerSegments(query, 7);
+
+    verifyAggregationResultsFromInterSegments(query, 7);
+  }
+
+  @Test
+  public void testAggregationGroupBySvNoDictionary() {
+    String query =
+        "SELECT LASTWITHTIME(boolNoDictColumn,timestampColumn,boolean),"
+            + " LASTWITHTIME(intNoDictColumn,timestampColumn,int),"
+            + " LASTWITHTIME(longNoDictColumn,timestampColumn,long),"
+            + " LASTWITHTIME(floatNoDictColumn,timestampColumn,float),"
+            + " LASTWITHTIME(doubleNoDictColumn,timestampColumn,double),"
+            + " LASTWITHTIME(stringNoDictColumn,timestampColumn,string)"
+            + " FROM testTable GROUP BY intNoDictColumn";
+
+    verifyAggregationResultsFromInnerSegments(query, 7);
+
+    verifyAggregationResultsFromInterSegments(query, 7);
+  }
+
+  @Test
+  public void testAggregationGroupByMv() {
+    String query =
+        "SELECT LASTWITHTIME(boolColumn,timestampColumn,boolean),"
+            + " LASTWITHTIME(intColumn,timestampColumn,int),"
+            + " LASTWITHTIME(longColumn,timestampColumn,long),"
+            + " LASTWITHTIME(floatColumn,timestampColumn,float),"
+            + " LASTWITHTIME(doubleColumn,timestampColumn,double),"
+            + " LASTWITHTIME(stringColumn,timestampColumn,string) FROM testTable GROUP BY intMvColumn";
+
+    verifyAggregationResultsFromInnerSegments(query, 8);
+
+    verifyAggregationResultsFromInterSegments(query, 8);
+  }
+
+  @Test
+  public void testAggregationGroupByMvNoDictionary() {
+    String query =
+        "SELECT LASTWITHTIME(boolNoDictColumn,timestampColumn,boolean),"
+            + " LASTWITHTIME(intNoDictColumn,timestampColumn,int),"
+            + " LASTWITHTIME(longNoDictColumn,timestampColumn,long),"
+            + " LASTWITHTIME(floatNoDictColumn,timestampColumn,float),"
+            + " LASTWITHTIME(doubleNoDictColumn,timestampColumn,double),"
+            + " LASTWITHTIME(stringNoDictColumn,timestampColumn,string) FROM testTable GROUP BY intMvColumn";
+
+    verifyAggregationResultsFromInnerSegments(query, 8);
+
+    verifyAggregationResultsFromInterSegments(query, 8);
+  }
+
+  private void verifyAggregationResultsFromInnerSegments(String query, int numOfColumns) {
+    // Inner segment
+    Operator operator = getOperatorForPqlQuery(query);
+    assertTrue(operator instanceof AggregationGroupByOperator);
+    IntermediateResultsBlock resultsBlock = ((AggregationGroupByOperator) operator).nextBlock();
+    QueriesTestUtils
+        .testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(),
+            NUM_RECORDS,
+            0,
+            numOfColumns * NUM_RECORDS,
+            NUM_RECORDS);
+    AggregationGroupByResult aggregationGroupByResult = resultsBlock.getAggregationGroupByResult();
+    assertNotNull(aggregationGroupByResult);
+    int numGroups = 0;
+    Iterator<GroupKeyGenerator.GroupKey> groupKeyIterator = aggregationGroupByResult.getGroupKeyIterator();
+    while (groupKeyIterator.hasNext()) {
+      numGroups++;
+      GroupKeyGenerator.GroupKey groupKey = groupKeyIterator.next();
+      Integer key = (Integer) groupKey._keys[0];
+      assertTrue(_intGroupValues.containsKey(key));
+      assertEquals(
+          ((ValueLongPair<Integer>) aggregationGroupByResult.getResultForGroupId(0, groupKey._groupId)).getValue()
+              != 0,
+          _boolGroupValues.get(key).booleanValue());
+      assertEquals(
+          ((ValueLongPair<Integer>) aggregationGroupByResult.getResultForGroupId(1, groupKey._groupId)).getValue(),
+          _intGroupValues.get(key));
+      assertEquals(
+          ((ValueLongPair<Long>) aggregationGroupByResult.getResultForGroupId(2, groupKey._groupId)).getValue(),
+          _longGroupValues.get(key));
+      assertEquals(
+          ((ValueLongPair<Float>) aggregationGroupByResult.getResultForGroupId(3, groupKey._groupId)).getValue(),
+          _floatGroupValues.get(key));
+      assertEquals(
+          ((ValueLongPair<Double>) aggregationGroupByResult.getResultForGroupId(4, groupKey._groupId)).getValue(),
+          _doubleGroupValues.get(key));
+      assertEquals(
+          ((ValueLongPair<String>) aggregationGroupByResult.getResultForGroupId(5, groupKey._groupId)).getValue(),
+          _stringGroupValues.get(key));
+    }
+    assertEquals(numGroups, _intGroupValues.size());
+  }
+
+  private void verifyAggregationResultsFromInterSegments(String query, int numOfColumns) {
+    BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
+    // Inter segments (expect 4 * inner segment result)
+    Assert.assertEquals(brokerResponse.getNumDocsScanned(), 4 * NUM_RECORDS);
+    Assert.assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
+    Assert.assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 4 * numOfColumns * NUM_RECORDS);
+    Assert.assertEquals(brokerResponse.getTotalDocs(), 4 * NUM_RECORDS);
+
+    List<AggregationResult> aggregationResults = brokerResponse.getAggregationResults();
+    Assert.assertEquals(aggregationResults.size(), 6);
+    Assert.assertNull(aggregationResults.get(0).getValue());
+    for (GroupByResult intGroupByResult : aggregationResults.get(1).getGroupByResult()) {
+      assertEquals(intGroupByResult.getGroup().size(), 1);
+      assertTrue(_intGroupValues.containsKey(Integer.parseInt(intGroupByResult.getGroup().get(0))));
+      assertEquals(Integer.parseInt(intGroupByResult.getValue().toString()),
+          _intGroupValues.get(Integer.parseInt(intGroupByResult.getGroup().get(0))).intValue());
+    }
+
+    Assert.assertNull(aggregationResults.get(1).getValue());
+    for (GroupByResult longGroupByResult : aggregationResults.get(2).getGroupByResult()) {
+      assertEquals(longGroupByResult.getGroup().size(), 1);
+      assertTrue(_longGroupValues.containsKey(Integer.parseInt(longGroupByResult.getGroup().get(0))));
+      assertEquals(Long.parseLong(longGroupByResult.getValue().toString()),
+          _longGroupValues.get(Integer.parseInt(longGroupByResult.getGroup().get(0))), DELTA);
+    }
+
+    Assert.assertNull(aggregationResults.get(2).getValue());
+    for (GroupByResult floatGroupByResult : aggregationResults.get(3).getGroupByResult()) {
+      assertEquals(floatGroupByResult.getGroup().size(), 1);
+      assertTrue(_floatGroupValues.containsKey(Integer.parseInt(floatGroupByResult.getGroup().get(0))));
+      assertEquals(Double.parseDouble(floatGroupByResult.getValue().toString()),
+          _floatGroupValues.get(Integer.parseInt(floatGroupByResult.getGroup().get(0))), DELTA);
+    }
+
+    Assert.assertNull(aggregationResults.get(3).getValue());
+    for (GroupByResult doubleGroupByResult : aggregationResults.get(4).getGroupByResult()) {
+      assertEquals(doubleGroupByResult.getGroup().size(), 1);
+      assertTrue(_doubleGroupValues.containsKey(Integer.parseInt(doubleGroupByResult.getGroup().get(0))));
+      assertEquals(Double.parseDouble(doubleGroupByResult.getValue().toString()),
+          _doubleGroupValues.get(Integer.parseInt(doubleGroupByResult.getGroup().get(0))), DELTA);
+    }
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws IOException {
+    _indexSegment.destroy();
+    FileUtils.deleteDirectory(INDEX_DIR);
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/DoubleLongPair.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/DoubleLongPair.java
new file mode 100644
index 0000000..890a00d
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/DoubleLongPair.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import java.nio.ByteBuffer;
+
+
+public class DoubleLongPair extends ValueLongPair<Double> {
+
+  public DoubleLongPair(Double value, long time) {
+    super(value, time);
+  }
+
+  public static DoubleLongPair fromBytes(byte[] bytes) {
+    return fromByteBuffer(ByteBuffer.wrap(bytes));
+  }
+
+  public static DoubleLongPair fromByteBuffer(ByteBuffer byteBuffer) {
+    return new DoubleLongPair(byteBuffer.getDouble(), byteBuffer.getLong());
+  }
+
+  @Override
+  public byte[] toBytes() {
+    ByteBuffer byteBuffer = ByteBuffer.allocate(Double.BYTES + Long.BYTES);
+    byteBuffer.putDouble(_value);
+    byteBuffer.putLong(_time);
+    return byteBuffer.array();
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/FloatLongPair.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/FloatLongPair.java
new file mode 100644
index 0000000..94fcb31
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/FloatLongPair.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import java.nio.ByteBuffer;
+
+
+public class FloatLongPair extends ValueLongPair<Float> {
+
+  public FloatLongPair(Float value, long time) {
+    super(value, time);
+  }
+
+  public static FloatLongPair fromBytes(byte[] bytes) {
+    return fromByteBuffer(ByteBuffer.wrap(bytes));
+  }
+
+  public static FloatLongPair fromByteBuffer(ByteBuffer byteBuffer) {
+    return new FloatLongPair(byteBuffer.getFloat(), byteBuffer.getLong());
+  }
+
+  @Override
+  public byte[] toBytes() {
+    ByteBuffer byteBuffer = ByteBuffer.allocate(Float.BYTES + Long.BYTES);
+    byteBuffer.putFloat(_value);
+    byteBuffer.putLong(_time);
+    return byteBuffer.array();
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/IntLongPair.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/IntLongPair.java
new file mode 100644
index 0000000..191a931
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/IntLongPair.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import java.nio.ByteBuffer;
+
+
+public class IntLongPair extends ValueLongPair<Integer> {
+
+  public IntLongPair(Integer value, long time) {
+    super(value, time);
+  }
+
+  public static IntLongPair fromBytes(byte[] bytes) {
+    return fromByteBuffer(ByteBuffer.wrap(bytes));
+  }
+
+  public static IntLongPair fromByteBuffer(ByteBuffer byteBuffer) {
+    return new IntLongPair(byteBuffer.getInt(), byteBuffer.getLong());
+  }
+
+  @Override
+  public byte[] toBytes() {
+    ByteBuffer byteBuffer = ByteBuffer.allocate(Integer.BYTES + Long.BYTES);
+    byteBuffer.putInt(_value);
+    byteBuffer.putLong(_time);
+    return byteBuffer.array();
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/LongLongPair.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/LongLongPair.java
new file mode 100644
index 0000000..0ffa345
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/LongLongPair.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import java.nio.ByteBuffer;
+
+
+public class LongLongPair extends ValueLongPair<Long> {
+
+  public LongLongPair(Long value, long time) {
+    super(value, time);
+  }
+
+  public static LongLongPair fromBytes(byte[] bytes) {
+    return fromByteBuffer(ByteBuffer.wrap(bytes));
+  }
+
+  public static LongLongPair fromByteBuffer(ByteBuffer byteBuffer) {
+    return new LongLongPair(byteBuffer.getLong(), byteBuffer.getLong());
+  }
+
+  @Override
+  public byte[] toBytes() {
+    ByteBuffer byteBuffer = ByteBuffer.allocate(Long.BYTES + Long.BYTES);
+    byteBuffer.putLong(_value);
+    byteBuffer.putLong(_time);
+    return byteBuffer.array();
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/StringLongPair.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/StringLongPair.java
new file mode 100644
index 0000000..bfcdeae
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/StringLongPair.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import java.nio.ByteBuffer;
+
+
+public class StringLongPair extends ValueLongPair<String> {
+
+  public StringLongPair(String value, long time) {
+    super(value, time);
+  }
+
+  public static StringLongPair fromBytes(byte[] bytes) {
+    return fromByteBuffer(ByteBuffer.wrap(bytes));
+  }
+
+  public static StringLongPair fromByteBuffer(ByteBuffer byteBuffer) {
+    int len = byteBuffer.getInt();
+    byte[] bytes = new byte[len];
+    byteBuffer.get(bytes);
+    return new StringLongPair(new String(bytes), byteBuffer.getLong());
+  }
+
+  @Override
+  public byte[] toBytes() {
+    int len = _value.length();
+    ByteBuffer byteBuffer = ByteBuffer.allocate(Integer.BYTES + len + Long.BYTES);
+    byteBuffer.putInt(len);
+    byteBuffer.put(_value.getBytes());
+    byteBuffer.putLong(_time);
+    return byteBuffer.array();
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ValueLongPair.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ValueLongPair.java
new file mode 100644
index 0000000..81e2ded
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ValueLongPair.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+public abstract class ValueLongPair<V extends Comparable<V>> implements Comparable<ValueLongPair<V>> {
+  protected V _value;
+  protected long _time;
+
+  public ValueLongPair(V value, long time) {
+    _value = value;
+    _time = time;
+  }
+
+  public V getValue() {
+    return _value;
+  }
+
+  public long getTime() {
+    return _time;
+  }
+
+  abstract public byte[] toBytes();
+
+  @Override
+  public int compareTo(ValueLongPair<V> o) {
+    if (_time < o.getTime()) {
+      return -1;
+    } else if (_time > o.getTime()) {
+      return 1;
+    } else {
+      return _value.compareTo(o.getValue());
+    }
+  }
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index 197239b..53e5eba 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -33,6 +33,7 @@ public enum AggregationFunctionType {
   SUMPRECISION("sumPrecision"),
   AVG("avg"),
   MODE("mode"),
+  LASTWITHTIME("lastWithTime"),
   MINMAXRANGE("minMaxRange"),
   DISTINCTCOUNT("distinctCount"),
   DISTINCTCOUNTBITMAP("distinctCountBitmap"),

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org