You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/10/26 05:14:21 UTC
[pinot] branch master updated: Add LASTWITHTIME aggregate function
support #7315 (#7584)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new da3bce5 Add LASTWITHTIME aggregate function support #7315 (#7584)
da3bce5 is described below
commit da3bce50eb79656d967da7636a741653735abd63
Author: weixiangsun <91...@users.noreply.github.com>
AuthorDate: Mon Oct 25 22:13:59 2021 -0700
Add LASTWITHTIME aggregate function support #7315 (#7584)
Adds an aggregate function that returns the last value of a time-based data set.
---
.../function/AggregationFunctionTypeTest.java | 2 +
.../apache/pinot/core/common/ObjectSerDeUtils.java | 122 ++++-
.../function/AggregationFunctionFactory.java | 31 ++
...LastDoubleValueWithTimeAggregationFunction.java | 126 +++++
.../LastFloatValueWithTimeAggregationFunction.java | 127 +++++
.../LastIntValueWithTimeAggregationFunction.java | 142 ++++++
.../LastLongValueWithTimeAggregationFunction.java | 126 +++++
...LastStringValueWithTimeAggregationFunction.java | 124 +++++
.../function/LastWithTimeAggregationFunction.java | 227 +++++++++
.../BrokerRequestToQueryContextConverter.java | 7 +
.../pinot/core/common/ObjectSerDeUtilsTest.java | 76 +++
.../function/AggregationFunctionFactoryTest.java | 42 ++
.../pinot/queries/LastWithTimeQueriesTest.java | 548 +++++++++++++++++++++
.../segment/local/customobject/DoubleLongPair.java | 45 ++
.../segment/local/customobject/FloatLongPair.java | 45 ++
.../segment/local/customobject/IntLongPair.java | 45 ++
.../segment/local/customobject/LongLongPair.java | 45 ++
.../segment/local/customobject/StringLongPair.java | 50 ++
.../segment/local/customobject/ValueLongPair.java | 50 ++
.../pinot/segment/spi/AggregationFunctionType.java | 1 +
20 files changed, 1979 insertions(+), 2 deletions(-)
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java b/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java
index 042381b..ea9f69a 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/function/AggregationFunctionTypeTest.java
@@ -33,6 +33,8 @@ public class AggregationFunctionTypeTest {
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("SuM"), AggregationFunctionType.SUM);
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("AvG"), AggregationFunctionType.AVG);
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("MoDe"), AggregationFunctionType.MODE);
+ Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("LaStWiThTiMe"),
+ AggregationFunctionType.LASTWITHTIME);
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("MiNmAxRaNgE"),
AggregationFunctionType.MINMAXRANGE);
Assert.assertEquals(AggregationFunctionType.getAggregationFunctionType("DiStInCtCoUnT"),
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index 3341a7c..e6ed491 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -62,8 +62,13 @@ import org.apache.pinot.core.query.distinct.DistinctTable;
import org.apache.pinot.core.query.utils.idset.IdSet;
import org.apache.pinot.core.query.utils.idset.IdSets;
import org.apache.pinot.segment.local.customobject.AvgPair;
+import org.apache.pinot.segment.local.customobject.DoubleLongPair;
+import org.apache.pinot.segment.local.customobject.FloatLongPair;
+import org.apache.pinot.segment.local.customobject.IntLongPair;
+import org.apache.pinot.segment.local.customobject.LongLongPair;
import org.apache.pinot.segment.local.customobject.MinMaxRangePair;
import org.apache.pinot.segment.local.customobject.QuantileDigest;
+import org.apache.pinot.segment.local.customobject.StringLongPair;
import org.apache.pinot.segment.local.utils.GeometrySerializer;
import org.apache.pinot.spi.utils.BigDecimalUtils;
import org.apache.pinot.spi.utils.ByteArray;
@@ -109,7 +114,12 @@ public class ObjectSerDeUtils {
Int2LongMap(23),
Long2LongMap(24),
Float2LongMap(25),
- Double2LongMap(26);
+ Double2LongMap(26),
+ IntLongPair(27),
+ LongLongPair(28),
+ FloatLongPair(29),
+ DoubleLongPair(30),
+ StringLongPair(31);
private final int _value;
ObjectType(int value) {
@@ -178,6 +188,16 @@ public class ObjectSerDeUtils {
return ObjectType.IdSet;
} else if (value instanceof List) {
return ObjectType.List;
+ } else if (value instanceof IntLongPair) {
+ return ObjectType.IntLongPair;
+ } else if (value instanceof LongLongPair) {
+ return ObjectType.LongLongPair;
+ } else if (value instanceof FloatLongPair) {
+ return ObjectType.FloatLongPair;
+ } else if (value instanceof DoubleLongPair) {
+ return ObjectType.DoubleLongPair;
+ } else if (value instanceof StringLongPair) {
+ return ObjectType.StringLongPair;
} else {
throw new IllegalArgumentException("Unsupported type of value: " + value.getClass().getSimpleName());
}
@@ -330,6 +350,99 @@ public class ObjectSerDeUtils {
}
};
+ public static final ObjectSerDe<IntLongPair> INT_LONG_PAIR_SER_DE
+ = new ObjectSerDe<IntLongPair>() {
+ // Ser/de for IntLongPair: each method simply delegates to the pair's own byte conversion helpers.
+ @Override
+ public byte[] serialize(IntLongPair intLongPair) {
+ return intLongPair.toBytes();
+ }
+
+ @Override
+ public IntLongPair deserialize(byte[] bytes) {
+ return IntLongPair.fromBytes(bytes);
+ }
+
+ @Override
+ public IntLongPair deserialize(ByteBuffer byteBuffer) {
+ return IntLongPair.fromByteBuffer(byteBuffer);
+ }
+ };
+
+ public static final ObjectSerDe<LongLongPair> LONG_LONG_PAIR_SER_DE
+ = new ObjectSerDe<LongLongPair>() {
+ // Ser/de for LongLongPair: each method simply delegates to the pair's own byte conversion helpers.
+ @Override
+ public byte[] serialize(LongLongPair longLongPair) {
+ return longLongPair.toBytes();
+ }
+
+ @Override
+ public LongLongPair deserialize(byte[] bytes) {
+ return LongLongPair.fromBytes(bytes);
+ }
+
+ @Override
+ public LongLongPair deserialize(ByteBuffer byteBuffer) {
+ return LongLongPair.fromByteBuffer(byteBuffer);
+ }
+ };
+
+ public static final ObjectSerDe<FloatLongPair> FLOAT_LONG_PAIR_SER_DE
+ = new ObjectSerDe<FloatLongPair>() {
+ // Ser/de for FloatLongPair: each method simply delegates to the pair's own byte conversion helpers.
+ @Override
+ public byte[] serialize(FloatLongPair floatLongPair) {
+ return floatLongPair.toBytes();
+ }
+
+ @Override
+ public FloatLongPair deserialize(byte[] bytes) {
+ return FloatLongPair.fromBytes(bytes);
+ }
+
+ @Override
+ public FloatLongPair deserialize(ByteBuffer byteBuffer) {
+ return FloatLongPair.fromByteBuffer(byteBuffer);
+ }
+ };
+ public static final ObjectSerDe<DoubleLongPair> DOUBLE_LONG_PAIR_SER_DE
+ = new ObjectSerDe<DoubleLongPair>() {
+ // Ser/de for DoubleLongPair: each method simply delegates to the pair's own byte conversion helpers.
+ @Override
+ public byte[] serialize(DoubleLongPair doubleLongPair) {
+ return doubleLongPair.toBytes();
+ }
+
+ @Override
+ public DoubleLongPair deserialize(byte[] bytes) {
+ return DoubleLongPair.fromBytes(bytes);
+ }
+
+ @Override
+ public DoubleLongPair deserialize(ByteBuffer byteBuffer) {
+ return DoubleLongPair.fromByteBuffer(byteBuffer);
+ }
+ };
+ public static final ObjectSerDe<StringLongPair> STRING_LONG_PAIR_SER_DE
+ = new ObjectSerDe<StringLongPair>() {
+ // Ser/de for StringLongPair: each method simply delegates to the pair's own byte conversion helpers.
+ @Override
+ public byte[] serialize(StringLongPair stringLongPair) {
+ return stringLongPair.toBytes();
+ }
+
+ @Override
+ public StringLongPair deserialize(byte[] bytes) {
+ return StringLongPair.fromBytes(bytes);
+ }
+
+ @Override
+ public StringLongPair deserialize(ByteBuffer byteBuffer) {
+ return StringLongPair.fromByteBuffer(byteBuffer);
+ }
+ };
+
public static final ObjectSerDe<HyperLogLog> HYPER_LOG_LOG_SER_DE = new ObjectSerDe<HyperLogLog>() {
@Override
@@ -1047,7 +1160,12 @@ public class ObjectSerDeUtils {
INT_2_LONG_MAP_SER_DE,
LONG_2_LONG_MAP_SER_DE,
FLOAT_2_LONG_MAP_SER_DE,
- DOUBLE_2_LONG_MAP_SER_DE
+ DOUBLE_2_LONG_MAP_SER_DE,
+ INT_LONG_PAIR_SER_DE,
+ LONG_LONG_PAIR_SER_DE,
+ FLOAT_LONG_PAIR_SER_DE,
+ DOUBLE_LONG_PAIR_SER_DE,
+ STRING_LONG_PAIR_SER_DE
};
//@formatter:on
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index 3285607..df26e16 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -25,6 +25,7 @@ import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.FunctionContext;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.exception.BadQueryRequestException;
@@ -156,6 +157,36 @@ public class AggregationFunctionFactory {
return new AvgAggregationFunction(firstArgument);
case MODE:
return new ModeAggregationFunction(arguments);
+ case LASTWITHTIME:
+ if (arguments.size() == 3) {
+ ExpressionContext timeCol = arguments.get(1);
+ ExpressionContext dataType = arguments.get(2);
+ if (dataType.getType() != ExpressionContext.Type.LITERAL) {
+ throw new IllegalArgumentException("Third argument of lastWithTime Function should be literal."
+ + " The function can be used as lastWithTime(dataColumn, timeColumn, 'dataType')");
+ }
+ FieldSpec.DataType fieldDataType
+ = FieldSpec.DataType.valueOf(dataType.getLiteral().toUpperCase());
+ switch (fieldDataType) {
+ case BOOLEAN:
+ case INT:
+ return new LastIntValueWithTimeAggregationFunction(
+ firstArgument, timeCol, fieldDataType == FieldSpec.DataType.BOOLEAN);
+ case LONG:
+ return new LastLongValueWithTimeAggregationFunction(firstArgument, timeCol);
+ case FLOAT:
+ return new LastFloatValueWithTimeAggregationFunction(firstArgument, timeCol);
+ case DOUBLE:
+ return new LastDoubleValueWithTimeAggregationFunction(firstArgument, timeCol);
+ case STRING:
+ return new LastStringValueWithTimeAggregationFunction(firstArgument, timeCol);
+ default:
+ throw new IllegalArgumentException("Unsupported Value Type for lastWithTime Function:" + dataType);
+ }
+ } else {
+ throw new IllegalArgumentException("Three arguments are required for lastWithTime Function."
+ + " The function can be used as lastWithTime(dataColumn, timeColumn, 'dataType')");
+ }
case MINMAXRANGE:
return new MinMaxRangeAggregationFunction(firstArgument);
case DISTINCTCOUNT:
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastDoubleValueWithTimeAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastDoubleValueWithTimeAggregationFunction.java
new file mode 100644
index 0000000..796a0cb
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastDoubleValueWithTimeAggregationFunction.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.DoubleLongPair;
+import org.apache.pinot.segment.local.customobject.ValueLongPair;
+
+
+/**
+ * This function is used for LastWithTime calculations for data column with double type.
+ * <p>The function can be used as LastWithTime(dataExpression, timeExpression, 'double')
+ * <p>Following arguments are supported:
+ * <ul>
+ * <li>dataExpression: expression that contains the double data column to be calculated last on</li>
+ * <li>timeExpression: expression that contains the column to be used to decide which data is last, can be any
+ * Numeric column</li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class LastDoubleValueWithTimeAggregationFunction extends LastWithTimeAggregationFunction<Double> {
+
+ private final static ValueLongPair<Double> DEFAULT_VALUE_TIME_PAIR
+ = new DoubleLongPair(Double.NaN, Long.MIN_VALUE);
+
+ public LastDoubleValueWithTimeAggregationFunction(
+ ExpressionContext dataCol,
+ ExpressionContext timeCol) {
+ super(dataCol, timeCol, ObjectSerDeUtils.DOUBLE_LONG_PAIR_SER_DE);
+ }
+
+ @Override
+ public ValueLongPair<Double> constructValueLongPair(Double value, long time) {
+ return new DoubleLongPair(value, time);
+ }
+
+ @Override
+ public ValueLongPair<Double> getDefaultValueTimePair() {
+ return DEFAULT_VALUE_TIME_PAIR;
+ }
+
+ @Override
+ public void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder,
+ BlockValSet blockValSet, BlockValSet timeValSet) {
+ ValueLongPair<Double> defaultValueLongPair = getDefaultValueTimePair();
+ Double lastData = defaultValueLongPair.getValue();
+ long lastTime = defaultValueLongPair.getTime();
+ double[] doubleValues = blockValSet.getDoubleValuesSV();
+ long[] timeValues = timeValSet.getLongValuesSV();
+ for (int i = 0; i < length; i++) {
+ double data = doubleValues[i];
+ long time = timeValues[i];
+ if (time >= lastTime) {
+ lastTime = time;
+ lastData = data;
+ }
+ }
+ setAggregationResult(aggregationResultHolder, lastData, lastTime);
+ }
+
+ @Override
+ public void aggregateGroupResultWithRawDataSv(int length, int[] groupKeyArray,
+ GroupByResultHolder groupByResultHolder,
+ BlockValSet blockValSet, BlockValSet timeValSet) {
+ double[] doubleValues = blockValSet.getDoubleValuesSV();
+ long[] timeValues = timeValSet.getLongValuesSV();
+ for (int i = 0; i < length; i++) {
+ double data = doubleValues[i];
+ long time = timeValues[i];
+ setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+ }
+ }
+
+ @Override
+ public void aggregateGroupResultWithRawDataMv(int length,
+ int[][] groupKeysArray,
+ GroupByResultHolder groupByResultHolder,
+ BlockValSet blockValSet,
+ BlockValSet timeValSet) {
+ double[] doubleValues = blockValSet.getDoubleValuesSV();
+ long[] timeValues = timeValSet.getLongValuesSV();
+ for (int i = 0; i < length; i++) {
+ double value = doubleValues[i];
+ long time = timeValues[i];
+ for (int groupKey : groupKeysArray[i]) {
+ setGroupByResult(groupKey, groupByResultHolder, value, time);
+ }
+ }
+ }
+
+ @Override
+ public String getResultColumnName() {
+ return getType().getName().toLowerCase() + "(" + _expression + "," + _timeCol + ",'DOUBLE')";
+ }
+
+ @Override
+ public String getColumnName() {
+ return getType().getName() + "_" + _expression + "_" + _timeCol + "_DOUBLE";
+ }
+
+ @Override
+ public ColumnDataType getFinalResultColumnType() {
+ return ColumnDataType.DOUBLE;
+ }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastFloatValueWithTimeAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastFloatValueWithTimeAggregationFunction.java
new file mode 100644
index 0000000..6061a83
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastFloatValueWithTimeAggregationFunction.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.FloatLongPair;
+import org.apache.pinot.segment.local.customobject.ValueLongPair;
+
+
+/**
+ * This function is used for LastWithTime calculations for data column with float type.
+ * <p>The function can be used as LastWithTime(dataExpression, timeExpression, 'float')
+ * <p>Following arguments are supported:
+ * <ul>
+ * <li>dataExpression: expression that contains the float data column to be calculated last on</li>
+ * <li>timeExpression: expression that contains the column to be used to decide which data is last, can be any
+ * Numeric column</li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class LastFloatValueWithTimeAggregationFunction extends LastWithTimeAggregationFunction<Float> {
+
+ private final static ValueLongPair<Float> DEFAULT_VALUE_TIME_PAIR = new FloatLongPair(Float.NaN, Long.MIN_VALUE);
+
+ public LastFloatValueWithTimeAggregationFunction(
+ ExpressionContext dataCol,
+ ExpressionContext timeCol) {
+ super(dataCol, timeCol, ObjectSerDeUtils.FLOAT_LONG_PAIR_SER_DE);
+ }
+
+ @Override
+ public ValueLongPair<Float> constructValueLongPair(Float value, long time) {
+ return new FloatLongPair(value, time);
+ }
+
+ @Override
+ public ValueLongPair<Float> getDefaultValueTimePair() {
+ return DEFAULT_VALUE_TIME_PAIR;
+ }
+
+ @Override
+ public void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder,
+ BlockValSet blockValSet, BlockValSet timeValSet) {
+ ValueLongPair<Float> defaultValueLongPair = getDefaultValueTimePair();
+ Float lastData = defaultValueLongPair.getValue();
+ long lastTime = defaultValueLongPair.getTime();
+ float[] floatValues = blockValSet.getFloatValuesSV();
+ long[] timeValues = timeValSet.getLongValuesSV();
+ for (int i = 0; i < length; i++) {
+ float data = floatValues[i];
+ long time = timeValues[i];
+ if (time >= lastTime) {
+ lastTime = time;
+ lastData = data;
+ }
+ }
+ setAggregationResult(aggregationResultHolder, lastData, lastTime);
+ }
+
+ @Override
+ public void aggregateGroupResultWithRawDataSv(int length,
+ int[] groupKeyArray,
+ GroupByResultHolder groupByResultHolder,
+ BlockValSet blockValSet,
+ BlockValSet timeValSet) {
+ float[] floatValues = blockValSet.getFloatValuesSV();
+ long[] timeValues = timeValSet.getLongValuesSV();
+ for (int i = 0; i < length; i++) {
+ float data = floatValues[i];
+ long time = timeValues[i];
+ setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+ }
+ }
+
+ @Override
+ public void aggregateGroupResultWithRawDataMv(int length,
+ int[][] groupKeysArray,
+ GroupByResultHolder groupByResultHolder,
+ BlockValSet blockValSet,
+ BlockValSet timeValSet) {
+ float[] floatValues = blockValSet.getFloatValuesSV();
+ long[] timeValues = timeValSet.getLongValuesSV();
+ for (int i = 0; i < length; i++) {
+ float value = floatValues[i];
+ long time = timeValues[i];
+ for (int groupKey : groupKeysArray[i]) {
+ setGroupByResult(groupKey, groupByResultHolder, value, time);
+ }
+ }
+ }
+
+ @Override
+ public String getResultColumnName() {
+ return getType().getName().toLowerCase() + "(" + _expression + "," + _timeCol + ",'FLOAT')";
+ }
+
+ @Override
+ public String getColumnName() {
+ return getType().getName() + "_" + _expression + "_" + _timeCol + "_FLOAT";
+ }
+
+ @Override
+ public ColumnDataType getFinalResultColumnType() {
+ return ColumnDataType.FLOAT;
+ }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastIntValueWithTimeAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastIntValueWithTimeAggregationFunction.java
new file mode 100644
index 0000000..ff9fdee
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastIntValueWithTimeAggregationFunction.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.IntLongPair;
+import org.apache.pinot.segment.local.customobject.ValueLongPair;
+
+
+/**
+ * This function is used for LastWithTime calculations for data column with int/boolean type.
+ * <p>The function can be used as LastWithTime(dataExpression, timeExpression, 'int')
+ * or LastWithTime(dataExpression, timeExpression, 'boolean')
+ * <p>Following arguments are supported:
+ * <ul>
+ * <li>dataExpression: expression that contains the int/boolean data column to be calculated last on</li>
+ * <li>timeExpression: expression that contains the column to be used to decide which data is last, can be any
+ * Numeric column</li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class LastIntValueWithTimeAggregationFunction extends LastWithTimeAggregationFunction<Integer> {
+
+ private final static ValueLongPair<Integer> DEFAULT_VALUE_TIME_PAIR
+ = new IntLongPair(Integer.MIN_VALUE, Long.MIN_VALUE);
+ private final boolean _isBoolean;
+
+ public LastIntValueWithTimeAggregationFunction(
+ ExpressionContext dataCol,
+ ExpressionContext timeCol,
+ boolean isBoolean) {
+ super(dataCol, timeCol, ObjectSerDeUtils.INT_LONG_PAIR_SER_DE);
+ _isBoolean = isBoolean;
+ }
+
+ @Override
+ public ValueLongPair<Integer> constructValueLongPair(Integer value, long time) {
+ return new IntLongPair(value, time);
+ }
+
+ @Override
+ public ValueLongPair<Integer> getDefaultValueTimePair() {
+ return DEFAULT_VALUE_TIME_PAIR;
+ }
+
+ @Override
+ public void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder,
+ BlockValSet blockValSet, BlockValSet timeValSet) {
+ ValueLongPair<Integer> defaultValueLongPair = getDefaultValueTimePair();
+ Integer lastData = defaultValueLongPair.getValue();
+ long lastTime = defaultValueLongPair.getTime();
+ int[] intValues = blockValSet.getIntValuesSV();
+ long[] timeValues = timeValSet.getLongValuesSV();
+ for (int i = 0; i < length; i++) {
+ int data = intValues[i];
+ long time = timeValues[i];
+ if (time >= lastTime) {
+ lastTime = time;
+ lastData = data;
+ }
+ }
+ setAggregationResult(aggregationResultHolder, lastData, lastTime);
+ }
+
+ @Override
+ public void aggregateGroupResultWithRawDataSv(int length, int[] groupKeyArray,
+ GroupByResultHolder groupByResultHolder,
+ BlockValSet blockValSet, BlockValSet timeValSet) {
+ int[] intValues = blockValSet.getIntValuesSV();
+ long[] timeValues = timeValSet.getLongValuesSV();
+ for (int i = 0; i < length; i++) {
+ int data = intValues[i];
+ long time = timeValues[i];
+ setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+ }
+ }
+
+ @Override
+ public void aggregateGroupResultWithRawDataMv(int length,
+ int[][] groupKeysArray,
+ GroupByResultHolder groupByResultHolder,
+ BlockValSet blockValSet,
+ BlockValSet timeValSet) {
+ int[] intValues = blockValSet.getIntValuesSV();
+ long[] timeValues = timeValSet.getLongValuesSV();
+ for (int i = 0; i < length; i++) {
+ int value = intValues[i];
+ long time = timeValues[i];
+ for (int groupKey : groupKeysArray[i]) {
+ setGroupByResult(groupKey, groupByResultHolder, value, time);
+ }
+ }
+ }
+
+ @Override
+ public String getResultColumnName() {
+ if (_isBoolean) {
+ return getType().getName().toLowerCase() + "(" + _expression + "," + _timeCol + ",'BOOLEAN')";
+ } else {
+ return getType().getName().toLowerCase() + "(" + _expression + "," + _timeCol + ",'INT')";
+ }
+ }
+
+ @Override
+ public String getColumnName() {
+ if (_isBoolean) {
+ return getType().getName() + "_" + _expression + "_" + _timeCol + "_BOOLEAN";
+ } else {
+ return getType().getName() + "_" + _expression + "_" + _timeCol + "_INT";
+ }
+ }
+
+ @Override
+ public ColumnDataType getFinalResultColumnType() {
+ if (_isBoolean) {
+ return ColumnDataType.BOOLEAN;
+ } else {
+ return ColumnDataType.INT;
+ }
+ }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastLongValueWithTimeAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastLongValueWithTimeAggregationFunction.java
new file mode 100644
index 0000000..fa3c15f
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastLongValueWithTimeAggregationFunction.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.LongLongPair;
+import org.apache.pinot.segment.local.customobject.ValueLongPair;
+
+
+/**
+ * This function is used for LastWithTime calculations for data column with long type.
+ * <p>The function can be used as LastWithTime(dataExpression, timeExpression, 'long')
+ * <p>Following arguments are supported:
+ * <ul>
+ * <li>dataExpression: expression that contains the long data column to be calculated last on</li>
+ * <li>timeExpression: expression that contains the column to be used to decide which data is last, can be any
+ * Numeric column</li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class LastLongValueWithTimeAggregationFunction extends LastWithTimeAggregationFunction<Long> {
+
+ private final static ValueLongPair<Long> DEFAULT_VALUE_TIME_PAIR
+ = new LongLongPair(Long.MIN_VALUE, Long.MIN_VALUE);
+
+ public LastLongValueWithTimeAggregationFunction(
+ ExpressionContext dataCol,
+ ExpressionContext timeCol) {
+ super(dataCol, timeCol, ObjectSerDeUtils.LONG_LONG_PAIR_SER_DE);
+ }
+
+ @Override
+ public ValueLongPair<Long> constructValueLongPair(Long value, long time) {
+ return new LongLongPair(value, time);
+ }
+
+ @Override
+ public ValueLongPair<Long> getDefaultValueTimePair() {
+ return DEFAULT_VALUE_TIME_PAIR;
+ }
+
+ @Override
+ public void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder,
+ BlockValSet blockValSet, BlockValSet timeValSet) {
+ ValueLongPair<Long> defaultValueLongPair = getDefaultValueTimePair();
+ Long lastData = defaultValueLongPair.getValue();
+ long lastTime = defaultValueLongPair.getTime();
+ long[] longValues = blockValSet.getLongValuesSV();
+ long[] timeValues = timeValSet.getLongValuesSV();
+ for (int i = 0; i < length; i++) {
+ long data = longValues[i];
+ long time = timeValues[i];
+ if (time >= lastTime) {
+ lastTime = time;
+ lastData = data;
+ }
+ }
+ setAggregationResult(aggregationResultHolder, lastData, lastTime);
+ }
+
+ @Override
+ public void aggregateGroupResultWithRawDataSv(int length, int[] groupKeyArray,
+ GroupByResultHolder groupByResultHolder,
+ BlockValSet blockValSet, BlockValSet timeValSet) {
+ long[] longValues = blockValSet.getLongValuesSV();
+ long[] timeValues = timeValSet.getLongValuesSV();
+ for (int i = 0; i < length; i++) {
+ long data = longValues[i];
+ long time = timeValues[i];
+ setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+ }
+ }
+
+ @Override
+ public void aggregateGroupResultWithRawDataMv(int length,
+ int[][] groupKeysArray,
+ GroupByResultHolder groupByResultHolder,
+ BlockValSet blockValSet,
+ BlockValSet timeValSet) {
+ long[] longValues = blockValSet.getLongValuesSV();
+ long[] timeValues = timeValSet.getLongValuesSV();
+ for (int i = 0; i < length; i++) {
+ long value = longValues[i];
+ long time = timeValues[i];
+ for (int groupKey : groupKeysArray[i]) {
+ setGroupByResult(groupKey, groupByResultHolder, value, time);
+ }
+ }
+ }
+
+ @Override
+ public String getResultColumnName() {
+ return getType().getName().toLowerCase() + "(" + _expression + "," + _timeCol + ",'LONG')";
+ }
+
+ @Override
+ public String getColumnName() {
+ return getType().getName() + "_" + _expression + "_" + _timeCol + "_LONG";
+ }
+
+ @Override
+ public ColumnDataType getFinalResultColumnType() {
+ return ColumnDataType.LONG;
+ }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastStringValueWithTimeAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastStringValueWithTimeAggregationFunction.java
new file mode 100644
index 0000000..cb3caa8
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastStringValueWithTimeAggregationFunction.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.StringLongPair;
+import org.apache.pinot.segment.local.customobject.ValueLongPair;
+
+
+/**
+ * LastWithTime aggregation for a data column of string type.
+ * <p>Usage: LastWithTime(dataExpression, timeExpression, 'string')
+ * <p>Arguments:
+ * <ul>
+ *   <li>dataExpression: expression that yields the string values to pick the last one from</li>
+ *   <li>timeExpression: expression that yields the time deciding which value is last, can be any Numeric column;
+ *   it is read through {@code getLongValuesSV()}</li>
+ * </ul>
+ * <p>When no row has been aggregated the result is the empty string paired with {@code Long.MIN_VALUE}.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class LastStringValueWithTimeAggregationFunction extends LastWithTimeAggregationFunction<String> {
+  private static final ValueLongPair<String> DEFAULT_VALUE_TIME_PAIR = new StringLongPair("", Long.MIN_VALUE);
+
+  public LastStringValueWithTimeAggregationFunction(ExpressionContext dataCol, ExpressionContext timeCol) {
+    super(dataCol, timeCol, ObjectSerDeUtils.STRING_LONG_PAIR_SER_DE);
+  }
+
+  @Override
+  public ValueLongPair<String> constructValueLongPair(String value, long time) {
+    return new StringLongPair(value, time);
+  }
+
+  @Override
+  public ValueLongPair<String> getDefaultValueTimePair() {
+    return DEFAULT_VALUE_TIME_PAIR;
+  }
+
+  /**
+   * Scans the first {@code length} rows for the string with the greatest time (on equal times the later row
+   * wins) and offers that candidate to the result holder.
+   */
+  @Override
+  public void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet, BlockValSet timeValSet) {
+    ValueLongPair<String> seed = getDefaultValueTimePair();
+    String latestValue = seed.getValue();
+    long latestTime = seed.getTime();
+    final String[] values = blockValSet.getStringValuesSV();
+    final long[] times = timeValSet.getLongValuesSV();
+    for (int row = 0; row < length; row++) {
+      final long rowTime = times[row];
+      if (rowTime < latestTime) {
+        // strictly older than the current candidate
+        continue;
+      }
+      latestTime = rowTime;
+      latestValue = values[row];
+    }
+    setAggregationResult(aggregationResultHolder, latestValue, latestTime);
+  }
+
+  /**
+   * Offers each of the first {@code length} (value, time) rows to the group given by {@code groupKeyArray}.
+   */
+  @Override
+  public void aggregateGroupResultWithRawDataSv(int length, int[] groupKeyArray,
+      GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, BlockValSet timeValSet) {
+    final String[] values = blockValSet.getStringValuesSV();
+    final long[] times = timeValSet.getLongValuesSV();
+    for (int row = 0; row < length; row++) {
+      setGroupByResult(groupKeyArray[row], groupByResultHolder, values[row], times[row]);
+    }
+  }
+
+  /**
+   * Offers each of the first {@code length} (value, time) rows to every group listed in
+   * {@code groupKeysArray[row]}.
+   */
+  @Override
+  public void aggregateGroupResultWithRawDataMv(int length, int[][] groupKeysArray,
+      GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, BlockValSet timeValSet) {
+    final String[] values = blockValSet.getStringValuesSV();
+    final long[] times = timeValSet.getLongValuesSV();
+    for (int row = 0; row < length; row++) {
+      final String rowValue = values[row];
+      final long rowTime = times[row];
+      final int[] rowGroupKeys = groupKeysArray[row];
+      for (int k = 0; k < rowGroupKeys.length; k++) {
+        setGroupByResult(rowGroupKeys[k], groupByResultHolder, rowValue, rowTime);
+      }
+    }
+  }
+
+  @Override
+  public String getResultColumnName() {
+    String functionName = getType().getName().toLowerCase();
+    return functionName + "(" + _expression + "," + _timeCol + ",'STRING')";
+  }
+
+  @Override
+  public String getColumnName() {
+    String functionName = getType().getName();
+    return functionName + "_" + _expression + "_" + _timeCol + "_STRING";
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    return ColumnDataType.STRING;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunction.java
new file mode 100644
index 0000000..adc91da
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunction.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.ValueLongPair;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+/**
+ * Base class for LastWithTime aggregations, which keep the data value that is paired with the greatest time.
+ * <p>Usage: LastWithTime(dataExpression, timeExpression, 'dataType')
+ * <p>Arguments:
+ * <ul>
+ *   <li>dataExpression: expression that yields the values to pick the last one from</li>
+ *   <li>timeExpression: expression that yields the time deciding which value is last, can be any Numeric
+ *   column</li>
+ *   <li>dataType: the data type of the data column</li>
+ * </ul>
+ * <p>While aggregating, an entry replaces the stored one when its time is greater than or equal to the stored
+ * time. Input of type BYTES is treated as serialized value-time pairs and decoded with the ser/de handed to the
+ * constructor; any other input type is delegated to the subclass' raw-data methods.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public abstract class LastWithTimeAggregationFunction<V extends Comparable<V>>
+    extends BaseSingleInputAggregationFunction<ValueLongPair<V>, V> {
+  protected final ExpressionContext _timeCol;
+  private final ObjectSerDeUtils.ObjectSerDe<? extends ValueLongPair<V>> _objectSerDe;
+
+  public LastWithTimeAggregationFunction(ExpressionContext dataCol, ExpressionContext timeCol,
+      ObjectSerDeUtils.ObjectSerDe<? extends ValueLongPair<V>> objectSerDe) {
+    super(dataCol);
+    _timeCol = timeCol;
+    _objectSerDe = objectSerDe;
+  }
+
+  /** Creates the typed pair stored in the result holders. */
+  public abstract ValueLongPair<V> constructValueLongPair(V value, long time);
+
+  /** Returns the pair used as scan seed and as the result when nothing has been aggregated. */
+  public abstract ValueLongPair<V> getDefaultValueTimePair();
+
+  /** Aggregates non-BYTES input without group-by. */
+  public abstract void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet, BlockValSet timeValSet);
+
+  /** Aggregates non-BYTES input with one group key per row. */
+  public abstract void aggregateGroupResultWithRawDataSv(int length, int[] groupKeyArray,
+      GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, BlockValSet timeValSet);
+
+  /** Aggregates non-BYTES input with several group keys per row. */
+  public abstract void aggregateGroupResultWithRawDataMv(int length, int[][] groupKeysArray,
+      GroupByResultHolder groupByResultHolder, BlockValSet blockValSet, BlockValSet timeValSet);
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.LASTWITHTIME;
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet dataValSet = blockValSetMap.get(_expression);
+    BlockValSet timeValSet = blockValSetMap.get(_timeCol);
+    if (dataValSet.getValueType() != DataType.BYTES) {
+      aggregateResultWithRawData(length, aggregationResultHolder, dataValSet, timeValSet);
+      return;
+    }
+
+    // BYTES input: every entry is a serialized value-time pair
+    ValueLongPair<V> seed = getDefaultValueTimePair();
+    V latestValue = seed.getValue();
+    long latestTime = seed.getTime();
+    byte[][] serializedPairs = dataValSet.getBytesValuesSV();
+    for (int row = 0; row < length; row++) {
+      ValueLongPair<V> pair = _objectSerDe.deserialize(serializedPairs[row]);
+      V pairValue = pair.getValue();
+      long pairTime = pair.getTime();
+      if (pairTime >= latestTime) {
+        latestTime = pairTime;
+        latestValue = pairValue;
+      }
+    }
+    setAggregationResult(aggregationResultHolder, latestValue, latestTime);
+  }
+
+  /**
+   * Stores (data, time) in the holder unless the holder already has an entry with a strictly greater time.
+   */
+  protected void setAggregationResult(AggregationResultHolder aggregationResultHolder, V data, long time) {
+    ValueLongPair current = aggregationResultHolder.getResult();
+    if (current != null && time < current.getTime()) {
+      return;
+    }
+    aggregationResultHolder.setValue(constructValueLongPair(data, time));
+  }
+
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet dataValSet = blockValSetMap.get(_expression);
+    BlockValSet timeValSet = blockValSetMap.get(_timeCol);
+    if (dataValSet.getValueType() != DataType.BYTES) {
+      aggregateGroupResultWithRawDataSv(length, groupKeyArray, groupByResultHolder, dataValSet, timeValSet);
+      return;
+    }
+
+    // BYTES input: every entry is a serialized value-time pair
+    byte[][] serializedPairs = dataValSet.getBytesValuesSV();
+    for (int row = 0; row < length; row++) {
+      ValueLongPair<V> pair = _objectSerDe.deserialize(serializedPairs[row]);
+      setGroupByResult(groupKeyArray[row], groupByResultHolder, pair.getValue(), pair.getTime());
+    }
+  }
+
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet dataValSet = blockValSetMap.get(_expression);
+    BlockValSet timeValSet = blockValSetMap.get(_timeCol);
+    if (dataValSet.getValueType() != DataType.BYTES) {
+      aggregateGroupResultWithRawDataMv(length, groupKeysArray, groupByResultHolder, dataValSet, timeValSet);
+      return;
+    }
+
+    // BYTES input: every entry is a serialized value-time pair
+    byte[][] serializedPairs = dataValSet.getBytesValuesSV();
+    for (int row = 0; row < length; row++) {
+      ValueLongPair<V> pair = _objectSerDe.deserialize(serializedPairs[row]);
+      V pairValue = pair.getValue();
+      long pairTime = pair.getTime();
+      int[] rowGroupKeys = groupKeysArray[row];
+      for (int k = 0; k < rowGroupKeys.length; k++) {
+        setGroupByResult(rowGroupKeys[k], groupByResultHolder, pairValue, pairTime);
+      }
+    }
+  }
+
+  /**
+   * Stores (data, time) for the group unless the group already has an entry with a strictly greater time.
+   */
+  protected void setGroupByResult(int groupKey, GroupByResultHolder groupByResultHolder, V data, long time) {
+    ValueLongPair current = groupByResultHolder.getResult(groupKey);
+    if (current != null && time < current.getTime()) {
+      return;
+    }
+    groupByResultHolder.setValueForKey(groupKey, constructValueLongPair(data, time));
+  }
+
+  @Override
+  public ValueLongPair<V> extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    ValueLongPair stored = aggregationResultHolder.getResult();
+    if (stored != null) {
+      return stored;
+    }
+    return getDefaultValueTimePair();
+  }
+
+  @Override
+  public ValueLongPair<V> extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+    ValueLongPair<V> stored = groupByResultHolder.getResult(groupKey);
+    if (stored != null) {
+      return stored;
+    }
+    return getDefaultValueTimePair();
+  }
+
+  /**
+   * Returns whichever intermediate result has the greater time; the first argument is returned on a tie.
+   */
+  @Override
+  public ValueLongPair<V> merge(ValueLongPair<V> intermediateResult1, ValueLongPair<V> intermediateResult2) {
+    if (intermediateResult1.getTime() < intermediateResult2.getTime()) {
+      return intermediateResult2;
+    }
+    return intermediateResult1;
+  }
+
+  @Override
+  public List<ExpressionContext> getInputExpressions() {
+    return Arrays.asList(_expression, _timeCol);
+  }
+
+  @Override
+  public boolean isIntermediateResultComparable() {
+    return false;
+  }
+
+  @Override
+  public ColumnDataType getIntermediateResultColumnType() {
+    return ColumnDataType.OBJECT;
+  }
+
+  @Override
+  public V extractFinalResult(ValueLongPair<V> intermediateResult) {
+    return intermediateResult.getValue();
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverter.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverter.java
index 8eea09b..e030f4c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverter.java
@@ -164,6 +164,13 @@ public class BrokerRequestToQueryContextConverter {
for (String expression : stringExpressions) {
arguments.add(RequestContextUtils.getExpressionFromPQL(expression));
}
+ } else if (functionName.equalsIgnoreCase(AggregationFunctionType.LASTWITHTIME.getName())) {
+ // For LASTWITHTIME query, only the first two arguments are expression, third one is literal if available
+ arguments.add(RequestContextUtils.getExpressionFromPQL(stringExpressions.get(0)));
+ arguments.add(RequestContextUtils.getExpressionFromPQL(stringExpressions.get(1)));
+ for (int i = 2; i < numArguments; i++) {
+ arguments.add(ExpressionContext.forLiteral(stringExpressions.get(i)));
+ }
} else {
// For non-DISTINCT query, only the first argument is expression, others are literals
// NOTE: We directly use the string as the literal value because of the legacy behavior of PQL compiler
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
index 8e4c6df..3b8bfc4 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
@@ -34,8 +34,14 @@ import org.apache.commons.lang.RandomStringUtils;
import org.apache.pinot.core.query.aggregation.function.PercentileEstAggregationFunction;
import org.apache.pinot.core.query.aggregation.function.PercentileTDigestAggregationFunction;
import org.apache.pinot.segment.local.customobject.AvgPair;
+import org.apache.pinot.segment.local.customobject.DoubleLongPair;
+import org.apache.pinot.segment.local.customobject.FloatLongPair;
+import org.apache.pinot.segment.local.customobject.IntLongPair;
+import org.apache.pinot.segment.local.customobject.LongLongPair;
import org.apache.pinot.segment.local.customobject.MinMaxRangePair;
import org.apache.pinot.segment.local.customobject.QuantileDigest;
+import org.apache.pinot.segment.local.customobject.StringLongPair;
+import org.apache.pinot.segment.local.customobject.ValueLongPair;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
@@ -127,6 +133,76 @@ public class ObjectSerDeUtilsTest {
}
@Test
+  public void testIntValueTimePair() {
+    // Round-trips random (int, long) pairs through ObjectSerDeUtils and checks that both components survive
+    for (int iteration = 0; iteration < NUM_ITERATIONS; iteration++) {
+      ValueLongPair<Integer> original = new IntLongPair(RANDOM.nextInt(), RANDOM.nextLong());
+
+      byte[] serialized = ObjectSerDeUtils.serialize(original);
+      ValueLongPair<Integer> restored =
+          ObjectSerDeUtils.deserialize(serialized, ObjectSerDeUtils.ObjectType.IntLongPair);
+
+      assertEquals(restored.getValue(), original.getValue(), ERROR_MESSAGE);
+      assertEquals(restored.getTime(), original.getTime(), ERROR_MESSAGE);
+    }
+  }
+
+  @Test
+  public void testLongValueTimePair() {
+    // Round-trips random (long, long) pairs through ObjectSerDeUtils and checks that both components survive
+    for (int iteration = 0; iteration < NUM_ITERATIONS; iteration++) {
+      ValueLongPair<Long> original = new LongLongPair(RANDOM.nextLong(), RANDOM.nextLong());
+
+      byte[] serialized = ObjectSerDeUtils.serialize(original);
+      ValueLongPair<Long> restored =
+          ObjectSerDeUtils.deserialize(serialized, ObjectSerDeUtils.ObjectType.LongLongPair);
+
+      assertEquals(restored.getValue(), original.getValue(), ERROR_MESSAGE);
+      assertEquals(restored.getTime(), original.getTime(), ERROR_MESSAGE);
+    }
+  }
+
+  @Test
+  public void testFloatValueTimePair() {
+    // Round-trips random (float, long) pairs through ObjectSerDeUtils and checks that both components survive
+    for (int iteration = 0; iteration < NUM_ITERATIONS; iteration++) {
+      ValueLongPair<Float> original = new FloatLongPair(RANDOM.nextFloat(), RANDOM.nextLong());
+
+      byte[] serialized = ObjectSerDeUtils.serialize(original);
+      ValueLongPair<Float> restored =
+          ObjectSerDeUtils.deserialize(serialized, ObjectSerDeUtils.ObjectType.FloatLongPair);
+
+      assertEquals(restored.getValue(), original.getValue(), ERROR_MESSAGE);
+      assertEquals(restored.getTime(), original.getTime(), ERROR_MESSAGE);
+    }
+  }
+
+  @Test
+  public void testDoubleValueTimePair() {
+    // Round-trips random (double, long) pairs through ObjectSerDeUtils and checks that both components survive
+    for (int iteration = 0; iteration < NUM_ITERATIONS; iteration++) {
+      ValueLongPair<Double> original = new DoubleLongPair(RANDOM.nextDouble(), RANDOM.nextLong());
+
+      byte[] serialized = ObjectSerDeUtils.serialize(original);
+      ValueLongPair<Double> restored =
+          ObjectSerDeUtils.deserialize(serialized, ObjectSerDeUtils.ObjectType.DoubleLongPair);
+
+      assertEquals(restored.getValue(), original.getValue(), ERROR_MESSAGE);
+      assertEquals(restored.getTime(), original.getTime(), ERROR_MESSAGE);
+    }
+  }
+
+  @Test
+  public void testStringValueTimePair() {
+    // Round-trips random (string, long) pairs through ObjectSerDeUtils and checks that both components survive.
+    // The string is the decimal rendering of a random double.
+    for (int i = 0; i < NUM_ITERATIONS; i++) {
+      ValueLongPair<String> expected = new StringLongPair(String.valueOf(RANDOM.nextDouble()), RANDOM.nextLong());
+
+      byte[] bytes = ObjectSerDeUtils.serialize(expected);
+      ValueLongPair<String> actual = ObjectSerDeUtils.deserialize(bytes,
+          ObjectSerDeUtils.ObjectType.StringLongPair);
+
+      assertEquals(actual.getValue(), expected.getValue(), ERROR_MESSAGE);
+      assertEquals(actual.getTime(), expected.getTime(), ERROR_MESSAGE);
+    }
+  }
+
+ @Test
public void testHyperLogLog() {
for (int i = 0; i < NUM_ITERATIONS; i++) {
HyperLogLog expected = new HyperLogLog(7);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
index e5e0e22..9c730f1 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java
@@ -87,6 +87,48 @@ public class AggregationFunctionFactoryTest {
assertEquals(aggregationFunction.getColumnName(), "mode_column");
assertEquals(aggregationFunction.getResultColumnName(), function.toString());
+ function = getFunction("LaStWiThTiMe", "(column,timeColumn,'BOOLEAN')");
+ aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT);
+ assertTrue(aggregationFunction instanceof LastIntValueWithTimeAggregationFunction);
+ assertEquals(aggregationFunction.getType(), AggregationFunctionType.LASTWITHTIME);
+ assertEquals(aggregationFunction.getColumnName(), "lastWithTime_column_timeColumn_BOOLEAN");
+ assertEquals(aggregationFunction.getResultColumnName(), function.toString());
+
+ function = getFunction("LaStWiThTiMe", "(column,timeColumn,'INT')");
+ aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT);
+ assertTrue(aggregationFunction instanceof LastIntValueWithTimeAggregationFunction);
+ assertEquals(aggregationFunction.getType(), AggregationFunctionType.LASTWITHTIME);
+ assertEquals(aggregationFunction.getColumnName(), "lastWithTime_column_timeColumn_INT");
+ assertEquals(aggregationFunction.getResultColumnName(), function.toString());
+
+ function = getFunction("LaStWiThTiMe", "(column,timeColumn,'LONG')");
+ aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT);
+ assertTrue(aggregationFunction instanceof LastLongValueWithTimeAggregationFunction);
+ assertEquals(aggregationFunction.getType(), AggregationFunctionType.LASTWITHTIME);
+ assertEquals(aggregationFunction.getColumnName(), "lastWithTime_column_timeColumn_LONG");
+ assertEquals(aggregationFunction.getResultColumnName(), function.toString());
+
+ function = getFunction("LaStWiThTiMe", "(column,timeColumn,'FLOAT')");
+ aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT);
+ assertTrue(aggregationFunction instanceof LastFloatValueWithTimeAggregationFunction);
+ assertEquals(aggregationFunction.getType(), AggregationFunctionType.LASTWITHTIME);
+ assertEquals(aggregationFunction.getColumnName(), "lastWithTime_column_timeColumn_FLOAT");
+ assertEquals(aggregationFunction.getResultColumnName(), function.toString());
+
+ function = getFunction("LaStWiThTiMe", "(column,timeColumn,'DOUBLE')");
+ aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT);
+ assertTrue(aggregationFunction instanceof LastDoubleValueWithTimeAggregationFunction);
+ assertEquals(aggregationFunction.getType(), AggregationFunctionType.LASTWITHTIME);
+ assertEquals(aggregationFunction.getColumnName(), "lastWithTime_column_timeColumn_DOUBLE");
+ assertEquals(aggregationFunction.getResultColumnName(), function.toString());
+
+ function = getFunction("LaStWiThTiMe", "(column,timeColumn,'STRING')");
+ aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT);
+ assertTrue(aggregationFunction instanceof LastStringValueWithTimeAggregationFunction);
+ assertEquals(aggregationFunction.getType(), AggregationFunctionType.LASTWITHTIME);
+ assertEquals(aggregationFunction.getColumnName(), "lastWithTime_column_timeColumn_STRING");
+ assertEquals(aggregationFunction.getResultColumnName(), function.toString());
+
function = getFunction("MiNmAxRaNgE");
aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT);
assertTrue(aggregationFunction instanceof MinMaxRangeAggregationFunction);
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/LastWithTimeQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/LastWithTimeQueriesTest.java
new file mode 100644
index 0000000..75d78cc
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/LastWithTimeQueriesTest.java
@@ -0,0 +1,548 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.response.broker.AggregationResult;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.GroupByResult;
+import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.query.AggregationGroupByOperator;
+import org.apache.pinot.core.operator.query.AggregationOperator;
+import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
+import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
+import org.apache.pinot.segment.local.customobject.ValueLongPair;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Queries test for LASTWITHTIME queries.
+ */
+@SuppressWarnings("rawtypes")
+public class LastWithTimeQueriesTest extends BaseQueriesTest {
+ private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "LastQueriesTest"); // deleted at the start of setUp()
+ private static final String RAW_TABLE_NAME = "testTable";
+ private static final String SEGMENT_NAME = "testSegment";
+ private static final Random RANDOM = new Random();
+
+ private static final int NUM_RECORDS = 2000; // number of rows generated by setUp()
+ private static final int MAX_VALUE = 1000; // exclusive upper bound of the random intColumn values
+
+ private static final String BOOL_COLUMN = "boolColumn";
+ private static final String BOOL_NO_DICT_COLUMN = "boolNoDictColumn";
+ private static final String INT_COLUMN = "intColumn";
+ private static final String INT_MV_COLUMN = "intMvColumn";
+ private static final String INT_NO_DICT_COLUMN = "intNoDictColumn";
+ private static final String LONG_COLUMN = "longColumn";
+ private static final String LONG_MV_COLUMN = "longMvColumn";
+ private static final String LONG_NO_DICT_COLUMN = "longNoDictColumn";
+ private static final String FLOAT_COLUMN = "floatColumn";
+ private static final String FLOAT_MV_COLUMN = "floatMvColumn";
+ private static final String FLOAT_NO_DICT_COLUMN = "floatNoDictColumn";
+ private static final String DOUBLE_COLUMN = "doubleColumn";
+ private static final String DOUBLE_MV_COLUMN = "doubleMvColumn";
+ private static final String DOUBLE_NO_DICT_COLUMN = "doubleNoDictColumn";
+ private static final String STRING_COLUMN = "stringColumn";
+ private static final String STRING_MV_COLUMN = "stringMvColumn";
+ private static final String STRING_NO_DICT_COLUMN = "stringNoDictColumn";
+ private static final String TIME_COLUMN = "timestampColumn"; // setUp() stores the row index here
+ private static final Schema SCHEMA = new Schema.SchemaBuilder()
+ .addSingleValueDimension(BOOL_COLUMN, DataType.BOOLEAN)
+ .addSingleValueDimension(BOOL_NO_DICT_COLUMN, DataType.BOOLEAN)
+ .addSingleValueDimension(INT_COLUMN, DataType.INT)
+ .addMultiValueDimension(INT_MV_COLUMN, DataType.INT)
+ .addSingleValueDimension(INT_NO_DICT_COLUMN, DataType.INT)
+ .addSingleValueDimension(LONG_COLUMN, DataType.LONG)
+ .addMultiValueDimension(LONG_MV_COLUMN, DataType.LONG)
+ .addSingleValueDimension(LONG_NO_DICT_COLUMN, DataType.LONG)
+ .addSingleValueDimension(FLOAT_COLUMN, DataType.FLOAT)
+ .addMultiValueDimension(FLOAT_MV_COLUMN, DataType.FLOAT)
+ .addSingleValueDimension(FLOAT_NO_DICT_COLUMN, DataType.FLOAT)
+ .addSingleValueDimension(DOUBLE_COLUMN, DataType.DOUBLE)
+ .addMultiValueDimension(DOUBLE_MV_COLUMN, DataType.DOUBLE)
+ .addSingleValueDimension(DOUBLE_NO_DICT_COLUMN, DataType.DOUBLE)
+ .addSingleValueDimension(STRING_COLUMN, DataType.STRING)
+ .addMultiValueDimension(STRING_MV_COLUMN, DataType.STRING)
+ .addSingleValueDimension(STRING_NO_DICT_COLUMN, DataType.STRING)
+ .addSingleValueDimension(TIME_COLUMN, DataType.LONG).build();
+ private static final TableConfig TABLE_CONFIG = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+ .setNoDictionaryColumns(
+ Lists.newArrayList(INT_NO_DICT_COLUMN, LONG_NO_DICT_COLUMN, FLOAT_NO_DICT_COLUMN, DOUBLE_NO_DICT_COLUMN))
+ .build(); // NOTE(review): BOOL_NO_DICT_COLUMN and STRING_NO_DICT_COLUMN are not in the no-dictionary list - confirm this is intended
+ private static final double DELTA = 0.00001; // presumably the tolerance for floating-point assertions - usage not visible here
+
+ private Boolean _expectedResultLastBoolean; // the _expectedResultLast* fields hold the values of the row with i == NUM_RECORDS - 1
+ private Integer _expectedResultLastInt;
+ private Long _expectedResultLastLong;
+ private Float _expectedResultLastFloat;
+ private Double _expectedResultLastDouble;
+ private String _expectedResultLastString;
+ private Map<Integer, Boolean> _boolGroupValues; // the _*GroupValues maps are keyed by intColumn value; setUp() overwrites on repeat keys
+ private Map<Integer, Integer> _intGroupValues;
+ private Map<Integer, Long> _longGroupValues;
+ private Map<Integer, Float> _floatGroupValues;
+ private Map<Integer, Double> _doubleGroupValues;
+ private Map<Integer, String> _stringGroupValues;
+ private IndexSegment _indexSegment;
+ private List<IndexSegment> _indexSegments; // setUp() lists the same segment twice
+
+ @Override
+ protected String getFilter() {
+ // NOTE: Use a match all filter to switch between DictionaryBasedAggregationOperator and AggregationOperator
+ return " WHERE intColumn >= 0";
+ }
+
+  /**
+   * Returns the single segment loaded by {@code setUp()}.
+   */
+  @Override
+  protected IndexSegment getIndexSegment() {
+    return _indexSegment;
+  }
+
+  /**
+   * Returns the segment list assembled by {@code setUp()}.
+   */
+  @Override
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
+  }
+
+  /**
+   * Generates {@code NUM_RECORDS} random rows, builds a segment from them under {@code INDEX_DIR} and loads it.
+   * <p>The timestamp column holds the row index, so the row with index {@code NUM_RECORDS - 1} has the greatest
+   * time; its values are recorded as the expected aggregation-only results. For group-by expectations the maps
+   * are keyed by the row's int value and overwritten on every row, so each key ends up with the values of the
+   * most recently generated row for that key.
+   *
+   * @throws Exception if the index directory cannot be cleaned or the segment cannot be built or loaded
+   */
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteDirectory(INDEX_DIR);
+
+    List<GenericRow> records = new ArrayList<>(NUM_RECORDS);
+    // Group keys are int values in [0, MAX_VALUE), so presize the expectation maps for that many entries
+    int hashMapCapacity = HashUtil.getHashMapCapacity(MAX_VALUE);
+    _boolGroupValues = new HashMap<>(hashMapCapacity);
+    _intGroupValues = new HashMap<>(hashMapCapacity);
+    _longGroupValues = new HashMap<>(hashMapCapacity);
+    _floatGroupValues = new HashMap<>(hashMapCapacity);
+    _doubleGroupValues = new HashMap<>(hashMapCapacity);
+    _stringGroupValues = new HashMap<>(hashMapCapacity);
+    for (int i = 0; i < NUM_RECORDS; i++) {
+      boolean boolValue = RANDOM.nextBoolean();
+      int intValue = RANDOM.nextInt(MAX_VALUE);
+      long longValue = RANDOM.nextLong();
+      float floatValue = RANDOM.nextFloat();
+      double doubleValue = RANDOM.nextDouble();
+      String strValue = String.valueOf(RANDOM.nextDouble());
+      GenericRow record = new GenericRow();
+      record.putValue(BOOL_COLUMN, boolValue);
+      record.putValue(BOOL_NO_DICT_COLUMN, boolValue);
+      record.putValue(INT_COLUMN, intValue);
+      record.putValue(INT_MV_COLUMN, new Integer[]{intValue, intValue});
+      record.putValue(INT_NO_DICT_COLUMN, intValue);
+      record.putValue(LONG_COLUMN, longValue);
+      record.putValue(LONG_MV_COLUMN, new Long[]{longValue, longValue});
+      record.putValue(LONG_NO_DICT_COLUMN, longValue);
+      record.putValue(FLOAT_COLUMN, floatValue);
+      record.putValue(FLOAT_MV_COLUMN, new Float[]{floatValue, floatValue});
+      record.putValue(FLOAT_NO_DICT_COLUMN, floatValue);
+      record.putValue(DOUBLE_COLUMN, doubleValue);
+      record.putValue(DOUBLE_MV_COLUMN, new Double[]{doubleValue, doubleValue});
+      record.putValue(DOUBLE_NO_DICT_COLUMN, doubleValue);
+      record.putValue(STRING_COLUMN, strValue);
+      record.putValue(STRING_MV_COLUMN, new String[]{strValue, strValue});
+      record.putValue(STRING_NO_DICT_COLUMN, strValue);
+      // The row index doubles as the timestamp, so later rows are always "more recent"
+      record.putValue(TIME_COLUMN, (long) i);
+      if (i == NUM_RECORDS - 1) {
+        _expectedResultLastBoolean = boolValue;
+        _expectedResultLastInt = intValue;
+        _expectedResultLastLong = longValue;
+        _expectedResultLastFloat = floatValue;
+        _expectedResultLastDouble = doubleValue;
+        _expectedResultLastString = strValue;
+      }
+      // Overwriting on every row leaves each group with the values of its latest row
+      _boolGroupValues.put(intValue, boolValue);
+      _intGroupValues.put(intValue, intValue);
+      _longGroupValues.put(intValue, longValue);
+      _floatGroupValues.put(intValue, floatValue);
+      _doubleGroupValues.put(intValue, doubleValue);
+      _stringGroupValues.put(intValue, strValue);
+      records.add(record);
+    }
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
+    segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
+    segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
+    segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath());
+
+    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records));
+    driver.build();
+
+    ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap);
+    _indexSegment = immutableSegment;
+    // The same segment is listed twice
+    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
+  }
+
+  /**
+   * Verifies LASTWITHTIME without GROUP BY on the dictionary-encoded columns: first on a single segment (with and
+   * without the filter, which must agree), then through the broker response for both query variants.
+   */
+  @Test
+  public void testAggregationOnly() {
+    String query = String.join(", ",
+        "SELECT LASTWITHTIME(boolColumn,timestampColumn, BOOLEAN)",
+        "LASTWITHTIME(intColumn,timestampColumn, Int)",
+        "LASTWITHTIME(longColumn,timestampColumn, Long)",
+        "LASTWITHTIME(floatColumn,timestampColumn, Float)",
+        "LASTWITHTIME(doubleColumn,timestampColumn, Double)",
+        "LASTWITHTIME(stringColumn,timestampColumn, String) FROM testTable");
+
+    // Inner segment, without filter
+    Operator unfilteredOperator = getOperatorForPqlQuery(query);
+    assertTrue(unfilteredOperator instanceof AggregationOperator);
+    IntermediateResultsBlock unfilteredBlock = ((AggregationOperator) unfilteredOperator).nextBlock();
+    QueriesTestUtils.testInnerSegmentExecutionStatistics(unfilteredOperator.getExecutionStatistics(), NUM_RECORDS, 0,
+        7 * NUM_RECORDS, NUM_RECORDS);
+    List<Object> aggregationResultsWithoutFilter = unfilteredBlock.getAggregationResult();
+
+    // Inner segment, with filter
+    Operator filteredOperator = getOperatorForPqlQueryWithFilter(query);
+    assertTrue(filteredOperator instanceof AggregationOperator);
+    IntermediateResultsBlock filteredBlock = ((AggregationOperator) filteredOperator).nextBlock();
+    QueriesTestUtils.testInnerSegmentExecutionStatistics(filteredOperator.getExecutionStatistics(), NUM_RECORDS, 0,
+        7 * NUM_RECORDS, NUM_RECORDS);
+    List<Object> aggregationResultWithFilter = filteredBlock.getAggregationResult();
+
+    // Both variants must produce pairwise-equal intermediate results.
+    assertNotNull(aggregationResultsWithoutFilter);
+    assertNotNull(aggregationResultWithFilter);
+    assertEquals(aggregationResultsWithoutFilter.size(), aggregationResultWithFilter.size());
+    for (int index = 0; index < aggregationResultsWithoutFilter.size(); index++) {
+      ValueLongPair<Integer> unfiltered = (ValueLongPair<Integer>) aggregationResultsWithoutFilter.get(index);
+      ValueLongPair<Integer> filtered = (ValueLongPair<Integer>) aggregationResultWithFilter.get(index);
+      assertTrue(unfiltered.compareTo(filtered) == 0);
+    }
+
+    // The boolean column comes back as an int-valued pair; non-zero means true.
+    assertEquals((((ValueLongPair<Integer>) aggregationResultsWithoutFilter.get(0))).getValue() != 0,
+        _expectedResultLastBoolean.booleanValue());
+    assertEquals(((ValueLongPair<Integer>) aggregationResultsWithoutFilter.get(1)).getValue(), _expectedResultLastInt);
+    assertEquals(((ValueLongPair<Long>) aggregationResultsWithoutFilter.get(2)).getValue(), _expectedResultLastLong);
+    assertEquals(((ValueLongPair<Float>) aggregationResultsWithoutFilter.get(3)).getValue(), _expectedResultLastFloat);
+    assertEquals(((ValueLongPair<Double>) aggregationResultsWithoutFilter.get(4)).getValue(),
+        _expectedResultLastDouble);
+    assertEquals(((ValueLongPair<String>) aggregationResultsWithoutFilter.get(5)).getValue(),
+        _expectedResultLastString);
+
+    // Broker response: same expectations for the unfiltered query, then for the filtered one.
+    for (boolean withFilter : new boolean[]{false, true}) {
+      BrokerResponseNative brokerResponse =
+          withFilter ? getBrokerResponseForPqlQueryWithFilter(query) : getBrokerResponseForPqlQuery(query);
+      Assert.assertEquals(brokerResponse.getNumDocsScanned(), 4 * NUM_RECORDS);
+      Assert.assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
+      Assert.assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 4 * 7 * NUM_RECORDS);
+      Assert.assertEquals(brokerResponse.getTotalDocs(), 4 * NUM_RECORDS);
+      List<AggregationResult> aggregationResults = brokerResponse.getAggregationResults();
+      Assert.assertEquals(aggregationResults.size(), 6);
+      Assert.assertEquals(Boolean.parseBoolean(aggregationResults.get(0).getValue().toString()),
+          _expectedResultLastBoolean.booleanValue());
+      Assert.assertEquals(Integer.parseInt(aggregationResults.get(1).getValue().toString()),
+          _expectedResultLastInt.intValue());
+      Assert.assertEquals(Long.parseLong(aggregationResults.get(2).getValue().toString()),
+          _expectedResultLastLong.longValue());
+      Assert.assertEquals(Float.parseFloat(aggregationResults.get(3).getValue().toString()),
+          _expectedResultLastFloat.floatValue(), DELTA);
+      Assert.assertEquals(Double.parseDouble(aggregationResults.get(4).getValue().toString()),
+          _expectedResultLastDouble.doubleValue(), DELTA);
+      Assert.assertEquals(aggregationResults.get(5).getValue().toString(),
+          _expectedResultLastString);
+    }
+  }
+
+  /**
+   * Verifies LASTWITHTIME without GROUP BY on the no-dictionary columns: first on a single segment (with and
+   * without the filter, which must agree), then through the broker response for both query variants.
+   */
+  @Test
+  public void testAggregationOnlyNoDictionary() {
+    String query = String.join(", ",
+        "SELECT LASTWITHTIME(boolNoDictColumn,timestampColumn,boolean)",
+        "LASTWITHTIME(intNoDictColumn,timestampColumn,int)",
+        "LASTWITHTIME(longNoDictColumn,timestampColumn,long)",
+        "LASTWITHTIME(floatNoDictColumn,timestampColumn,float)",
+        "LASTWITHTIME(doubleNoDictColumn,timestampColumn,double)",
+        "LASTWITHTIME(stringNoDictColumn,timestampColumn,string) FROM testTable");
+
+    // Inner segment, without filter
+    Operator unfilteredOperator = getOperatorForPqlQuery(query);
+    assertTrue(unfilteredOperator instanceof AggregationOperator);
+    IntermediateResultsBlock unfilteredBlock = ((AggregationOperator) unfilteredOperator).nextBlock();
+    QueriesTestUtils.testInnerSegmentExecutionStatistics(unfilteredOperator.getExecutionStatistics(), NUM_RECORDS, 0,
+        7 * NUM_RECORDS, NUM_RECORDS);
+    List<Object> aggregationResultsWithoutFilter = unfilteredBlock.getAggregationResult();
+
+    // Inner segment, with filter
+    Operator filteredOperator = getOperatorForPqlQueryWithFilter(query);
+    assertTrue(filteredOperator instanceof AggregationOperator);
+    IntermediateResultsBlock filteredBlock = ((AggregationOperator) filteredOperator).nextBlock();
+    QueriesTestUtils.testInnerSegmentExecutionStatistics(filteredOperator.getExecutionStatistics(), NUM_RECORDS, 0,
+        7 * NUM_RECORDS, NUM_RECORDS);
+    List<Object> aggregationResultWithFilter = filteredBlock.getAggregationResult();
+
+    // Both variants must produce pairwise-equal intermediate results.
+    assertNotNull(aggregationResultsWithoutFilter);
+    assertNotNull(aggregationResultWithFilter);
+    assertEquals(aggregationResultsWithoutFilter.size(), aggregationResultWithFilter.size());
+    for (int index = 0; index < aggregationResultsWithoutFilter.size(); index++) {
+      ValueLongPair<Integer> unfiltered = (ValueLongPair<Integer>) aggregationResultsWithoutFilter.get(index);
+      ValueLongPair<Integer> filtered = (ValueLongPair<Integer>) aggregationResultWithFilter.get(index);
+      assertTrue(unfiltered.compareTo(filtered) == 0);
+    }
+
+    // The boolean column comes back as an int-valued pair; non-zero means true.
+    assertEquals(((ValueLongPair<Integer>) aggregationResultsWithoutFilter.get(0)).getValue() != 0,
+        _expectedResultLastBoolean.booleanValue());
+    assertEquals(((ValueLongPair<Integer>) aggregationResultsWithoutFilter.get(1)).getValue(), _expectedResultLastInt);
+    assertEquals(((ValueLongPair<Long>) aggregationResultsWithoutFilter.get(2)).getValue(), _expectedResultLastLong);
+    assertEquals(((ValueLongPair<Float>) aggregationResultsWithoutFilter.get(3)).getValue(), _expectedResultLastFloat);
+    assertEquals(((ValueLongPair<Double>) aggregationResultsWithoutFilter.get(4)).getValue(),
+        _expectedResultLastDouble);
+    assertEquals(((ValueLongPair<String>) aggregationResultsWithoutFilter.get(5)).getValue(),
+        _expectedResultLastString);
+
+    // Broker response: same expectations for the unfiltered query, then for the filtered one.
+    for (boolean withFilter : new boolean[]{false, true}) {
+      BrokerResponseNative brokerResponse =
+          withFilter ? getBrokerResponseForPqlQueryWithFilter(query) : getBrokerResponseForPqlQuery(query);
+      Assert.assertEquals(brokerResponse.getNumDocsScanned(), 4 * NUM_RECORDS);
+      Assert.assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
+      Assert.assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 4 * 7 * NUM_RECORDS);
+      Assert.assertEquals(brokerResponse.getTotalDocs(), 4 * NUM_RECORDS);
+      List<AggregationResult> aggregationResults = brokerResponse.getAggregationResults();
+      Assert.assertEquals(aggregationResults.size(), 6);
+      Assert.assertEquals(Boolean.parseBoolean(aggregationResults.get(0).getValue().toString()),
+          _expectedResultLastBoolean.booleanValue());
+      Assert.assertEquals(Integer.parseInt(aggregationResults.get(1).getValue().toString()),
+          _expectedResultLastInt.intValue());
+      Assert.assertEquals(Long.parseLong(aggregationResults.get(2).getValue().toString()),
+          _expectedResultLastLong.longValue());
+      Assert.assertEquals(Float.parseFloat(aggregationResults.get(3).getValue().toString()),
+          _expectedResultLastFloat, DELTA);
+      Assert.assertEquals(Double.parseDouble(aggregationResults.get(4).getValue().toString()),
+          _expectedResultLastDouble, DELTA);
+      Assert.assertEquals(aggregationResults.get(5).getValue().toString(),
+          _expectedResultLastString);
+    }
+  }
+
+  /**
+   * Verifies LASTWITHTIME on the dictionary-encoded columns grouped by the single-value {@code intColumn}, on one
+   * segment and through the broker.
+   */
+  @Test
+  public void testAggregationGroupBySv() {
+    String aggregations = String.join(", ",
+        "LASTWITHTIME(boolColumn,timestampColumn,boolean)",
+        "LASTWITHTIME(intColumn,timestampColumn,int)",
+        "LASTWITHTIME(longColumn,timestampColumn,long)",
+        "LASTWITHTIME(floatColumn,timestampColumn,float)",
+        "LASTWITHTIME(doubleColumn,timestampColumn,double)",
+        "LASTWITHTIME(stringColumn,timestampColumn,string)");
+    String query = "SELECT " + aggregations + " FROM testTable GROUP BY intColumn";
+
+    // 7 columns read per row: presumably the six aggregated columns plus timestampColumn.
+    final int numOfColumns = 7;
+    verifyAggregationResultsFromInnerSegments(query, numOfColumns);
+    verifyAggregationResultsFromInterSegments(query, numOfColumns);
+  }
+
+  /**
+   * Verifies LASTWITHTIME on the no-dictionary columns grouped by the single-value {@code intNoDictColumn}, on one
+   * segment and through the broker.
+   */
+  @Test
+  public void testAggregationGroupBySvNoDictionary() {
+    String aggregations = String.join(", ",
+        "LASTWITHTIME(boolNoDictColumn,timestampColumn,boolean)",
+        "LASTWITHTIME(intNoDictColumn,timestampColumn,int)",
+        "LASTWITHTIME(longNoDictColumn,timestampColumn,long)",
+        "LASTWITHTIME(floatNoDictColumn,timestampColumn,float)",
+        "LASTWITHTIME(doubleNoDictColumn,timestampColumn,double)",
+        "LASTWITHTIME(stringNoDictColumn,timestampColumn,string)");
+    String query = "SELECT " + aggregations + " FROM testTable GROUP BY intNoDictColumn";
+
+    // 7 columns read per row: presumably the six aggregated columns plus timestampColumn.
+    final int numOfColumns = 7;
+    verifyAggregationResultsFromInnerSegments(query, numOfColumns);
+    verifyAggregationResultsFromInterSegments(query, numOfColumns);
+  }
+
+  /**
+   * Verifies LASTWITHTIME on the dictionary-encoded columns grouped by the multi-value {@code intMvColumn}, on one
+   * segment and through the broker.
+   */
+  @Test
+  public void testAggregationGroupByMv() {
+    String aggregations = String.join(", ",
+        "LASTWITHTIME(boolColumn,timestampColumn,boolean)",
+        "LASTWITHTIME(intColumn,timestampColumn,int)",
+        "LASTWITHTIME(longColumn,timestampColumn,long)",
+        "LASTWITHTIME(floatColumn,timestampColumn,float)",
+        "LASTWITHTIME(doubleColumn,timestampColumn,double)",
+        "LASTWITHTIME(stringColumn,timestampColumn,string)");
+    String query = "SELECT " + aggregations + " FROM testTable GROUP BY intMvColumn";
+
+    // 8 columns read per row: presumably the six aggregated columns, timestampColumn and intMvColumn.
+    final int numOfColumns = 8;
+    verifyAggregationResultsFromInnerSegments(query, numOfColumns);
+    verifyAggregationResultsFromInterSegments(query, numOfColumns);
+  }
+
+  /**
+   * Verifies LASTWITHTIME on the no-dictionary columns grouped by the multi-value {@code intMvColumn}, on one
+   * segment and through the broker.
+   */
+  @Test
+  public void testAggregationGroupByMvNoDictionary() {
+    String aggregations = String.join(", ",
+        "LASTWITHTIME(boolNoDictColumn,timestampColumn,boolean)",
+        "LASTWITHTIME(intNoDictColumn,timestampColumn,int)",
+        "LASTWITHTIME(longNoDictColumn,timestampColumn,long)",
+        "LASTWITHTIME(floatNoDictColumn,timestampColumn,float)",
+        "LASTWITHTIME(doubleNoDictColumn,timestampColumn,double)",
+        "LASTWITHTIME(stringNoDictColumn,timestampColumn,string)");
+    String query = "SELECT " + aggregations + " FROM testTable GROUP BY intMvColumn";
+
+    // 8 columns read per row: presumably the six aggregated columns, timestampColumn and intMvColumn.
+    final int numOfColumns = 8;
+    verifyAggregationResultsFromInnerSegments(query, numOfColumns);
+    verifyAggregationResultsFromInterSegments(query, numOfColumns);
+  }
+
+  /**
+   * Runs a group-by query against the single segment and checks, for every returned group, that each of the six
+   * aggregations yields the value recorded for that group key in the {@code _*GroupValues} maps.
+   *
+   * @param query the group-by query to execute
+   * @param numOfColumns number of columns expected to be read per row; used for the post-filter scan statistic
+   */
+  private void verifyAggregationResultsFromInnerSegments(String query, int numOfColumns) {
+    // Inner segment
+    Operator groupByOperator = getOperatorForPqlQuery(query);
+    assertTrue(groupByOperator instanceof AggregationGroupByOperator);
+    IntermediateResultsBlock block = ((AggregationGroupByOperator) groupByOperator).nextBlock();
+    QueriesTestUtils.testInnerSegmentExecutionStatistics(groupByOperator.getExecutionStatistics(), NUM_RECORDS, 0,
+        numOfColumns * NUM_RECORDS, NUM_RECORDS);
+    AggregationGroupByResult groupByResult = block.getAggregationGroupByResult();
+    assertNotNull(groupByResult);
+
+    int groupsSeen = 0;
+    for (Iterator<GroupKeyGenerator.GroupKey> keys = groupByResult.getGroupKeyIterator(); keys.hasNext(); ) {
+      groupsSeen++;
+      GroupKeyGenerator.GroupKey groupKey = keys.next();
+      Integer key = (Integer) groupKey._keys[0];
+      assertTrue(_intGroupValues.containsKey(key));
+
+      // The boolean column comes back as an int-valued pair; non-zero means true.
+      ValueLongPair<Integer> boolPair = (ValueLongPair<Integer>) groupByResult.getResultForGroupId(0, groupKey._groupId);
+      assertEquals(boolPair.getValue() != 0, _boolGroupValues.get(key).booleanValue());
+
+      ValueLongPair<Integer> intPair = (ValueLongPair<Integer>) groupByResult.getResultForGroupId(1, groupKey._groupId);
+      assertEquals(intPair.getValue(), _intGroupValues.get(key));
+
+      ValueLongPair<Long> longPair = (ValueLongPair<Long>) groupByResult.getResultForGroupId(2, groupKey._groupId);
+      assertEquals(longPair.getValue(), _longGroupValues.get(key));
+
+      ValueLongPair<Float> floatPair = (ValueLongPair<Float>) groupByResult.getResultForGroupId(3, groupKey._groupId);
+      assertEquals(floatPair.getValue(), _floatGroupValues.get(key));
+
+      ValueLongPair<Double> doublePair =
+          (ValueLongPair<Double>) groupByResult.getResultForGroupId(4, groupKey._groupId);
+      assertEquals(doublePair.getValue(), _doubleGroupValues.get(key));
+
+      ValueLongPair<String> stringPair =
+          (ValueLongPair<String>) groupByResult.getResultForGroupId(5, groupKey._groupId);
+      assertEquals(stringPair.getValue(), _stringGroupValues.get(key));
+    }
+    // Every distinct int value written in setUp must show up as exactly one group.
+    assertEquals(groupsSeen, _intGroupValues.size());
+  }
+
+  /**
+   * Runs a group-by query through the broker path and checks the execution statistics and the per-group values of
+   * the int, long, float and double aggregations against the {@code _*GroupValues} maps.
+   *
+   * <p>NOTE(review): the boolean (index 0) and string (index 5) aggregations are only checked for an unset
+   * top-level value; their per-group values are not verified here.
+   *
+   * @param query the group-by query to execute
+   * @param numOfColumns number of columns expected to be read per row; used for the post-filter scan statistic
+   */
+  private void verifyAggregationResultsFromInterSegments(String query, int numOfColumns) {
+    BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
+    // Inter segments (expect 4 * inner segment result)
+    Assert.assertEquals(brokerResponse.getNumDocsScanned(), 4 * NUM_RECORDS);
+    Assert.assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
+    Assert.assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 4 * numOfColumns * NUM_RECORDS);
+    Assert.assertEquals(brokerResponse.getTotalDocs(), 4 * NUM_RECORDS);
+
+    List<AggregationResult> aggregationResults = brokerResponse.getAggregationResults();
+    Assert.assertEquals(aggregationResults.size(), 6);
+    // For a group-by query the values live in the per-group results, so no aggregation may carry a top-level value.
+    for (AggregationResult aggregationResult : aggregationResults) {
+      Assert.assertNull(aggregationResult.getValue());
+    }
+
+    for (GroupByResult intGroupByResult : aggregationResults.get(1).getGroupByResult()) {
+      assertEquals(intGroupByResult.getGroup().size(), 1);
+      int groupKey = Integer.parseInt(intGroupByResult.getGroup().get(0));
+      assertTrue(_intGroupValues.containsKey(groupKey));
+      assertEquals(Integer.parseInt(intGroupByResult.getValue().toString()),
+          _intGroupValues.get(groupKey).intValue());
+    }
+
+    for (GroupByResult longGroupByResult : aggregationResults.get(2).getGroupByResult()) {
+      assertEquals(longGroupByResult.getGroup().size(), 1);
+      int groupKey = Integer.parseInt(longGroupByResult.getGroup().get(0));
+      assertTrue(_longGroupValues.containsKey(groupKey));
+      // Compare as longs: going through double with a delta would round 64-bit values and hide mismatches.
+      assertEquals(Long.parseLong(longGroupByResult.getValue().toString()),
+          _longGroupValues.get(groupKey).longValue());
+    }
+
+    for (GroupByResult floatGroupByResult : aggregationResults.get(3).getGroupByResult()) {
+      assertEquals(floatGroupByResult.getGroup().size(), 1);
+      int groupKey = Integer.parseInt(floatGroupByResult.getGroup().get(0));
+      assertTrue(_floatGroupValues.containsKey(groupKey));
+      assertEquals(Double.parseDouble(floatGroupByResult.getValue().toString()),
+          _floatGroupValues.get(groupKey), DELTA);
+    }
+
+    for (GroupByResult doubleGroupByResult : aggregationResults.get(4).getGroupByResult()) {
+      assertEquals(doubleGroupByResult.getGroup().size(), 1);
+      int groupKey = Integer.parseInt(doubleGroupByResult.getGroup().get(0));
+      assertTrue(_doubleGroupValues.containsKey(groupKey));
+      assertEquals(Double.parseDouble(doubleGroupByResult.getValue().toString()),
+          _doubleGroupValues.get(groupKey), DELTA);
+    }
+  }
+
+  /**
+   * Destroys the loaded segment, then deletes the index directory.
+   *
+   * @throws IOException if the index directory cannot be deleted
+   */
+  @AfterClass
+  public void tearDown()
+      throws IOException {
+    // Destroy the segment before removing the files underneath it.
+    this._indexSegment.destroy();
+    FileUtils.deleteDirectory(INDEX_DIR);
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/DoubleLongPair.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/DoubleLongPair.java
new file mode 100644
index 0000000..890a00d
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/DoubleLongPair.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * A {@code Double} value paired with a {@code long} time, serialized as the 8-byte value followed by the 8-byte
+ * time.
+ */
+public class DoubleLongPair extends ValueLongPair<Double> {
+
+  public DoubleLongPair(Double value, long time) {
+    super(value, time);
+  }
+
+  /**
+   * Decodes a pair from a byte array in the layout produced by {@link #toBytes()}.
+   */
+  public static DoubleLongPair fromBytes(byte[] bytes) {
+    ByteBuffer wrapped = ByteBuffer.wrap(bytes);
+    return fromByteBuffer(wrapped);
+  }
+
+  /**
+   * Reads the value and then the time from the buffer's current position, advancing it.
+   */
+  public static DoubleLongPair fromByteBuffer(ByteBuffer byteBuffer) {
+    double value = byteBuffer.getDouble();
+    long time = byteBuffer.getLong();
+    return new DoubleLongPair(value, time);
+  }
+
+  /**
+   * Encodes the value followed by the time into a new array of {@code Double.BYTES + Long.BYTES} bytes.
+   */
+  @Override
+  public byte[] toBytes() {
+    return ByteBuffer.allocate(Double.BYTES + Long.BYTES).putDouble(_value).putLong(_time).array();
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/FloatLongPair.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/FloatLongPair.java
new file mode 100644
index 0000000..94fcb31
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/FloatLongPair.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * A {@code Float} value paired with a {@code long} time, serialized as the 4-byte value followed by the 8-byte
+ * time.
+ */
+public class FloatLongPair extends ValueLongPair<Float> {
+
+  public FloatLongPair(Float value, long time) {
+    super(value, time);
+  }
+
+  /**
+   * Decodes a pair from a byte array in the layout produced by {@link #toBytes()}.
+   */
+  public static FloatLongPair fromBytes(byte[] bytes) {
+    ByteBuffer wrapped = ByteBuffer.wrap(bytes);
+    return fromByteBuffer(wrapped);
+  }
+
+  /**
+   * Reads the value and then the time from the buffer's current position, advancing it.
+   */
+  public static FloatLongPair fromByteBuffer(ByteBuffer byteBuffer) {
+    float value = byteBuffer.getFloat();
+    long time = byteBuffer.getLong();
+    return new FloatLongPair(value, time);
+  }
+
+  /**
+   * Encodes the value followed by the time into a new array of {@code Float.BYTES + Long.BYTES} bytes.
+   */
+  @Override
+  public byte[] toBytes() {
+    return ByteBuffer.allocate(Float.BYTES + Long.BYTES).putFloat(_value).putLong(_time).array();
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/IntLongPair.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/IntLongPair.java
new file mode 100644
index 0000000..191a931
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/IntLongPair.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * An {@code Integer} value paired with a {@code long} time, serialized as the 4-byte value followed by the 8-byte
+ * time.
+ */
+public class IntLongPair extends ValueLongPair<Integer> {
+
+  public IntLongPair(Integer value, long time) {
+    super(value, time);
+  }
+
+  /**
+   * Decodes a pair from a byte array in the layout produced by {@link #toBytes()}.
+   */
+  public static IntLongPair fromBytes(byte[] bytes) {
+    ByteBuffer wrapped = ByteBuffer.wrap(bytes);
+    return fromByteBuffer(wrapped);
+  }
+
+  /**
+   * Reads the value and then the time from the buffer's current position, advancing it.
+   */
+  public static IntLongPair fromByteBuffer(ByteBuffer byteBuffer) {
+    int value = byteBuffer.getInt();
+    long time = byteBuffer.getLong();
+    return new IntLongPair(value, time);
+  }
+
+  /**
+   * Encodes the value followed by the time into a new array of {@code Integer.BYTES + Long.BYTES} bytes.
+   */
+  @Override
+  public byte[] toBytes() {
+    return ByteBuffer.allocate(Integer.BYTES + Long.BYTES).putInt(_value).putLong(_time).array();
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/LongLongPair.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/LongLongPair.java
new file mode 100644
index 0000000..0ffa345
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/LongLongPair.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * A {@code Long} value paired with a {@code long} time, serialized as the 8-byte value followed by the 8-byte
+ * time.
+ */
+public class LongLongPair extends ValueLongPair<Long> {
+
+  public LongLongPair(Long value, long time) {
+    super(value, time);
+  }
+
+  /**
+   * Decodes a pair from a byte array in the layout produced by {@link #toBytes()}.
+   */
+  public static LongLongPair fromBytes(byte[] bytes) {
+    ByteBuffer wrapped = ByteBuffer.wrap(bytes);
+    return fromByteBuffer(wrapped);
+  }
+
+  /**
+   * Reads the value and then the time from the buffer's current position, advancing it.
+   */
+  public static LongLongPair fromByteBuffer(ByteBuffer byteBuffer) {
+    long value = byteBuffer.getLong();
+    long time = byteBuffer.getLong();
+    return new LongLongPair(value, time);
+  }
+
+  /**
+   * Encodes the value followed by the time into a new array of {@code 2 * Long.BYTES} bytes.
+   */
+  @Override
+  public byte[] toBytes() {
+    return ByteBuffer.allocate(Long.BYTES + Long.BYTES).putLong(_value).putLong(_time).array();
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/StringLongPair.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/StringLongPair.java
new file mode 100644
index 0000000..bfcdeae
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/StringLongPair.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * A {@code String} value paired with a {@code long} time.
+ *
+ * <p>Serialized form: a 4-byte length, that many bytes of the UTF-8 encoded value, then the 8-byte time.
+ */
+public class StringLongPair extends ValueLongPair<String> {
+  // Fixed charset so the serialized form does not depend on the platform default of the writing or reading JVM.
+  private static final java.nio.charset.Charset VALUE_CHARSET = java.nio.charset.StandardCharsets.UTF_8;
+
+  public StringLongPair(String value, long time) {
+    super(value, time);
+  }
+
+  /**
+   * Decodes a pair from a byte array in the layout produced by {@link #toBytes()}.
+   */
+  public static StringLongPair fromBytes(byte[] bytes) {
+    return fromByteBuffer(ByteBuffer.wrap(bytes));
+  }
+
+  /**
+   * Reads the length-prefixed value and then the time from the buffer's current position, advancing it.
+   */
+  public static StringLongPair fromByteBuffer(ByteBuffer byteBuffer) {
+    int numValueBytes = byteBuffer.getInt();
+    byte[] valueBytes = new byte[numValueBytes];
+    byteBuffer.get(valueBytes);
+    return new StringLongPair(new String(valueBytes, VALUE_CHARSET), byteBuffer.getLong());
+  }
+
+  /**
+   * Encodes the pair as length prefix, encoded value and time.
+   */
+  @Override
+  public byte[] toBytes() {
+    // Encode first and size everything from the encoded byte count: String.length() counts UTF-16 code units,
+    // which is smaller than the byte count for any non-ASCII text and would overflow the buffer.
+    byte[] valueBytes = _value.getBytes(VALUE_CHARSET);
+    ByteBuffer byteBuffer = ByteBuffer.allocate(Integer.BYTES + valueBytes.length + Long.BYTES);
+    byteBuffer.putInt(valueBytes.length);
+    byteBuffer.put(valueBytes);
+    byteBuffer.putLong(_time);
+    return byteBuffer.array();
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ValueLongPair.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ValueLongPair.java
new file mode 100644
index 0000000..81e2ded
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ValueLongPair.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+/**
+ * Base class for a comparable value tagged with a {@code long} time.
+ *
+ * <p>Pairs are ordered by time first; pairs with equal time are ordered by their value.
+ *
+ * @param <V> type of the value
+ */
+public abstract class ValueLongPair<V extends Comparable<V>> implements Comparable<ValueLongPair<V>> {
+  protected V _value;
+  protected long _time;
+
+  public ValueLongPair(V value, long time) {
+    _value = value;
+    _time = time;
+  }
+
+  public V getValue() {
+    return _value;
+  }
+
+  public long getTime() {
+    return _time;
+  }
+
+  /**
+   * Returns the serialized form of this pair; the layout is defined by each subclass.
+   */
+  public abstract byte[] toBytes();
+
+  /**
+   * Compares by time and falls back to the value's natural ordering when the times are equal.
+   *
+   * @return {@code -1} or {@code 1} when the times differ, otherwise the result of comparing the values
+   */
+  @Override
+  public int compareTo(ValueLongPair<V> o) {
+    long otherTime = o.getTime();
+    if (_time != otherTime) {
+      return _time < otherTime ? -1 : 1;
+    }
+    // Same time: the value decides.
+    return _value.compareTo(o.getValue());
+  }
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index 197239b..53e5eba 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -33,6 +33,7 @@ public enum AggregationFunctionType {
SUMPRECISION("sumPrecision"),
AVG("avg"),
MODE("mode"),
+ LASTWITHTIME("lastWithTime"),
MINMAXRANGE("minMaxRange"),
DISTINCTCOUNT("distinctCount"),
DISTINCTCOUNTBITMAP("distinctCountBitmap"),
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org