Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/10/17 00:38:53 UTC

[GitHub] [pinot] weixiangsun opened a new pull request #7584: Add LASTWITHTIME aggregate function support #7315

weixiangsun opened a new pull request #7584:
URL: https://github.com/apache/pinot/pull/7584


   ## Description
   Adds an aggregate function, LASTWITHTIME, that returns the last value of a time-based data set.
   ## Upgrade Notes
   Does this PR prevent a zero down-time upgrade? (Assume upgrade order: Controller, Broker, Server, Minion)
   * [ ] Yes (Please label as **<code>backward-incompat</code>**, and complete the section below on Release Notes)
   
   Does this PR fix a zero-downtime upgrade introduced earlier?
   * [ ] Yes (Please label this as **<code>backward-incompat</code>**, and complete the section below on Release Notes)
   
   Does this PR otherwise need attention when creating release notes? Things to consider:
   - New configuration options
   - Deprecation of configurations
   - Signature changes to public methods/interfaces
   - New plugins added or old plugins removed
   * [ ] Yes (Please label this PR as **<code>release-notes</code>** and complete the section on Release Notes)
   ## Release Notes
   <!-- If you have tagged this as either backward-incompat or release-notes,
   you MUST add text here that you would like to see appear in release notes of the
   next release. -->
   
   <!-- If you have a series of commits adding or enabling a feature, then
   add this section only in final commit that marks the feature completed.
   Refer to earlier release notes to see examples of text.
   -->
   ## Documentation
   <!-- If you have introduced a new feature or configuration, please add it to the documentation as well.
   See https://docs.pinot.apache.org/developers/developers-and-contributors/update-document
   -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] weixiangsun commented on a change in pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
weixiangsun commented on a change in pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#discussion_r733919705



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
##########
@@ -156,6 +158,45 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
             return new AvgAggregationFunction(firstArgument);
           case MODE:
             return new ModeAggregationFunction(arguments);
+          case LASTWITHTIME:
+            if (arguments.size() > 1) {
+              ExpressionContext timeCol = arguments.get(1);
+              String dataType = arguments.get(2).getIdentifier();

Review comment:
       Done






[GitHub] [pinot] weixiangsun commented on a change in pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
weixiangsun commented on a change in pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#discussion_r733933204



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
##########
@@ -156,6 +158,45 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
             return new AvgAggregationFunction(firstArgument);
           case MODE:
             return new ModeAggregationFunction(arguments);
+          case LASTWITHTIME:
+            if (arguments.size() > 1) {
+              ExpressionContext timeCol = arguments.get(1);
+              String dataType = arguments.get(2).getIdentifier();
+              DataSchema.ColumnDataType columnDataType = DataSchema.ColumnDataType.valueOf(dataType.toUpperCase());
+              switch (columnDataType) {
+                case BOOLEAN:
+                case INT:
+                  return new LastIntValueWithTimeAggregationFunction(
+                          firstArgument,
+                          timeCol,
+                          ObjectSerDeUtils.INT_VAL_TIME_PAIR_SER_DE,

Review comment:
       Done






[GitHub] [pinot] weixiangsun commented on a change in pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
weixiangsun commented on a change in pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#discussion_r731100964



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunction.java
##########
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.LastWithTimePair;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class LastWithTimeAggregationFunction extends BaseSingleInputAggregationFunction<LastWithTimePair, Double> {

Review comment:
       Done






[GitHub] [pinot] Jackie-Jiang commented on a change in pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#discussion_r730144239



##########
File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/LastWithTimePair.java
##########
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import java.nio.ByteBuffer;
+import javax.annotation.Nonnull;
+
+public class LastWithTimePair implements Comparable<LastWithTimePair> {
+  private double _data;
+  private long _time;
+
+  public LastWithTimePair(double data, long time) {
+    _data = data;
+    _time = time;
+  }
+
+  public void apply(double data, long time) {

Review comment:
       Handle the merge logic within the function, so that we can make the pair itself immutable.

##########
File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/LastWithTimePair.java
##########
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import java.nio.ByteBuffer;
+import javax.annotation.Nonnull;
+
+public class LastWithTimePair implements Comparable<LastWithTimePair> {
+  private double _data;
+  private long _time;
+
+  public LastWithTimePair(double data, long time) {
+    _data = data;
+    _time = time;
+  }
+
+  public void apply(double data, long time) {
+    if (time >= _time) {
+      _data = data;
+      _time = time;
+    }
+  }
+
+  public void apply(@Nonnull LastWithTimePair lastWithTimePair) {

Review comment:
       (Convention) We don't usually add the `@Nonnull` annotation, only `@Nullable`, because the two look quite similar; assume everything not annotated is non-null.

##########
File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/LastWithTimePair.java
##########
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import java.nio.ByteBuffer;
+import javax.annotation.Nonnull;
+
+public class LastWithTimePair implements Comparable<LastWithTimePair> {

Review comment:
       Let's make it `ValueTimePair`, where the value can be of different types (declare the value as `Comparable`). The pair doesn't need to be `Comparable`, and we can handle the merge logic within the function. The `FIRST` function can reuse the same pair.
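
To illustrate the suggestion above, here is a minimal sketch of an immutable, generic pair with the merge logic kept outside it. The names `ValueTimePair`, `PairMerge`, `mergeLast`, and `mergeFirst` are hypothetical and are not the code that was eventually merged. The tie-breaking rule for LAST (`>=`) follows the `apply` method quoted in this thread; the rule for FIRST is an assumption.

```java
// Hypothetical sketch: an immutable pair whose value type is any Comparable.
final class ValueTimePair<V extends Comparable<V>> {
  private final V _value;
  private final long _time;

  ValueTimePair(V value, long time) {
    _value = value;
    _time = time;
  }

  V getValue() {
    return _value;
  }

  long getTime() {
    return _time;
  }
}

// Hypothetical helper: the merge logic lives with the functions, not in the pair.
final class PairMerge {
  private PairMerge() {
  }

  // LAST semantics: keep the pair with the larger time; on a tie the candidate wins,
  // matching the ">=" check in the quoted apply() method.
  static <V extends Comparable<V>> ValueTimePair<V> mergeLast(ValueTimePair<V> current,
      ValueTimePair<V> candidate) {
    return candidate.getTime() >= current.getTime() ? candidate : current;
  }

  // FIRST semantics (assumed): keep the pair with the smaller time; on a tie keep current.
  static <V extends Comparable<V>> ValueTimePair<V> mergeFirst(ValueTimePair<V> current,
      ValueTimePair<V> candidate) {
    return candidate.getTime() < current.getTime() ? candidate : current;
  }
}
```

Because the pair never changes after construction, both a LAST and a FIRST function could share it and differ only in which merge method they call.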

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunction.java
##########
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.LastWithTimePair;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class LastWithTimeAggregationFunction extends BaseSingleInputAggregationFunction<LastWithTimePair, Double> {
+  private final ExpressionContext _timeCol;
+
+  public LastWithTimeAggregationFunction(ExpressionContext dataCol, ExpressionContext timeCol) {
+    super(dataCol);
+    _timeCol = timeCol;
+  }
+
+  @Override
+  public List<ExpressionContext> getInputExpressions() {
+    return Arrays.asList(_expression, _timeCol);
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.LASTWITHTIME;
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    double lastData = Double.POSITIVE_INFINITY;
+    long lastTime = Long.MIN_VALUE;
+
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    BlockValSet blockTimeSet = blockValSetMap.get(_timeCol);
+    if (blockValSet.getValueType() != DataType.BYTES) {
+      double[] doubleValues = blockValSet.getDoubleValuesSV();
+      long[] timeValues = blockTimeSet.getLongValuesSV();
+      for (int i = 0; i < length; i++) {
+        double data = doubleValues[i];
+        long time = timeValues[i];
+        if (time >= lastTime) {
+          lastTime = time;
+          lastData = data;
+        }
+      }
+    } else {
+      // Serialized LastPair
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      for (int i = 0; i < length; i++) {
+        LastWithTimePair lastWithTimePair = ObjectSerDeUtils.LAST_WITH_TIME_PAIR_SER_DE.deserialize(bytesValues[i]);
+        double data = lastWithTimePair.getData();
+        long time = lastWithTimePair.getTime();
+        if (time >= lastTime) {
+          lastTime = time;
+          lastData = data;
+        }
+      }
+    }
+    setAggregationResult(aggregationResultHolder, lastData, lastTime);
+  }
+
+  protected void setAggregationResult(AggregationResultHolder aggregationResultHolder, double data, long time) {
+    LastWithTimePair lastWithTimePair = aggregationResultHolder.getResult();
+    if (lastWithTimePair == null) {
+      aggregationResultHolder.setValue(new LastWithTimePair(data, time));
+    } else {
+      lastWithTimePair.apply(data, time);
+    }
+  }
+
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    BlockValSet timeValSet = blockValSetMap.get(_timeCol);
+    if (blockValSet.getValueType() != DataType.BYTES) {
+      double[] doubleValues = blockValSet.getDoubleValuesSV();
+      long[] timeValues = timeValSet.getLongValuesSV();
+      for (int i = 0; i < length; i++) {
+        double data = doubleValues[i];
+        long time = timeValues[i];
+        setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+      }
+    } else {
+      // Serialized LastPair
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      for (int i = 0; i < length; i++) {
+        LastWithTimePair lastWithTimePair = ObjectSerDeUtils.LAST_WITH_TIME_PAIR_SER_DE.deserialize(bytesValues[i]);
+        setGroupByResult(groupKeyArray[i], groupByResultHolder, lastWithTimePair.getData(), lastWithTimePair.getTime());
+      }
+    }
+  }
+
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    BlockValSet timeValSet = blockValSetMap.get(_timeCol);
+    if (blockValSet.getValueType() != DataType.BYTES) {
+      double[] doubleValues = blockValSet.getDoubleValuesSV();
+      long[] timeValues = timeValSet.getLongValuesSV();
+      for (int i = 0; i < length; i++) {
+        double value = doubleValues[i];
+        long time = timeValues[i];
+        for (int groupKey : groupKeysArray[i]) {
+          setGroupByResult(groupKey, groupByResultHolder, value, time);
+        }
+      }
+    } else {
+      // Serialized LastWithTimePair
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      for (int i = 0; i < length; i++) {
+        LastWithTimePair lastWithTimePair = ObjectSerDeUtils.LAST_WITH_TIME_PAIR_SER_DE.deserialize(bytesValues[i]);
+        double data = lastWithTimePair.getData();
+        long time = lastWithTimePair.getTime();
+        for (int groupKey : groupKeysArray[i]) {
+          setGroupByResult(groupKey, groupByResultHolder, data, time);
+        }
+      }
+    }
+  }
+
+  protected void setGroupByResult(int groupKey, GroupByResultHolder groupByResultHolder, double data, long time) {
+    LastWithTimePair lastWithTimePair = groupByResultHolder.getResult(groupKey);
+    if (lastWithTimePair == null) {
+      groupByResultHolder.setValueForKey(groupKey, new LastWithTimePair(data, time));
+    } else {
+      lastWithTimePair.apply(data, time);
+    }
+  }
+
+  @Override
+  public LastWithTimePair extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    LastWithTimePair lastWithTimePair = aggregationResultHolder.getResult();
+    if (lastWithTimePair == null) {
+      return new LastWithTimePair(Double.POSITIVE_INFINITY, Long.MIN_VALUE);

Review comment:
       Use a constant for the default. I also feel `Double.NaN` might be a better default?
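
As a rough sketch of what this comment asks for, the defaults can be named constants, with `Double.NaN` standing for "no value seen". The class and method names below are hypothetical. One caveat worth keeping in mind: `NaN` never compares equal to itself, so callers would need `Double.isNaN` rather than `==` to detect the default.

```java
// Hypothetical sketch: named defaults for a "last double value by time" scan.
final class LastDoubleDefaults {
  static final double DEFAULT_VALUE = Double.NaN;
  static final long DEFAULT_TIME = Long.MIN_VALUE;

  private LastDoubleDefaults() {
  }

  // Returns the value paired with the largest time, or DEFAULT_VALUE for empty input.
  // On equal times the later row wins, matching the ">=" check in the quoted diff.
  static double lastValue(double[] values, long[] times) {
    double lastData = DEFAULT_VALUE;
    long lastTime = DEFAULT_TIME;
    for (int i = 0; i < values.length; i++) {
      if (times[i] >= lastTime) {
        lastTime = times[i];
        lastData = values[i];
      }
    }
    return lastData;
  }
}
```

Since the default time is `Long.MIN_VALUE` and the comparison is `>=`, any real row replaces the default, so `NaN` only surfaces when no rows were aggregated.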

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java
##########
@@ -42,7 +42,6 @@
 
 @SuppressWarnings({"rawtypes", "unchecked"})
 public class DistinctCountAggregationFunction extends BaseSingleInputAggregationFunction<Set, Integer> {
-

Review comment:
       (nit) Seems unrelated?






[GitHub] [pinot] weixiangsun commented on pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
weixiangsun commented on pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#issuecomment-949031434


   @Jackie-Jiang @lakshmanan-v I have addressed the comments. Please take another look and leave any further comments you have. Thanks a lot!




[GitHub] [pinot] weixiangsun commented on a change in pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
weixiangsun commented on a change in pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#discussion_r736057387



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
##########
@@ -156,6 +157,46 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
             return new AvgAggregationFunction(firstArgument);
           case MODE:
             return new ModeAggregationFunction(arguments);
+          case LASTWITHTIME:
+            if (arguments.size() == 3) {
+              ExpressionContext timeCol = arguments.get(1);
+              ExpressionContext dataType = arguments.get(2);
+              if (dataType.getType() != ExpressionContext.Type.LITERAL) {
+                throw new IllegalArgumentException("Third argument of lastWithTime Function should be literal."
+                    + " The function can be used as lastWithTime(dataColumn, timeColumn, 'dataType')");
+              }
+              FieldSpec.DataType fieldDataType
+                  = FieldSpec.DataType.valueOf(dataType.getLiteral().toUpperCase());
+              switch (fieldDataType) {
+                case BOOLEAN:
+                case INT:
+                  return new LastIntValueWithTimeAggregationFunction(
+                      firstArgument,

Review comment:
       Done
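
The revised hunk above resolves the third argument, a string literal such as `'double'`, to an enum constant before choosing the concrete function. The standalone sketch below shows that lookup pattern without any Pinot classes. `SketchDataType` and `DataTypeLiteral` are hypothetical stand-ins for `FieldSpec.DataType` and the factory code, and the wrapped error message is an assumption about what a helpful failure might look like.

```java
import java.util.Locale;

// Hypothetical stand-in for the real data type enum.
enum SketchDataType {
  BOOLEAN, INT, LONG, FLOAT, DOUBLE, STRING
}

final class DataTypeLiteral {
  private DataTypeLiteral() {
  }

  // Resolves a case-insensitive literal such as "double" to an enum constant.
  // Enum.valueOf throws IllegalArgumentException for unknown names; it is
  // rethrown here with a message that shows the expected call shape.
  static SketchDataType parse(String literal) {
    try {
      return SketchDataType.valueOf(literal.trim().toUpperCase(Locale.ROOT));
    } catch (IllegalArgumentException e) {
      throw new IllegalArgumentException("Unsupported data type literal '" + literal
          + "'. Expected usage: lastWithTime(dataColumn, timeColumn, 'dataType')", e);
    }
  }
}
```

Using `Locale.ROOT` keeps the upper-casing independent of the JVM's default locale, which matters for letters like `i` in some locales.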






[GitHub] [pinot] weixiangsun commented on a change in pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
weixiangsun commented on a change in pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#discussion_r736058213



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastDoubleValueWithTimeAggregationFunction.java
##########
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.DoubleLongPair;
+import org.apache.pinot.segment.local.customobject.ValueLongPair;
+
+
+/**
+ * This function is used for LastWithTime calculations for data column with double type.
+ * <p>The function can be used as LastWithTime(dataExpression, timeExpression, 'double')
+ * <p>Following arguments are supported:
+ * <ul>
+ *   <li>dataExpression: expression that contains the double data column to be calculated last on</li>
+ *   <li>timeExpression: expression that contains the column to be used to decide which data is last, can be any
+ *   Numeric column</li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class LastDoubleValueWithTimeAggregationFunction extends LastWithTimeAggregationFunction<Double> {
+
+  private final static ValueLongPair<Double> DEFAULT_VALUE_TIME_PAIR
+      = new DoubleLongPair(Double.NaN, Long.MIN_VALUE);
+
+  public LastDoubleValueWithTimeAggregationFunction(
+      ExpressionContext dataCol,
+      ExpressionContext timeCol) {
+    super(dataCol, timeCol, ObjectSerDeUtils.DOUBLE_LONG_PAIR_SER_DE);
+  }
+
+  @Override
+  public ValueLongPair<Double> constructValueLongPair(Double value, long time) {
+    return new DoubleLongPair(value, time);
+  }
+
+  @Override
+  public ValueLongPair<Double> getDefaultValueTimePair() {
+    return DEFAULT_VALUE_TIME_PAIR;
+  }
+
+  @Override
+  public void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet, BlockValSet timeValSet) {
+    ValueLongPair<Double> defaultValueLongPair = getDefaultValueTimePair();
+    Double lastData = defaultValueLongPair.getValue();
+    long lastTime = defaultValueLongPair.getTime();
+    double[] doubleValues = blockValSet.getDoubleValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      double data = doubleValues[i];
+      long time = timeValues[i];
+      if (time >= lastTime) {
+        lastTime = time;
+        lastData = data;
+      }
+    }
+    setAggregationResult(aggregationResultHolder, lastData, lastTime);
+  }
+
+  @Override
+  public void aggregateGroupResultWithRawDataSv(int length, int[] groupKeyArray,

Review comment:
       I have already formatted this using the Pinot code style.






[GitHub] [pinot] weixiangsun commented on a change in pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
weixiangsun commented on a change in pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#discussion_r736066894



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastDoubleValueWithTimeAggregationFunction.java
##########
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.DoubleLongPair;
+import org.apache.pinot.segment.local.customobject.ValueLongPair;
+
+
+/**
+ * This function is used for LastWithTime calculations for data column with double type.
+ * <p>The function can be used as LastWithTime(dataExpression, timeExpression, 'double')
+ * <p>Following arguments are supported:
+ * <ul>
+ *   <li>dataExpression: expression that contains the double data column to be calculated last on</li>
+ *   <li>timeExpression: expression that contains the column to be used to decide which data is last, can be any
+ *   Numeric column</li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class LastDoubleValueWithTimeAggregationFunction extends LastWithTimeAggregationFunction<Double> {
+
+  private final static ValueLongPair<Double> DEFAULT_VALUE_TIME_PAIR
+      = new DoubleLongPair(Double.NaN, Long.MIN_VALUE);
+
+  public LastDoubleValueWithTimeAggregationFunction(
+      ExpressionContext dataCol,
+      ExpressionContext timeCol) {
+    super(dataCol, timeCol, ObjectSerDeUtils.DOUBLE_LONG_PAIR_SER_DE);
+  }
+
+  @Override
+  public ValueLongPair<Double> constructValueLongPair(Double value, long time) {
+    return new DoubleLongPair(value, time);
+  }
+
+  @Override
+  public ValueLongPair<Double> getDefaultValueTimePair() {
+    return DEFAULT_VALUE_TIME_PAIR;
+  }
+
+  @Override
+  public void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet, BlockValSet timeValSet) {
+    ValueLongPair<Double> defaultValueLongPair = getDefaultValueTimePair();
+    Double lastData = defaultValueLongPair.getValue();
+    long lastTime = defaultValueLongPair.getTime();
+    double[] doubleValues = blockValSet.getDoubleValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      double data = doubleValues[i];
+      long time = timeValues[i];
+      if (time >= lastTime) {
+        lastTime = time;
+        lastData = data;
+      }
+    }
+    setAggregationResult(aggregationResultHolder, lastData, lastTime);
+  }
+
+  @Override
+  public void aggregateGroupResultWithRawDataSv(int length, int[] groupKeyArray,

Review comment:
       I prefer to keep it as it is.






[GitHub] [pinot] Jackie-Jiang commented on a change in pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#discussion_r736029051



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
##########
@@ -330,6 +351,99 @@ public MinMaxRangePair deserialize(ByteBuffer byteBuffer) {
     }
   };
 
+  public static final ObjectSerDe<? extends ValueLongPair<Integer>> INT_LONG_PAIR_SER_DE
+      = new ObjectSerDe<IntLongPair>() {

Review comment:
       (minor) Same for other serDe
   ```suggestion
     public static final ObjectSerDe<IntLongPair> INT_LONG_PAIR_SER_DE = new ObjectSerDe<IntLongPair>() {
   ```
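
To illustrate the point of the suggestion: with a wildcard type such as `ObjectSerDe<? extends ValueLongPair<Integer>>`, callers can still read deserialized values, but they cannot pass a pair to `serialize` without a cast, whereas the concrete type allows both. The sketch below is self-contained and only approximates the situation; `SerDe`, `Pair` and `IntPair` are simplified stand-ins invented for illustration, not the Pinot classes.

```java
import java.nio.ByteBuffer;

public class SerDeTypingSketch {
  /** Simplified stand-in for ObjectSerDe (hypothetical, not the Pinot interface). */
  interface SerDe<T> {
    byte[] serialize(T value);

    T deserialize(byte[] bytes);
  }

  /** Stand-in for ValueLongPair<V>. */
  static class Pair<V> {
    final V value;
    final long time;

    Pair(V value, long time) {
      this.value = value;
      this.time = time;
    }
  }

  /** Stand-in for IntLongPair. */
  static class IntPair extends Pair<Integer> {
    IntPair(int value, long time) {
      super(value, time);
    }
  }

  // Declared with the concrete type, as in the suggestion above.
  static final SerDe<IntPair> INT_PAIR_SER_DE = new SerDe<IntPair>() {
    @Override
    public byte[] serialize(IntPair pair) {
      return ByteBuffer.allocate(Integer.BYTES + Long.BYTES).putInt(pair.value).putLong(pair.time).array();
    }

    @Override
    public IntPair deserialize(byte[] bytes) {
      ByteBuffer buffer = ByteBuffer.wrap(bytes);
      return new IntPair(buffer.getInt(), buffer.getLong());
    }
  };

  /** Compiles because the declared type is concrete, so serialize accepts an IntPair. */
  static IntPair roundTrip(IntPair in) {
    return INT_PAIR_SER_DE.deserialize(INT_PAIR_SER_DE.serialize(in));
  }

  public static void main(String[] args) {
    SerDe<? extends Pair<Integer>> wildcard = INT_PAIR_SER_DE;
    // wildcard.serialize(new IntPair(1, 2L));  // would not compile: parameter type is a wildcard capture
    Pair<Integer> pair = wildcard.deserialize(INT_PAIR_SER_DE.serialize(new IntPair(7, 100L)));
    System.out.println(pair.value + "@" + pair.time);
  }
}
```

Reading through the wildcard reference still works; only the write direction is blocked, which is why the concrete declaration is the less surprising choice for a constant that is used for both.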

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastDoubleValueWithTimeAggregationFunction.java
##########
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.DoubleLongPair;
+import org.apache.pinot.segment.local.customobject.ValueLongPair;
+
+
+/**
+ * This function is used for LastWithTime calculations for data column with double type.
+ * <p>The function can be used as LastWithTime(dataExpression, timeExpression, 'double')
+ * <p>Following arguments are supported:
+ * <ul>
+ *   <li>dataExpression: expression that contains the double data column to be calculated last on</li>
+ *   <li>timeExpression: expression that contains the column to be used to decide which data is last, can be any
+ *   Numeric column</li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class LastDoubleValueWithTimeAggregationFunction extends LastWithTimeAggregationFunction<Double> {
+
+  private final static ValueLongPair<Double> DEFAULT_VALUE_TIME_PAIR
+      = new DoubleLongPair(Double.NaN, Long.MIN_VALUE);
+
+  public LastDoubleValueWithTimeAggregationFunction(
+      ExpressionContext dataCol,
+      ExpressionContext timeCol) {
+    super(dataCol, timeCol, ObjectSerDeUtils.DOUBLE_LONG_PAIR_SER_DE);
+  }
+
+  @Override
+  public ValueLongPair<Double> constructValueLongPair(Double value, long time) {
+    return new DoubleLongPair(value, time);
+  }
+
+  @Override
+  public ValueLongPair<Double> getDefaultValueTimePair() {
+    return DEFAULT_VALUE_TIME_PAIR;
+  }
+
+  @Override
+  public void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet, BlockValSet timeValSet) {
+    ValueLongPair<Double> defaultValueLongPair = getDefaultValueTimePair();
+    Double lastData = defaultValueLongPair.getValue();
+    long lastTime = defaultValueLongPair.getTime();
+    double[] doubleValues = blockValSet.getDoubleValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      double data = doubleValues[i];
+      long time = timeValues[i];
+      if (time >= lastTime) {
+        lastTime = time;
+        lastData = data;
+      }
+    }
+    setAggregationResult(aggregationResultHolder, lastData, lastTime);
+  }
+
+  @Override
+  public void aggregateGroupResultWithRawDataSv(int length, int[] groupKeyArray,

Review comment:
       (minor, code format) Suggest reformatting them, same for other places

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunction.java
##########
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.ValueLongPair;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+/**
+ * This function is used for LastWithTime calculations.
+ * <p>The function can be used as LastWithTime(dataExpression, timeExpression, 'dataType')
+ * <p>Following arguments are supported:
+ * <ul>
+ *   <li>dataExpression: expression that contains the column to be calculated last on</li>
+ *   <li>timeExpression: expression that contains the column to be used to decide which data is last, can be any
+ *   Numeric column</li>
+ *   <li>dataType: the data type of data column</li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public abstract class LastWithTimeAggregationFunction<V extends Comparable<V>>
+    extends BaseSingleInputAggregationFunction<ValueLongPair<V>, V> {
+  protected final ExpressionContext _timeCol;
+  private final ObjectSerDeUtils.ObjectSerDe<? extends ValueLongPair<V>> _objectSerDe;
+
+  public LastWithTimeAggregationFunction(ExpressionContext dataCol,
+      ExpressionContext timeCol,
+      ObjectSerDeUtils.ObjectSerDe<? extends ValueLongPair<V>> objectSerDe) {
+    super(dataCol);
+    _timeCol = timeCol;
+    _objectSerDe = objectSerDe;
+  }
+
+  public abstract ValueLongPair<V> constructValueLongPair(V value, long time);
+
+  public abstract ValueLongPair<V> getDefaultValueTimePair();
+
+  public abstract void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet, BlockValSet timeValSet);
+
+  public abstract void aggregateGroupResultWithRawDataSv(int length,
+      int[] groupKeyArray,
+      GroupByResultHolder groupByResultHolder,
+      BlockValSet blockValSet,
+      BlockValSet timeValSet);
+
+  public abstract void aggregateGroupResultWithRawDataMv(int length,
+      int[][] groupKeysArray,
+      GroupByResultHolder groupByResultHolder,
+      BlockValSet blockValSet,
+      BlockValSet timeValSet);
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.LASTWITHTIME;
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    BlockValSet blockTimeSet = blockValSetMap.get(_timeCol);
+    if (blockValSet.getValueType() != DataType.BYTES) {
+      aggregateResultWithRawData(length, aggregationResultHolder, blockValSet, blockTimeSet);
+    } else {
+      ValueLongPair<V> defaultValueLongPair = getDefaultValueTimePair();
+      V lastData = defaultValueLongPair.getValue();
+      long lastTime = defaultValueLongPair.getTime();
+      // Serialized LastPair
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      for (int i = 0; i < length; i++) {
+        ValueLongPair<V> lastWithTimePair = _objectSerDe.deserialize(bytesValues[i]);
+        V data = lastWithTimePair.getValue();
+        long time = lastWithTimePair.getTime();
+        if (time >= lastTime) {
+          lastTime = time;
+          lastData = data;
+        }
+      }
+      setAggregationResult(aggregationResultHolder, lastData, lastTime);
+    }
+  }
+
+  protected void setAggregationResult(AggregationResultHolder aggregationResultHolder, V data, long time) {
+    ValueLongPair lastWithTimePair = aggregationResultHolder.getResult();
+    if (lastWithTimePair == null || time >= lastWithTimePair.getTime()) {
+      aggregationResultHolder.setValue(constructValueLongPair(data, time));
+    }
+  }
+
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    BlockValSet timeValSet = blockValSetMap.get(_timeCol);
+    if (blockValSet.getValueType() != DataType.BYTES) {
+      aggregateGroupResultWithRawDataSv(length, groupKeyArray, groupByResultHolder,
+          blockValSet, timeValSet);
+    } else {
+      // Serialized LastPair
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      for (int i = 0; i < length; i++) {
+        ValueLongPair<V> lastWithTimePair = _objectSerDe.deserialize(bytesValues[i]);
+        setGroupByResult(groupKeyArray[i],
+            groupByResultHolder,
+            lastWithTimePair.getValue(),
+            lastWithTimePair.getTime());
+      }
+    }
+  }
+
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    BlockValSet timeValSet = blockValSetMap.get(_timeCol);
+    if (blockValSet.getValueType() != DataType.BYTES) {
+      aggregateGroupResultWithRawDataMv(length, groupKeysArray, groupByResultHolder, blockValSet, timeValSet);
+    } else {
+      // Serialized ValueTimePair
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      for (int i = 0; i < length; i++) {
+        ValueLongPair<V> lastWithTimePair = _objectSerDe.deserialize(bytesValues[i]);
+        V data = lastWithTimePair.getValue();
+        long time = lastWithTimePair.getTime();
+        for (int groupKey : groupKeysArray[i]) {
+          setGroupByResult(groupKey, groupByResultHolder, data, time);
+        }
+      }
+    }
+  }
+
+  protected void setGroupByResult(int groupKey, GroupByResultHolder groupByResultHolder, V data, long time) {
+    ValueLongPair lastWithTimePair = groupByResultHolder.getResult(groupKey);
+    if (lastWithTimePair == null || time >= lastWithTimePair.getTime()) {
+      groupByResultHolder.setValueForKey(groupKey, constructValueLongPair(data, time));
+    }
+  }
+
+  @Override
+  public ValueLongPair<V> extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    ValueLongPair lastWithTimePair = aggregationResultHolder.getResult();
+    if (lastWithTimePair == null) {
+      return getDefaultValueTimePair();
+    } else {
+      return lastWithTimePair;
+    }
+  }
+
+  @Override
+  public ValueLongPair<V> extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+    ValueLongPair<V> lastWithTimePair = groupByResultHolder.getResult(groupKey);
+    if (lastWithTimePair == null) {
+      return getDefaultValueTimePair();
+    } else {
+      return lastWithTimePair;
+    }
+  }
+
+  @Override
+  public ValueLongPair<V> merge(ValueLongPair<V> intermediateResult1, ValueLongPair<V> intermediateResult2) {
+    if (intermediateResult1.getTime() >= intermediateResult2.getTime()) {
+      return intermediateResult1;
+    } else {
+      return intermediateResult2;
+    }
+  }
+
+  @Override
+  public List<ExpressionContext> getInputExpressions() {
+    return Arrays.asList(_expression, _timeCol);
+  }
+
+  @Override
+  public boolean isIntermediateResultComparable() {
+    return true;

Review comment:
       This should be `false`
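
One way to read this comment, stated as an assumption rather than as a description of Pinot internals: the intermediate result is a (value, time) pair, while the final result is only the value. If the pair orders itself by time (its `compareTo` is not shown in this hunk), then ordering rows by the intermediate result gives a different order than ordering by the value the user sees. The `Pair` class below is a hypothetical stand-in, not `ValueLongPair`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class IntermediateOrderingSketch {
  /** Hypothetical intermediate result that compares by time only. */
  static final class Pair implements Comparable<Pair> {
    final double value;
    final long time;

    Pair(double value, long time) {
      this.value = value;
      this.time = time;
    }

    @Override
    public int compareTo(Pair other) {
      return Long.compare(time, other.time);
    }
  }

  /** Sorts by the intermediate result, then extracts the values. */
  static List<Double> valuesOrderedByPair(List<Pair> pairs) {
    List<Pair> sorted = new ArrayList<>(pairs);
    sorted.sort(Comparator.naturalOrder());
    List<Double> values = new ArrayList<>();
    for (Pair pair : sorted) {
      values.add(pair.value);
    }
    return values;
  }

  /** Extracts the final values first, then sorts them. */
  static List<Double> valuesOrderedByValue(List<Pair> pairs) {
    List<Double> values = new ArrayList<>();
    for (Pair pair : pairs) {
      values.add(pair.value);
    }
    values.sort(Comparator.naturalOrder());
    return values;
  }

  public static void main(String[] args) {
    List<Pair> pairs = Arrays.asList(new Pair(9.0, 1L), new Pair(1.0, 2L));
    System.out.println(valuesOrderedByPair(pairs));   // ordered by time
    System.out.println(valuesOrderedByValue(pairs));  // ordered by value
  }
}
```

With the two pairs in `main`, the time-based order puts 9.0 before 1.0 and the value-based order does the opposite, so the two orderings cannot be used interchangeably under this assumption.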

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
##########
@@ -109,7 +115,12 @@ private ObjectSerDeUtils() {
     Int2LongMap(23),
     Long2LongMap(24),
     Float2LongMap(25),
-    Double2LongMap(26);
+    Double2LongMap(26),
+    IntValueTimePair(27),

Review comment:
       Let's also change the enum name to match the Pair (`IntLongPair`, `LongLongPair`, etc.)

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
##########
@@ -156,6 +157,46 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
             return new AvgAggregationFunction(firstArgument);
           case MODE:
             return new ModeAggregationFunction(arguments);
+          case LASTWITHTIME:
+            if (arguments.size() == 3) {
+              ExpressionContext timeCol = arguments.get(1);
+              ExpressionContext dataType = arguments.get(2);
+              if (dataType.getType() != ExpressionContext.Type.LITERAL) {
+                throw new IllegalArgumentException("Third argument of lastWithTime Function should be literal."
+                    + " The function can be used as lastWithTime(dataColumn, timeColumn, 'dataType')");
+              }
+              FieldSpec.DataType fieldDataType
+                  = FieldSpec.DataType.valueOf(dataType.getLiteral().toUpperCase());
+              switch (fieldDataType) {
+                case BOOLEAN:
+                case INT:
+                  return new LastIntValueWithTimeAggregationFunction(
+                      firstArgument,

Review comment:
       (minor, code format) We usually put arguments on the same line. Same for other places






[GitHub] [pinot] codecov-commenter edited a comment on pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#issuecomment-948138425


   # [Codecov](https://codecov.io/gh/apache/pinot/pull/7584?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#7584](https://codecov.io/gh/apache/pinot/pull/7584?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6d60f1e) into [master](https://codecov.io/gh/apache/pinot/commit/5eee297dbea7b3b73ae853f2479a89e9ce017363?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (5eee297) will **increase** coverage by `36.54%`.
   > The diff coverage is `85.97%`.
   
   
   ```diff
   @@              Coverage Diff              @@
   ##             master    #7584       +/-   ##
   =============================================
   + Coverage     32.16%   68.71%   +36.54%     
   - Complexity        0     3918     +3918     
   =============================================
     Files          1513     1180      -333     
     Lines         75557    57623    -17934     
     Branches      11055     8833     -2222     
   =============================================
   + Hits          24305    39594    +15289     
   + Misses        49156    15224    -33932     
   - Partials       2096     2805      +709     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `?` | |
   | integration2 | `?` | |
   | unittests1 | `68.71% <85.97%> (?)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/7584?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...tion/function/LastWithTimeAggregationFunction.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9hZ2dyZWdhdGlvbi9mdW5jdGlvbi9MYXN0V2l0aFRpbWVBZ2dyZWdhdGlvbkZ1bmN0aW9uLmphdmE=) | `47.14% <47.14%> (ø)` | |
   | [...gregation/function/AggregationFunctionFactory.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9hZ2dyZWdhdGlvbi9mdW5jdGlvbi9BZ2dyZWdhdGlvbkZ1bmN0aW9uRmFjdG9yeS5qYXZh) | `83.33% <60.00%> (+67.17%)` | :arrow_up: |
   | [...inot/segment/local/customobject/ValueLongPair.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC9jdXN0b21vYmplY3QvVmFsdWVMb25nUGFpci5qYXZh) | `63.63% <63.63%> (ø)` | |
   | [...ion/LastFloatValueWithTimeAggregationFunction.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9hZ2dyZWdhdGlvbi9mdW5jdGlvbi9MYXN0RmxvYXRWYWx1ZVdpdGhUaW1lQWdncmVnYXRpb25GdW5jdGlvbi5qYXZh) | `97.22% <97.22%> (ø)` | |
   | [...on/LastStringValueWithTimeAggregationFunction.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9hZ2dyZWdhdGlvbi9mdW5jdGlvbi9MYXN0U3RyaW5nVmFsdWVXaXRoVGltZUFnZ3JlZ2F0aW9uRnVuY3Rpb24uamF2YQ==) | `97.22% <97.22%> (ø)` | |
   | [...on/LastDoubleValueWithTimeAggregationFunction.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9hZ2dyZWdhdGlvbi9mdW5jdGlvbi9MYXN0RG91YmxlVmFsdWVXaXRoVGltZUFnZ3JlZ2F0aW9uRnVuY3Rpb24uamF2YQ==) | `97.29% <97.29%> (ø)` | |
   | [...tion/LastLongValueWithTimeAggregationFunction.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9hZ2dyZWdhdGlvbi9mdW5jdGlvbi9MYXN0TG9uZ1ZhbHVlV2l0aFRpbWVBZ2dyZWdhdGlvbkZ1bmN0aW9uLmphdmE=) | `97.29% <97.29%> (ø)` | |
   | [...org/apache/pinot/core/common/ObjectSerDeUtils.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9jb21tb24vT2JqZWN0U2VyRGVVdGlscy5qYXZh) | `90.59% <97.56%> (+43.15%)` | :arrow_up: |
   | [...ction/LastIntValueWithTimeAggregationFunction.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9hZ2dyZWdhdGlvbi9mdW5jdGlvbi9MYXN0SW50VmFsdWVXaXRoVGltZUFnZ3JlZ2F0aW9uRnVuY3Rpb24uamF2YQ==) | `97.72% <97.72%> (ø)` | |
   | [...xt/utils/BrokerRequestToQueryContextConverter.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9yZXF1ZXN0L2NvbnRleHQvdXRpbHMvQnJva2VyUmVxdWVzdFRvUXVlcnlDb250ZXh0Q29udmVydGVyLmphdmE=) | `98.21% <100.00%> (+0.08%)` | :arrow_up: |
   | ... and [1372 more](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/pinot/pull/7584?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/pinot/pull/7584?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [5eee297...6d60f1e](https://codecov.io/gh/apache/pinot/pull/7584?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   




[GitHub] [pinot] pjpringle edited a comment on pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
pjpringle edited a comment on pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#issuecomment-951733301


   Thanks! Looking forward to giving this a go.
   
   Can we now add functionality to segment compaction to use this operator, e.g. to take the last value every hour?
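
As a rough illustration of the "last value every hour" idea, independent of how segment compaction is actually implemented: bucket rows by hour and keep, per bucket, the value with the greatest timestamp. Everything in this sketch is assumed for the example (the class name, the `Row` type, epoch-millisecond timestamps); ties go to the later row, mirroring the `>=` comparison used in the PR.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class HourlyLastValueSketch {
  static final long HOUR_MS = 3_600_000L;

  /** Hypothetical input row: a timestamp in epoch milliseconds and a value. */
  static final class Row {
    final long timeMs;
    final double value;

    Row(long timeMs, double value) {
      this.timeMs = timeMs;
      this.value = value;
    }
  }

  /** Returns, for each hour bucket (keyed by bucket start), the value of the latest row in it. */
  static Map<Long, Double> lastValuePerHour(List<Row> rows) {
    Map<Long, Row> lastRowPerBucket = new TreeMap<>();
    for (Row row : rows) {
      long bucketStart = Math.floorDiv(row.timeMs, HOUR_MS) * HOUR_MS;
      Row current = lastRowPerBucket.get(bucketStart);
      // ">=" means that on equal timestamps the row seen later wins.
      if (current == null || row.timeMs >= current.timeMs) {
        lastRowPerBucket.put(bucketStart, row);
      }
    }
    Map<Long, Double> result = new TreeMap<>();
    for (Map.Entry<Long, Row> entry : lastRowPerBucket.entrySet()) {
      result.put(entry.getKey(), entry.getValue().value);
    }
    return result;
  }

  public static void main(String[] args) {
    List<Row> rows = Arrays.asList(
        new Row(1_000L, 1.0), new Row(3_599_999L, 2.0), new Row(500L, 3.0), new Row(3_600_000L, 4.0));
    System.out.println(lastValuePerHour(rows));
  }
}
```

Rows do not need to arrive in time order, since each bucket keeps track of the latest timestamp it has seen.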




[GitHub] [pinot] weixiangsun commented on a change in pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
weixiangsun commented on a change in pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#discussion_r733147260



##########
File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/LastWithTimePair.java
##########
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import java.nio.ByteBuffer;
+import javax.annotation.Nonnull;
+
+public class LastWithTimePair implements Comparable<LastWithTimePair> {
+  private double _data;
+  private long _time;
+
+  public LastWithTimePair(double data, long time) {
+    _data = data;
+    _time = time;
+  }
+
+  public void apply(double data, long time) {

Review comment:
       Done






[GitHub] [pinot] weixiangsun commented on a change in pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
weixiangsun commented on a change in pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#discussion_r733938036



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastDoubleValueWithTimeAggregationFunction.java
##########
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.DoubleValueTimePair;
+import org.apache.pinot.segment.local.customobject.ValueTimePair;
+
+/**
+ * This function is used for LastWithTime calculations for data column with double type.
+ * <p>The function can be used as LastWithTime(dataExpression, timeExpression, 'double')
+ * <p>Following arguments are supported:
+ * <ul>
+ *   <li>dataExpression: expression that contains the double data column to be calculated last on</li>
+ *   <li>timeExpression: expression that contains the column to be used to decide which data is last, can be any
+ *   Numeric column</li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class LastDoubleValueWithTimeAggregationFunction extends LastWithTimeAggregationFunction<Double> {
+
+  private final static ValueTimePair<Double> DEFAULT_VALUE_TIME_PAIR
+          = new DoubleValueTimePair(Double.NaN, Long.MIN_VALUE);
+
+  public LastDoubleValueWithTimeAggregationFunction(
+          ExpressionContext dataCol,
+          ExpressionContext timeCol,
+          ObjectSerDeUtils.ObjectSerDe<? extends ValueTimePair<Double>> objectSerDe) {
+    super(dataCol, timeCol, objectSerDe);
+  }
+
+  @Override
+  public List<ExpressionContext> getInputExpressions() {
+    return Arrays.asList(_expression, _timeCol, ExpressionContext.forLiteral("Long"));

Review comment:
       Done






[GitHub] [pinot] codecov-commenter commented on pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#issuecomment-948138425


   # [Codecov](https://codecov.io/gh/apache/pinot/pull/7584?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#7584](https://codecov.io/gh/apache/pinot/pull/7584?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (e4e7791) into [master](https://codecov.io/gh/apache/pinot/commit/5eee297dbea7b3b73ae853f2479a89e9ce017363?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (5eee297) will **decrease** coverage by `1.22%`.
   > The diff coverage is `4.30%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/pinot/pull/7584/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/pinot/pull/7584?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #7584      +/-   ##
   ==========================================
   - Coverage   32.16%   30.94%   -1.23%     
   ==========================================
     Files        1513     1563      +50     
     Lines       75557    79062    +3505     
     Branches    11055    11712     +657     
   ==========================================
   + Hits        24305    24466     +161     
   - Misses      49156    52513    +3357     
   + Partials     2096     2083      -13     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `29.20% <4.30%> (-1.33%)` | :arrow_down: |
   | integration2 | `27.68% <4.30%> (-1.26%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/7584?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...gregation/function/AggregationFunctionFactory.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9hZ2dyZWdhdGlvbi9mdW5jdGlvbi9BZ2dyZWdhdGlvbkZ1bmN0aW9uRmFjdG9yeS5qYXZh) | `14.41% <0.00%> (-1.75%)` | :arrow_down: |
   | [...on/LastDoubleValueWithTimeAggregationFunction.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9hZ2dyZWdhdGlvbi9mdW5jdGlvbi9MYXN0RG91YmxlVmFsdWVXaXRoVGltZUFnZ3JlZ2F0aW9uRnVuY3Rpb24uamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...ion/LastFloatValueWithTimeAggregationFunction.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9hZ2dyZWdhdGlvbi9mdW5jdGlvbi9MYXN0RmxvYXRWYWx1ZVdpdGhUaW1lQWdncmVnYXRpb25GdW5jdGlvbi5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...ction/LastIntValueWithTimeAggregationFunction.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9hZ2dyZWdhdGlvbi9mdW5jdGlvbi9MYXN0SW50VmFsdWVXaXRoVGltZUFnZ3JlZ2F0aW9uRnVuY3Rpb24uamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...tion/LastLongValueWithTimeAggregationFunction.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9hZ2dyZWdhdGlvbi9mdW5jdGlvbi9MYXN0TG9uZ1ZhbHVlV2l0aFRpbWVBZ2dyZWdhdGlvbkZ1bmN0aW9uLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...on/LastStringValueWithTimeAggregationFunction.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9hZ2dyZWdhdGlvbi9mdW5jdGlvbi9MYXN0U3RyaW5nVmFsdWVXaXRoVGltZUFnZ3JlZ2F0aW9uRnVuY3Rpb24uamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...tion/function/LastWithTimeAggregationFunction.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9hZ2dyZWdhdGlvbi9mdW5jdGlvbi9MYXN0V2l0aFRpbWVBZ2dyZWdhdGlvbkZ1bmN0aW9uLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...xt/utils/BrokerRequestToQueryContextConverter.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9yZXF1ZXN0L2NvbnRleHQvdXRpbHMvQnJva2VyUmVxdWVzdFRvUXVlcnlDb250ZXh0Q29udmVydGVyLmphdmE=) | `93.75% <0.00%> (-4.39%)` | :arrow_down: |
   | [...egment/local/customobject/DoubleValueTimePair.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC9jdXN0b21vYmplY3QvRG91YmxlVmFsdWVUaW1lUGFpci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...segment/local/customobject/FloatValueTimePair.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC9jdXN0b21vYmplY3QvRmxvYXRWYWx1ZVRpbWVQYWlyLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | ... and [123 more](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/pinot/pull/7584?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/pinot/pull/7584?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [5eee297...e4e7791](https://codecov.io/gh/apache/pinot/pull/7584?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   




[GitHub] [pinot] weixiangsun commented on a change in pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
weixiangsun commented on a change in pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#discussion_r733972147



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastDoubleValueWithTimeAggregationFunction.java
##########
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.DoubleValueTimePair;
+import org.apache.pinot.segment.local.customobject.ValueTimePair;
+
+/**
+ * This function is used for LastWithTime calculations for data column with double type.
+ * <p>The function can be used as LastWithTime(dataExpression, timeExpression, 'double')
+ * <p>Following arguments are supported:
+ * <ul>
+ *   <li>dataExpression: expression that contains the double data column to be calculated last on</li>
+ *   <li>timeExpression: expression that contains the column to be used to decide which data is last, can be any
+ *   Numeric column</li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class LastDoubleValueWithTimeAggregationFunction extends LastWithTimeAggregationFunction<Double> {
+
+  private final static ValueTimePair<Double> DEFAULT_VALUE_TIME_PAIR
+          = new DoubleValueTimePair(Double.NaN, Long.MIN_VALUE);
+
+  public LastDoubleValueWithTimeAggregationFunction(
+          ExpressionContext dataCol,
+          ExpressionContext timeCol,
+          ObjectSerDeUtils.ObjectSerDe<? extends ValueTimePair<Double>> objectSerDe) {
+    super(dataCol, timeCol, objectSerDe);
+  }
+
+  @Override
+  public List<ExpressionContext> getInputExpressions() {
+    return Arrays.asList(_expression, _timeCol, ExpressionContext.forLiteral("Long"));
+  }
+
+  @Override
+  public ValueTimePair<Double> constructValueTimePair(Double value, long time) {
+    return new DoubleValueTimePair(value, time);
+  }
+
+  @Override
+  public ValueTimePair<Double> getDefaultValueTimePair() {
+    return DEFAULT_VALUE_TIME_PAIR;
+  }
+
+  @Override
+  public void updateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder,
+                                             BlockValSet blockValSet, BlockValSet timeValSet) {
+    ValueTimePair<Double> defaultValueTimePair = getDefaultValueTimePair();
+    Double lastData = defaultValueTimePair.getValue();
+    long lastTime = defaultValueTimePair.getTime();
+    double [] doubleValues = blockValSet.getDoubleValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      double data = doubleValues[i];
+      long time = timeValues[i];
+      if (time >= lastTime) {
+        lastTime = time;
+        lastData = data;
+      }
+    }
+    setAggregationResult(aggregationResultHolder, lastData, lastTime);
+  }
+
+  @Override
+  public void updateGroupResultWithRawDataSv(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+                                             BlockValSet blockValSet, BlockValSet timeValSet) {
+    double[] doubleValues = blockValSet.getDoubleValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      double data = doubleValues[i];
+      long time = timeValues[i];
+      setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+    }
+  }
+
+  @Override
+  public void updateGroupResultWithRawDataMv(int length,
+                                             int[][] groupKeysArray,
+                                             GroupByResultHolder groupByResultHolder,
+                                             BlockValSet blockValSet,
+                                             BlockValSet timeValSet) {
+    double[] doubleValues = blockValSet.getDoubleValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      double value = doubleValues[i];
+      long time = timeValues[i];
+      for (int groupKey : groupKeysArray[i]) {
+        setGroupByResult(groupKey, groupByResultHolder, value, time);
+      }
+    }
+  }
+
+  @Override
+  public String getResultColumnName() {

Review comment:
       Done
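
For readers following the hunk above, the selection rule in `updateResultWithRawData` (`time >= lastTime`, starting from `Double.NaN` and `Long.MIN_VALUE`) can be tried out in isolation. The following is only a minimal sketch under stated assumptions: the class and method names are hypothetical and not part of the PR, and the per-group tie handling is assumed to mirror the non-grouped loop, since the pair's `apply` method is not shown in this hunk.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone sketch; none of these names exist in Pinot.
final class LastWithTimeSketch {

  /** Returns the value with the largest timestamp; on equal timestamps the later row wins. */
  static double lastByTime(double[] values, long[] times) {
    double lastValue = Double.NaN;   // same default value as DEFAULT_VALUE_TIME_PAIR in the diff
    long lastTime = Long.MIN_VALUE;  // every real timestamp compares >= this
    for (int i = 0; i < values.length; i++) {
      if (times[i] >= lastTime) {
        lastTime = times[i];
        lastValue = values[i];
      }
    }
    return lastValue;
  }

  /** Group-by variant: tracks one (value, time) slot per group key. */
  static Map<Integer, Double> lastByTimePerGroup(int[] groupKeys, double[] values, long[] times) {
    Map<Integer, Double> lastValues = new HashMap<>();
    Map<Integer, Long> lastTimes = new HashMap<>();
    for (int i = 0; i < values.length; i++) {
      Long seen = lastTimes.get(groupKeys[i]);
      // Assumption: ties are resolved like the non-grouped loop (later row wins).
      if (seen == null || times[i] >= seen) {
        lastTimes.put(groupKeys[i], times[i]);
        lastValues.put(groupKeys[i], values[i]);
      }
    }
    return lastValues;
  }

  public static void main(String[] args) {
    double[] values = {1.5, 2.5, 3.5, 4.5};
    long[] times = {10L, 30L, 30L, 20L};
    System.out.println(lastByTime(values, times));
    System.out.println(lastByTimePerGroup(new int[] {0, 1, 0, 1}, values, times));
  }
}
```

With an empty input the non-grouped method returns `Double.NaN`, which matches the default pair in the diff; whether that is the desired user-facing result is a separate question for the review.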






[GitHub] [pinot] weixiangsun commented on a change in pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
weixiangsun commented on a change in pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#discussion_r733147451



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunction.java
##########
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.LastWithTimePair;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class LastWithTimeAggregationFunction extends BaseSingleInputAggregationFunction<LastWithTimePair, Double> {
+  private final ExpressionContext _timeCol;
+
+  public LastWithTimeAggregationFunction(ExpressionContext dataCol, ExpressionContext timeCol) {
+    super(dataCol);
+    _timeCol = timeCol;
+  }
+
+  @Override
+  public List<ExpressionContext> getInputExpressions() {
+    return Arrays.asList(_expression, _timeCol);
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.LASTWITHTIME;
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    double lastData = Double.POSITIVE_INFINITY;
+    long lastTime = Long.MIN_VALUE;
+
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    BlockValSet blockTimeSet = blockValSetMap.get(_timeCol);
+    if (blockValSet.getValueType() != DataType.BYTES) {
+      double[] doubleValues = blockValSet.getDoubleValuesSV();
+      long[] timeValues = blockTimeSet.getLongValuesSV();
+      for (int i = 0; i < length; i++) {
+        double data = doubleValues[i];
+        long time = timeValues[i];
+        if (time >= lastTime) {
+          lastTime = time;
+          lastData = data;
+        }
+      }
+    } else {
+      // Serialized LastPair
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      for (int i = 0; i < length; i++) {
+        LastWithTimePair lastWithTimePair = ObjectSerDeUtils.LAST_WITH_TIME_PAIR_SER_DE.deserialize(bytesValues[i]);
+        double data = lastWithTimePair.getData();
+        long time = lastWithTimePair.getTime();
+        if (time >= lastTime) {
+          lastTime = time;
+          lastData = data;
+        }
+      }
+    }
+    setAggregationResult(aggregationResultHolder, lastData, lastTime);
+  }
+
+  protected void setAggregationResult(AggregationResultHolder aggregationResultHolder, double data, long time) {
+    LastWithTimePair lastWithTimePair = aggregationResultHolder.getResult();
+    if (lastWithTimePair == null) {
+      aggregationResultHolder.setValue(new LastWithTimePair(data, time));
+    } else {
+      lastWithTimePair.apply(data, time);
+    }
+  }
+
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    BlockValSet timeValSet = blockValSetMap.get(_timeCol);
+    if (blockValSet.getValueType() != DataType.BYTES) {
+      double[] doubleValues = blockValSet.getDoubleValuesSV();
+      long[] timeValues = timeValSet.getLongValuesSV();
+      for (int i = 0; i < length; i++) {
+        double data = doubleValues[i];
+        long time = timeValues[i];
+        setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+      }
+    } else {
+      // Serialized LastPair
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      for (int i = 0; i < length; i++) {
+        LastWithTimePair lastWithTimePair = ObjectSerDeUtils.LAST_WITH_TIME_PAIR_SER_DE.deserialize(bytesValues[i]);
+        setGroupByResult(groupKeyArray[i], groupByResultHolder, lastWithTimePair.getData(), lastWithTimePair.getTime());
+      }
+    }
+  }
+
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    BlockValSet timeValSet = blockValSetMap.get(_timeCol);
+    if (blockValSet.getValueType() != DataType.BYTES) {
+      double[] doubleValues = blockValSet.getDoubleValuesSV();
+      long[] timeValues = timeValSet.getLongValuesSV();
+      for (int i = 0; i < length; i++) {
+        double value = doubleValues[i];
+        long time = timeValues[i];
+        for (int groupKey : groupKeysArray[i]) {
+          setGroupByResult(groupKey, groupByResultHolder, value, time);
+        }
+      }
+    } else {
+      // Serialized LastWithTimePair
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      for (int i = 0; i < length; i++) {
+        LastWithTimePair lastWithTimePair = ObjectSerDeUtils.LAST_WITH_TIME_PAIR_SER_DE.deserialize(bytesValues[i]);
+        double data = lastWithTimePair.getData();
+        long time = lastWithTimePair.getTime();
+        for (int groupKey : groupKeysArray[i]) {
+          setGroupByResult(groupKey, groupByResultHolder, data, time);
+        }
+      }
+    }
+  }
+
+  protected void setGroupByResult(int groupKey, GroupByResultHolder groupByResultHolder, double data, long time) {
+    LastWithTimePair lastWithTimePair = groupByResultHolder.getResult(groupKey);
+    if (lastWithTimePair == null) {
+      groupByResultHolder.setValueForKey(groupKey, new LastWithTimePair(data, time));
+    } else {
+      lastWithTimePair.apply(data, time);
+    }
+  }
+
+  @Override
+  public LastWithTimePair extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    LastWithTimePair lastWithTimePair = aggregationResultHolder.getResult();
+    if (lastWithTimePair == null) {
+      return new LastWithTimePair(Double.POSITIVE_INFINITY, Long.MIN_VALUE);

Review comment:
       Done
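
The holder logic in `setAggregationResult` and `setGroupByResult` above (create a pair when the holder is empty, otherwise call `apply`) amounts to folding blocks of rows into one running pair. Below is a rough sketch of that pattern. `Pair` is a hypothetical stand-in for `LastWithTimePair`, and its `apply` rule (keep the incoming entry when its time is not older) is an assumption, because the real implementation is not part of this hunk.

```java
// Hypothetical sketch; Pair is not the PR's LastWithTimePair class.
final class LastPairMergeSketch {

  static final class Pair {
    double data;
    long time;

    Pair(double data, long time) {
      this.data = data;
      this.time = time;
    }

    /** Assumed merge rule: the incoming entry replaces the current one unless it is older. */
    void apply(double otherData, long otherTime) {
      if (otherTime >= time) {
        data = otherData;
        time = otherTime;
      }
    }
  }

  /** Folds one block of rows into a holder that may still be empty (null). */
  static Pair aggregate(Pair holder, double[] values, long[] times) {
    for (int i = 0; i < values.length; i++) {
      if (holder == null) {
        holder = new Pair(values[i], times[i]);
      } else {
        holder.apply(values[i], times[i]);
      }
    }
    return holder;
  }

  public static void main(String[] args) {
    // Two blocks processed one after the other, as separate aggregate(...) calls would be.
    Pair result = aggregate(null, new double[] {7.0, 8.0}, new long[] {100L, 300L});
    result = aggregate(result, new double[] {9.0}, new long[] {200L});
    System.out.println(result.data + " @ " + result.time);
  }
}
```

Under this assumed rule the outcome does not depend on how rows are split into blocks, as long as timestamps are distinct; with equal timestamps the winner depends on processing order.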






[GitHub] [pinot] weixiangsun commented on a change in pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
weixiangsun commented on a change in pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#discussion_r736056548



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
##########
@@ -330,6 +351,99 @@ public MinMaxRangePair deserialize(ByteBuffer byteBuffer) {
     }
   };
 
+  public static final ObjectSerDe<? extends ValueLongPair<Integer>> INT_LONG_PAIR_SER_DE
+      = new ObjectSerDe<IntLongPair>() {

Review comment:
       Done
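
The hunk above is cut off before the body of `INT_LONG_PAIR_SER_DE`, so the actual byte layout is not visible in this thread. Purely as an illustration of what a fixed-width ser/de for an (int value, long time) pair could look like, here is a sketch that assumes a 4-byte int followed by an 8-byte long in `ByteBuffer`'s default big-endian order. The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch; the layout is an assumption, not taken from ObjectSerDeUtils.
final class IntLongPairSerDeSketch {

  /** Writes the int value followed by the long time into a 12-byte array. */
  static byte[] serialize(int value, long time) {
    ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + Long.BYTES);
    buffer.putInt(value);
    buffer.putLong(time);
    return buffer.array();
  }

  /** Reads the int value stored at offset 0. */
  static int readValue(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getInt(0);
  }

  /** Reads the long time stored right after the int. */
  static long readTime(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getLong(Integer.BYTES);
  }

  public static void main(String[] args) {
    byte[] bytes = serialize(42, 1634430000000L);
    System.out.println(bytes.length + " bytes, value=" + readValue(bytes) + ", time=" + readTime(bytes));
  }
}
```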






[GitHub] [pinot] codecov-commenter edited a comment on pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#issuecomment-948138425


   # [Codecov](https://codecov.io/gh/apache/pinot/pull/7584?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#7584](https://codecov.io/gh/apache/pinot/pull/7584?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (e8df187) into [master](https://codecov.io/gh/apache/pinot/commit/5eee297dbea7b3b73ae853f2479a89e9ce017363?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (5eee297) will **increase** coverage by `38.27%`.
   > The diff coverage is `85.97%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/pinot/pull/7584/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/pinot/pull/7584?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@              Coverage Diff              @@
   ##             master    #7584       +/-   ##
   =============================================
   + Coverage     32.16%   70.44%   +38.27%     
   - Complexity        0     3908     +3908     
   =============================================
     Files          1513     1571       +58     
     Lines         75557    79402     +3845     
     Branches      11055    11751      +696     
   =============================================
   + Hits          24305    55932    +31627     
   + Misses        49156    19656    -29500     
   - Partials       2096     3814     +1718     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `?` | |
   | integration2 | `27.76% <4.23%> (-1.18%)` | :arrow_down: |
   | unittests1 | `68.67% <85.97%> (?)` | |
   | unittests2 | `14.65% <0.00%> (?)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/7584?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...tion/function/LastWithTimeAggregationFunction.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9hZ2dyZWdhdGlvbi9mdW5jdGlvbi9MYXN0V2l0aFRpbWVBZ2dyZWdhdGlvbkZ1bmN0aW9uLmphdmE=) | `47.14% <47.14%> (ø)` | |
   | [...gregation/function/AggregationFunctionFactory.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9hZ2dyZWdhdGlvbi9mdW5jdGlvbi9BZ2dyZWdhdGlvbkZ1bmN0aW9uRmFjdG9yeS5qYXZh) | `83.33% <60.00%> (+67.17%)` | :arrow_up: |
   | [...inot/segment/local/customobject/ValueLongPair.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC9jdXN0b21vYmplY3QvVmFsdWVMb25nUGFpci5qYXZh) | `63.63% <63.63%> (ø)` | |
   | [...ion/LastFloatValueWithTimeAggregationFunction.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9hZ2dyZWdhdGlvbi9mdW5jdGlvbi9MYXN0RmxvYXRWYWx1ZVdpdGhUaW1lQWdncmVnYXRpb25GdW5jdGlvbi5qYXZh) | `97.22% <97.22%> (ø)` | |
   | [...on/LastStringValueWithTimeAggregationFunction.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9hZ2dyZWdhdGlvbi9mdW5jdGlvbi9MYXN0U3RyaW5nVmFsdWVXaXRoVGltZUFnZ3JlZ2F0aW9uRnVuY3Rpb24uamF2YQ==) | `97.22% <97.22%> (ø)` | |
   | [...on/LastDoubleValueWithTimeAggregationFunction.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9hZ2dyZWdhdGlvbi9mdW5jdGlvbi9MYXN0RG91YmxlVmFsdWVXaXRoVGltZUFnZ3JlZ2F0aW9uRnVuY3Rpb24uamF2YQ==) | `97.29% <97.29%> (ø)` | |
   | [...tion/LastLongValueWithTimeAggregationFunction.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9hZ2dyZWdhdGlvbi9mdW5jdGlvbi9MYXN0TG9uZ1ZhbHVlV2l0aFRpbWVBZ2dyZWdhdGlvbkZ1bmN0aW9uLmphdmE=) | `97.29% <97.29%> (ø)` | |
   | [...org/apache/pinot/core/common/ObjectSerDeUtils.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9jb21tb24vT2JqZWN0U2VyRGVVdGlscy5qYXZh) | `90.59% <97.56%> (+43.15%)` | :arrow_up: |
   | [...ction/LastIntValueWithTimeAggregationFunction.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9hZ2dyZWdhdGlvbi9mdW5jdGlvbi9MYXN0SW50VmFsdWVXaXRoVGltZUFnZ3JlZ2F0aW9uRnVuY3Rpb24uamF2YQ==) | `97.72% <97.72%> (ø)` | |
   | [...xt/utils/BrokerRequestToQueryContextConverter.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9yZXF1ZXN0L2NvbnRleHQvdXRpbHMvQnJva2VyUmVxdWVzdFRvUXVlcnlDb250ZXh0Q29udmVydGVyLmphdmE=) | `98.21% <100.00%> (+0.08%)` | :arrow_up: |
   | ... and [1125 more](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/pinot/pull/7584?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/pinot/pull/7584?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [5eee297...e8df187](https://codecov.io/gh/apache/pinot/pull/7584?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] lakshmanan-v commented on a change in pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
lakshmanan-v commented on a change in pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#discussion_r730114099



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
##########
@@ -156,6 +156,13 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
             return new AvgAggregationFunction(firstArgument);
           case MODE:
             return new ModeAggregationFunction(arguments);
+          case LASTWITHTIME:
+            if (arguments.size() > 1) {
+              ExpressionContext secondArgument = arguments.get(1);
+              return new LastWithTimeAggregationFunction(firstArgument, secondArgument);
+            } else {
+              throw new IllegalArgumentException();

Review comment:
       Could you add a message to the exception?
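
A message on the exception could, for instance, state how many arguments were received and how the function is meant to be called. The sketch below is only an illustration of that idea: `LastWithTimeArgCheckSketch` and `checkArguments` are hypothetical names that do not exist in Pinot, and the wording of the message is an assumption.

```java
import java.util.Arrays;
import java.util.List;

final class LastWithTimeArgCheckSketch {
  // Hypothetical helper (not a Pinot API): rejects calls with fewer than two
  // arguments and explains the expected call shape in the exception message.
  static void checkArguments(List<String> arguments) {
    if (arguments.size() < 2) {
      throw new IllegalArgumentException(
          "LASTWITHTIME expects at least 2 arguments (dataColumn, timeColumn), got: "
              + arguments.size());
    }
  }

  public static void main(String[] args) {
    try {
      checkArguments(Arrays.asList("price"));
    } catch (IllegalArgumentException e) {
      // The message now tells the caller what went wrong.
      System.out.println(e.getMessage());
    }
  }
}
```

With a message like this, a query that passes a single argument fails with a description of the expected call rather than an empty exception.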

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunction.java
##########
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.LastWithTimePair;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class LastWithTimeAggregationFunction extends BaseSingleInputAggregationFunction<LastWithTimePair, Double> {

Review comment:
       nit: could you add Javadoc for this function?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunction.java
##########
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.LastWithTimePair;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class LastWithTimeAggregationFunction extends BaseSingleInputAggregationFunction<LastWithTimePair, Double> {

Review comment:
       Can we make this work for all data types (such as boolean and varchar)? Compare the timestamps and return the last value irrespective of its type.
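
One way to read this suggestion is a pair type that is generic in the value and compares only the timestamp. The sketch below assumes that reading; `LastValueSketch` and its nested `ValueTimePair` are hypothetical names for illustration and are not the classes from this PR. The `>=` comparison follows the `apply` method of `LastWithTimePair` quoted elsewhere in this thread, so on equal timestamps the most recently applied value wins.

```java
final class LastValueSketch {
  // Hypothetical holder: a value of any type plus the time it was observed.
  static final class ValueTimePair<V> {
    private V value;
    private long time;

    ValueTimePair(V value, long time) {
      this.value = value;
      this.time = time;
    }

    // Replace the stored value only when the incoming time is not older.
    void apply(V newValue, long newTime) {
      if (newTime >= time) {
        value = newValue;
        time = newTime;
      }
    }

    V getValue() {
      return value;
    }

    long getTime() {
      return time;
    }
  }

  public static void main(String[] args) {
    ValueTimePair<String> lastString = new ValueTimePair<>("a", 10L);
    lastString.apply("b", 30L);
    lastString.apply("c", 20L); // older than 30, ignored
    System.out.println(lastString.getValue() + " @ " + lastString.getTime());

    ValueTimePair<Boolean> lastFlag = new ValueTimePair<>(Boolean.FALSE, 5L);
    lastFlag.apply(Boolean.TRUE, 6L);
    System.out.println(lastFlag.getValue() + " @ " + lastFlag.getTime());
  }
}
```

Because only the `long` time takes part in the comparison, the same logic serves string, boolean, or numeric values without per-type branches.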






[GitHub] [pinot] lakshmanan-v commented on a change in pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
lakshmanan-v commented on a change in pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#discussion_r734791697



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
##########
@@ -156,6 +157,46 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
             return new AvgAggregationFunction(firstArgument);
           case MODE:
             return new ModeAggregationFunction(arguments);
+          case LASTWITHTIME:
+            if (arguments.size() == 3) {
+              ExpressionContext timeCol = arguments.get(1);
+              ExpressionContext dataType = arguments.get(2);
+              if (dataType.getType() != ExpressionContext.Type.LITERAL) {
+                throw new IllegalArgumentException("Third argument of lastWithTime Function should be literal."
+                    + " The function can be used as lastWithTime(dataColumn, timeColumn, 'dataType')");
+              }
+              FieldSpec.DataType fieldDataType
+                  = FieldSpec.DataType.valueOf(dataType.getLiteral().toUpperCase());
+              switch (fieldDataType) {
+                case BOOLEAN:

Review comment:
       Can datetime be added here as well?






[GitHub] [pinot] pjpringle commented on pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
pjpringle commented on pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#issuecomment-951733301


   Thanks! Looking forward to giving this a go.
   
   Can we now add functionality to segment compaction to use this operator, e.g. take the last value every hour?
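
To make the "last value every hour" idea concrete, here is a minimal sketch of the bucketing it implies, written as plain Java over in-memory arrays. It is not a Pinot compaction or merge/rollup API: `HourlyLastValueSketch` and `lastPerHour` are hypothetical names, and epoch-millisecond timestamps are an assumption.

```java
import java.util.Map;
import java.util.TreeMap;

final class HourlyLastValueSketch {
  static final long HOUR_MS = 3_600_000L;

  // Returns hour-bucket start (epoch millis) -> value with the latest time in that hour.
  static Map<Long, Double> lastPerHour(long[] timesMs, double[] values) {
    Map<Long, Long> latestTime = new TreeMap<>();
    Map<Long, Double> lastValue = new TreeMap<>();
    for (int i = 0; i < timesMs.length; i++) {
      // Round the timestamp down to the start of its hour.
      long bucket = Math.floorDiv(timesMs[i], HOUR_MS) * HOUR_MS;
      Long seen = latestTime.get(bucket);
      if (seen == null || timesMs[i] >= seen) {
        latestTime.put(bucket, timesMs[i]);
        lastValue.put(bucket, values[i]);
      }
    }
    return lastValue;
  }

  public static void main(String[] args) {
    long[] times = {0L, 1_000L, 3_600_000L, 3_599_999L};
    double[] values = {1.0, 2.0, 3.0, 4.0};
    System.out.println(lastPerHour(times, values));
  }
}
```

Each hour bucket keeps a single row, chosen by the same "latest time wins" rule that the aggregate applies at query time.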




[GitHub] [pinot] weixiangsun commented on a change in pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
weixiangsun commented on a change in pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#discussion_r731109059



##########
File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/LastWithTimePair.java
##########
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import java.nio.ByteBuffer;
+import javax.annotation.Nonnull;
+
+public class LastWithTimePair implements Comparable<LastWithTimePair> {
+  private double _data;
+  private long _time;
+
+  public LastWithTimePair(double data, long time) {
+    _data = data;
+    _time = time;
+  }
+
+  public void apply(double data, long time) {
+    if (time >= _time) {
+      _data = data;
+      _time = time;
+    }
+  }
+
+  public void apply(@Nonnull LastWithTimePair lastWithTimePair) {

Review comment:
       Done






[GitHub] [pinot] weixiangsun commented on a change in pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
weixiangsun commented on a change in pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#discussion_r733943735



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunction.java
##########
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.ValueTimePair;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+/**
+ * This function is used for LastWithTime calculations.
+ * <p>The function can be used as LastWithTime(dataExpression, timeExpression, 'dataType')
+ * <p>Following arguments are supported:
+ * <ul>
+ *   <li>dataExpression: expression that contains the column to be calculated last on</li>
+ *   <li>timeExpression: expression that contains the column to be used to decide which data is last, can be any
+ *   Numeric column</li>
+ *   <li>dataType: the data type of data column</li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public abstract class LastWithTimeAggregationFunction<V extends Comparable<V>>
+        extends BaseSingleInputAggregationFunction<ValueTimePair<V>, V> {
+  protected final ExpressionContext _timeCol;
+  private final ObjectSerDeUtils.ObjectSerDe<? extends ValueTimePair<V>> _objectSerDe;
+
+  public LastWithTimeAggregationFunction(ExpressionContext dataCol,
+                                         ExpressionContext timeCol,
+                                         ObjectSerDeUtils.ObjectSerDe<? extends ValueTimePair<V>> objectSerDe) {
+    super(dataCol);
+    _timeCol = timeCol;
+    _objectSerDe = objectSerDe;
+  }
+
+  public abstract ValueTimePair<V> constructValueTimePair(V value, long time);
+
+  public abstract ValueTimePair<V> getDefaultValueTimePair();
+
+  public abstract void updateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder,

Review comment:
       Done






[GitHub] [pinot] weixiangsun commented on a change in pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
weixiangsun commented on a change in pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#discussion_r733935358



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
##########
@@ -156,6 +158,45 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
             return new AvgAggregationFunction(firstArgument);
           case MODE:
             return new ModeAggregationFunction(arguments);
+          case LASTWITHTIME:
+            if (arguments.size() > 1) {
+              ExpressionContext timeCol = arguments.get(1);
+              String dataType = arguments.get(2).getIdentifier();
+              DataSchema.ColumnDataType columnDataType = DataSchema.ColumnDataType.valueOf(dataType.toUpperCase());

Review comment:
       Done






[GitHub] [pinot] weixiangsun commented on a change in pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
weixiangsun commented on a change in pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#discussion_r733913858



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
##########
@@ -156,6 +158,45 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
             return new AvgAggregationFunction(firstArgument);
           case MODE:
             return new ModeAggregationFunction(arguments);
+          case LASTWITHTIME:
+            if (arguments.size() > 1) {
+              ExpressionContext timeCol = arguments.get(1);
+              String dataType = arguments.get(2).getIdentifier();
+              DataSchema.ColumnDataType columnDataType = DataSchema.ColumnDataType.valueOf(dataType.toUpperCase());
+              switch (columnDataType) {
+                case BOOLEAN:
+                case INT:
+                  return new LastIntValueWithTimeAggregationFunction(
+                          firstArgument,
+                          timeCol,
+                          ObjectSerDeUtils.INT_VAL_TIME_PAIR_SER_DE,
+                          columnDataType == DataSchema.ColumnDataType.BOOLEAN);
+                case LONG:
+                  return new LastLongValueWithTimeAggregationFunction(
+                          firstArgument,
+                          timeCol,
+                          ObjectSerDeUtils.LONG_VAL_TIME_PAIR_SER_DE);
+                case FLOAT:
+                  return new LastFloatValueWithTimeAggregationFunction(
+                          firstArgument,
+                          timeCol,
+                          ObjectSerDeUtils.FLOAT_VAL_TIME_PAIR_SER_DE);
+                case DOUBLE:
+                  return new LastDoubleValueWithTimeAggregationFunction(
+                          firstArgument,
+                          timeCol,
+                          ObjectSerDeUtils.DOUBLE_VAL_TIME_PAIR_SER_DE);
+                case STRING:
+                  return new LastStringValueWithTimeAggregationFunction(
+                          firstArgument,
+                          timeCol,
+                          ObjectSerDeUtils.STRING_VAL_TIME_PAIR_SER_DE);
+                default:
+                  throw new IllegalArgumentException("Unsupported Value Type for LastWithTime Function:" + dataType);
+              }
+            } else {
+              throw new IllegalArgumentException("Two arguments are required for LastWithTime Function.");

Review comment:
       Done






[GitHub] [pinot] weixiangsun commented on a change in pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
weixiangsun commented on a change in pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#discussion_r733929439



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
##########
@@ -109,7 +115,12 @@ private ObjectSerDeUtils() {
     Int2LongMap(23),
     Long2LongMap(24),
     Float2LongMap(25),
-    Double2LongMap(26);
+    Double2LongMap(26),
+    IntValueTimePair(27),

Review comment:
       Done






[GitHub] [pinot] weixiangsun commented on a change in pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
weixiangsun commented on a change in pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#discussion_r736065262



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunction.java
##########
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.ValueLongPair;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+/**
+ * This function is used for LastWithTime calculations.
+ * <p>The function can be used as LastWithTime(dataExpression, timeExpression, 'dataType')
+ * <p>Following arguments are supported:
+ * <ul>
+ *   <li>dataExpression: expression that contains the column to be calculated last on</li>
+ *   <li>timeExpression: expression that contains the column to be used to decide which data is last, can be any
+ *   Numeric column</li>
+ *   <li>dataType: the data type of data column</li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public abstract class LastWithTimeAggregationFunction<V extends Comparable<V>>
+    extends BaseSingleInputAggregationFunction<ValueLongPair<V>, V> {
+  protected final ExpressionContext _timeCol;
+  private final ObjectSerDeUtils.ObjectSerDe<? extends ValueLongPair<V>> _objectSerDe;
+
+  public LastWithTimeAggregationFunction(ExpressionContext dataCol,
+      ExpressionContext timeCol,
+      ObjectSerDeUtils.ObjectSerDe<? extends ValueLongPair<V>> objectSerDe) {
+    super(dataCol);
+    _timeCol = timeCol;
+    _objectSerDe = objectSerDe;
+  }
+
+  public abstract ValueLongPair<V> constructValueLongPair(V value, long time);
+
+  public abstract ValueLongPair<V> getDefaultValueTimePair();
+
+  public abstract void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet, BlockValSet timeValSet);
+
+  public abstract void aggregateGroupResultWithRawDataSv(int length,
+      int[] groupKeyArray,
+      GroupByResultHolder groupByResultHolder,
+      BlockValSet blockValSet,
+      BlockValSet timeValSet);
+
+  public abstract void aggregateGroupResultWithRawDataMv(int length,
+      int[][] groupKeysArray,
+      GroupByResultHolder groupByResultHolder,
+      BlockValSet blockValSet,
+      BlockValSet timeValSet);
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.LASTWITHTIME;
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    BlockValSet blockTimeSet = blockValSetMap.get(_timeCol);
+    if (blockValSet.getValueType() != DataType.BYTES) {
+      aggregateResultWithRawData(length, aggregationResultHolder, blockValSet, blockTimeSet);
+    } else {
+      ValueLongPair<V> defaultValueLongPair = getDefaultValueTimePair();
+      V lastData = defaultValueLongPair.getValue();
+      long lastTime = defaultValueLongPair.getTime();
+      // Serialized LastPair
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      for (int i = 0; i < length; i++) {
+        ValueLongPair<V> lastWithTimePair = _objectSerDe.deserialize(bytesValues[i]);
+        V data = lastWithTimePair.getValue();
+        long time = lastWithTimePair.getTime();
+        if (time >= lastTime) {
+          lastTime = time;
+          lastData = data;
+        }
+      }
+      setAggregationResult(aggregationResultHolder, lastData, lastTime);
+    }
+  }
+
+  protected void setAggregationResult(AggregationResultHolder aggregationResultHolder, V data, long time) {
+    ValueLongPair lastWithTimePair = aggregationResultHolder.getResult();
+    if (lastWithTimePair == null || time >= lastWithTimePair.getTime()) {
+      aggregationResultHolder.setValue(constructValueLongPair(data, time));
+    }
+  }
+
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    BlockValSet timeValSet = blockValSetMap.get(_timeCol);
+    if (blockValSet.getValueType() != DataType.BYTES) {
+      aggregateGroupResultWithRawDataSv(length, groupKeyArray, groupByResultHolder,
+          blockValSet, timeValSet);
+    } else {
+      // Serialized LastPair
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      for (int i = 0; i < length; i++) {
+        ValueLongPair<V> lastWithTimePair = _objectSerDe.deserialize(bytesValues[i]);
+        setGroupByResult(groupKeyArray[i],
+            groupByResultHolder,
+            lastWithTimePair.getValue(),
+            lastWithTimePair.getTime());
+      }
+    }
+  }
+
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    BlockValSet timeValSet = blockValSetMap.get(_timeCol);
+    if (blockValSet.getValueType() != DataType.BYTES) {
+      aggregateGroupResultWithRawDataMv(length, groupKeysArray, groupByResultHolder, blockValSet, timeValSet);
+    } else {
+      // Serialized ValueTimePair
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      for (int i = 0; i < length; i++) {
+        ValueLongPair<V> lastWithTimePair = _objectSerDe.deserialize(bytesValues[i]);
+        V data = lastWithTimePair.getValue();
+        long time = lastWithTimePair.getTime();
+        for (int groupKey : groupKeysArray[i]) {
+          setGroupByResult(groupKey, groupByResultHolder, data, time);
+        }
+      }
+    }
+  }
+
+  protected void setGroupByResult(int groupKey, GroupByResultHolder groupByResultHolder, V data, long time) {
+    ValueLongPair lastWithTimePair = groupByResultHolder.getResult(groupKey);
+    if (lastWithTimePair == null || time >= lastWithTimePair.getTime()) {
+      groupByResultHolder.setValueForKey(groupKey, constructValueLongPair(data, time));
+    }
+  }
+
+  @Override
+  public ValueLongPair<V> extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    ValueLongPair lastWithTimePair = aggregationResultHolder.getResult();
+    if (lastWithTimePair == null) {
+      return getDefaultValueTimePair();
+    } else {
+      return lastWithTimePair;
+    }
+  }
+
+  @Override
+  public ValueLongPair<V> extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+    ValueLongPair<V> lastWithTimePair = groupByResultHolder.getResult(groupKey);
+    if (lastWithTimePair == null) {
+      return getDefaultValueTimePair();
+    } else {
+      return lastWithTimePair;
+    }
+  }
+
+  @Override
+  public ValueLongPair<V> merge(ValueLongPair<V> intermediateResult1, ValueLongPair<V> intermediateResult2) {
+    if (intermediateResult1.getTime() >= intermediateResult2.getTime()) {
+      return intermediateResult1;
+    } else {
+      return intermediateResult2;
+    }
+  }
+
+  @Override
+  public List<ExpressionContext> getInputExpressions() {
+    return Arrays.asList(_expression, _timeCol);
+  }
+
+  @Override
+  public boolean isIntermediateResultComparable() {
+    return true;

Review comment:
       Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] Jackie-Jiang commented on a change in pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#discussion_r736061696



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastDoubleValueWithTimeAggregationFunction.java
##########
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.DoubleLongPair;
+import org.apache.pinot.segment.local.customobject.ValueLongPair;
+
+
+/**
+ * This function is used for LastWithTime calculations for data column with double type.
+ * <p>The function can be used as LastWithTime(dataExpression, timeExpression, 'double')
+ * <p>Following arguments are supported:
+ * <ul>
+ *   <li>dataExpression: expression that contains the double data column to be calculated last on</li>
+ *   <li>timeExpression: expression that contains the column to be used to decide which data is last, can be any
+ *   Numeric column</li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class LastDoubleValueWithTimeAggregationFunction extends LastWithTimeAggregationFunction<Double> {
+
+  private final static ValueLongPair<Double> DEFAULT_VALUE_TIME_PAIR
+      = new DoubleLongPair(Double.NaN, Long.MIN_VALUE);
+
+  public LastDoubleValueWithTimeAggregationFunction(
+      ExpressionContext dataCol,
+      ExpressionContext timeCol) {
+    super(dataCol, timeCol, ObjectSerDeUtils.DOUBLE_LONG_PAIR_SER_DE);
+  }
+
+  @Override
+  public ValueLongPair<Double> constructValueLongPair(Double value, long time) {
+    return new DoubleLongPair(value, time);
+  }
+
+  @Override
+  public ValueLongPair<Double> getDefaultValueTimePair() {
+    return DEFAULT_VALUE_TIME_PAIR;
+  }
+
+  @Override
+  public void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet, BlockValSet timeValSet) {
+    ValueLongPair<Double> defaultValueLongPair = getDefaultValueTimePair();
+    Double lastData = defaultValueLongPair.getValue();
+    long lastTime = defaultValueLongPair.getTime();
+    double[] doubleValues = blockValSet.getDoubleValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      double data = doubleValues[i];
+      long time = timeValues[i];
+      if (time >= lastTime) {
+        lastTime = time;
+        lastData = data;
+      }
+    }
+    setAggregationResult(aggregationResultHolder, lastData, lastTime);
+  }
+
+  @Override
+  public void aggregateGroupResultWithRawDataSv(int length, int[] groupKeyArray,

Review comment:
       The current code style won't reformat explicit line breaks. There are two ways to reformat:
   1. Uncheck the `Code Style` -> `Java` -> `Wrapping and Braces` -> `Keep when reformatting` -> `Line breaks` in the IDE preferences (this is my setting)
   2. Manually remove the line breaks and then reformat
   
   This is optional, and can be ignored if you feel it is too much effort






[GitHub] [pinot] weixiangsun commented on a change in pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
weixiangsun commented on a change in pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#discussion_r731106454



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java
##########
@@ -42,7 +42,6 @@
 
 @SuppressWarnings({"rawtypes", "unchecked"})
 public class DistinctCountAggregationFunction extends BaseSingleInputAggregationFunction<Set, Integer> {
-

Review comment:
       Done






[GitHub] [pinot] weixiangsun commented on a change in pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
weixiangsun commented on a change in pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#discussion_r731106674



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java
##########
@@ -42,7 +42,6 @@
 
 @SuppressWarnings({"rawtypes", "unchecked"})
 public class DistinctCountAggregationFunction extends BaseSingleInputAggregationFunction<Set, Integer> {
-

Review comment:
       Done






[GitHub] [pinot] weixiangsun commented on a change in pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
weixiangsun commented on a change in pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#discussion_r733979481



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
##########
@@ -330,6 +351,101 @@ public MinMaxRangePair deserialize(ByteBuffer byteBuffer) {
     }
   };
 
+  public static final ObjectSerDe<? extends ValueTimePair<Integer>> INT_VAL_TIME_PAIR_SER_DE
+          = new ObjectSerDe<IntValueTimePair>() {

Review comment:
       Done






[GitHub] [pinot] weixiangsun commented on a change in pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
weixiangsun commented on a change in pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#discussion_r733146796



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunction.java
##########
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.LastWithTimePair;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class LastWithTimeAggregationFunction extends BaseSingleInputAggregationFunction<LastWithTimePair, Double> {

Review comment:
       I made it work for boolean, int, long, float, double, and string. We can add support for other column types if needed in the future.






[GitHub] [pinot] codecov-commenter edited a comment on pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#issuecomment-948138425


   # [Codecov](https://codecov.io/gh/apache/pinot/pull/7584?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#7584](https://codecov.io/gh/apache/pinot/pull/7584?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (0c37328) into [master](https://codecov.io/gh/apache/pinot/commit/5eee297dbea7b3b73ae853f2479a89e9ce017363?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (5eee297) will **decrease** coverage by `4.56%`.
   > The diff coverage is `14.17%`.
   
   > :exclamation: Current head 0c37328 differs from pull request most recent head 6d60f1e. Consider uploading reports for the commit 6d60f1e to get more accurate results
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #7584      +/-   ##
   ==========================================
   - Coverage   32.16%   27.60%   -4.57%     
   ==========================================
     Files        1513     1566      +53     
     Lines       75557    79579    +4022     
     Branches    11055    11795     +740     
   ==========================================
   - Hits        24305    21965    -2340     
   - Misses      49156    55619    +6463     
   + Partials     2096     1995     -101     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `?` | |
   | integration2 | `27.60% <14.17%> (-1.35%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/7584?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...e/pinot/common/minion/MergeRollupTaskMetadata.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWluaW9uL01lcmdlUm9sbHVwVGFza01ldGFkYXRhLmphdmE=) | `0.00% <ø> (-78.27%)` | :arrow_down: |
   | [.../minion/RealtimeToOfflineSegmentsTaskMetadata.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWluaW9uL1JlYWx0aW1lVG9PZmZsaW5lU2VnbWVudHNUYXNrTWV0YWRhdGEuamF2YQ==) | `100.00% <ø> (+26.66%)` | :arrow_up: |
   | [...ot/common/request/context/RequestContextUtils.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vcmVxdWVzdC9jb250ZXh0L1JlcXVlc3RDb250ZXh0VXRpbHMuamF2YQ==) | `53.26% <0.00%> (-22.26%)` | :arrow_down: |
   | [...e/pinot/common/utils/FileUploadDownloadClient.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvRmlsZVVwbG9hZERvd25sb2FkQ2xpZW50LmphdmE=) | `53.24% <0.00%> (-6.07%)` | :arrow_down: |
   | [...java/org/apache/pinot/common/utils/StringUtil.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvU3RyaW5nVXRpbC5qYXZh) | `80.00% <ø> (+15.29%)` | :arrow_up: |
   | [...apache/pinot/common/utils/config/TagNameUtils.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvY29uZmlnL1RhZ05hbWVVdGlscy5qYXZh) | `64.28% <0.00%> (-24.61%)` | :arrow_down: |
   | [...g/apache/pinot/common/utils/helix/HelixHelper.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvaGVsaXgvSGVsaXhIZWxwZXIuamF2YQ==) | `41.26% <0.00%> (-6.53%)` | :arrow_down: |
   | [...ller/api/resources/PinotTenantRestletResource.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9hcGkvcmVzb3VyY2VzL1Bpbm90VGVuYW50UmVzdGxldFJlc291cmNlLmphdmE=) | `4.21% <0.00%> (-0.39%)` | :arrow_down: |
   | [...ler/api/resources/TableConfigsRestletResource.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9hcGkvcmVzb3VyY2VzL1RhYmxlQ29uZmlnc1Jlc3RsZXRSZXNvdXJjZS5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...realtime/segment/DefaultFlushThresholdUpdater.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9oZWxpeC9jb3JlL3JlYWx0aW1lL3NlZ21lbnQvRGVmYXVsdEZsdXNoVGhyZXNob2xkVXBkYXRlci5qYXZh) | `83.33% <ø> (ø)` | |
   | ... and [376 more](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/pinot/pull/7584?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [5eee297...6d60f1e](https://codecov.io/gh/apache/pinot/pull/7584?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   




[GitHub] [pinot] weixiangsun commented on pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
weixiangsun commented on pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#issuecomment-952043864


   @pjpringle do you have a sample change? I can add it.




[GitHub] [pinot] weixiangsun commented on a change in pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
weixiangsun commented on a change in pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#discussion_r731120375



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunction.java
##########
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.LastWithTimePair;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class LastWithTimeAggregationFunction extends BaseSingleInputAggregationFunction<LastWithTimePair, Double> {
+  private final ExpressionContext _timeCol;
+
+  public LastWithTimeAggregationFunction(ExpressionContext dataCol, ExpressionContext timeCol) {
+    super(dataCol);
+    _timeCol = timeCol;
+  }
+
+  @Override
+  public List<ExpressionContext> getInputExpressions() {
+    return Arrays.asList(_expression, _timeCol);
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.LASTWITHTIME;
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    double lastData = Double.POSITIVE_INFINITY;
+    long lastTime = Long.MIN_VALUE;
+
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    BlockValSet blockTimeSet = blockValSetMap.get(_timeCol);
+    if (blockValSet.getValueType() != DataType.BYTES) {
+      double[] doubleValues = blockValSet.getDoubleValuesSV();
+      long[] timeValues = blockTimeSet.getLongValuesSV();
+      for (int i = 0; i < length; i++) {

Review comment:
       They are not sorted.
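
Because the time values arrive unsorted, the last value cannot be read off the end of the block; every row has to be compared against the running maximum time. A minimal sketch of that scan follows, in plain Java. The names `LastWithTimeScan` and `lastByTime` are hypothetical and not part of the PR, and plain arrays stand in for Pinot's `BlockValSet`.

```java
// Hypothetical helper, not part of the PR: plain arrays stand in for BlockValSet.
final class LastWithTimeScan {
  private LastWithTimeScan() {
  }

  /**
   * Returns the value paired with the greatest time.
   * The arrays are assumed to have the same length and to be in no particular order.
   * Returns defaultValue when the input is empty.
   */
  static double lastByTime(double[] values, long[] times, double defaultValue) {
    double lastData = defaultValue;
    long lastTime = Long.MIN_VALUE;
    for (int i = 0; i < values.length; i++) {
      // ">=" lets a later row win when two rows share the same time.
      if (times[i] >= lastTime) {
        lastTime = times[i];
        lastData = values[i];
      }
    }
    return lastData;
  }
}
```

With `>=`, a later row wins when two rows share the same time, which mirrors the comparison used in the diff.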






[GitHub] [pinot] weixiangsun commented on a change in pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
weixiangsun commented on a change in pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#discussion_r733947866



##########
File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ValueTimePair.java
##########
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+public abstract class ValueTimePair<V extends Comparable<V>> implements Comparable<ValueTimePair<V>> {
+  protected V _value;
+  protected long _time;
+
+  public ValueTimePair(V value, long time) {
+    _value = value;
+    _time = time;
+  }
+
+  public V getValue() {
+    return _value;
+  }
+
+  public long getTime() {
+    return _time;
+  }
+
+  abstract public byte[] toBytes();
+
+  @Override
+  public int compareTo(ValueTimePair<V> o) {

Review comment:
       `compareTo` is not used by the aggregation function. The method is here only to make the class Serializable.






[GitHub] [pinot] Jackie-Jiang commented on a change in pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#discussion_r733222357



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
##########
@@ -109,7 +115,12 @@ private ObjectSerDeUtils() {
     Int2LongMap(23),
     Long2LongMap(24),
     Float2LongMap(25),
-    Double2LongMap(26);
+    Double2LongMap(26),
+    IntValueTimePair(27),

Review comment:
       Let's name them `IntLongPair`, `LongLongPair` etc so that they are more generic, and can be reused in the future

##########
File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ValueTimePair.java
##########
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+public abstract class ValueTimePair<V extends Comparable<V>> implements Comparable<ValueTimePair<V>> {
+  protected V _value;
+  protected long _time;
+
+  public ValueTimePair(V value, long time) {
+    _value = value;
+    _time = time;
+  }
+
+  public V getValue() {
+    return _value;
+  }
+
+  public long getTime() {
+    return _time;
+  }
+
+  abstract public byte[] toBytes();
+
+  @Override
+  public int compareTo(ValueTimePair<V> o) {

Review comment:
       The comparison logic should be within the aggregation function, so that we can reuse this pair for `FIRST`
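
A rough sketch of what this comment seems to ask for, not the code from this PR: the pair is a plain holder with no ordering of its own, and each aggregation decides which pair wins. `PairReuseSketch`, `DoubleLongPair`, `mergeLast`, and `mergeFirst` are hypothetical names. The `>=` tie rule mirrors the loop in the diff; the `<=` rule for `FIRST` is an assumption.

```java
// Hypothetical stand-ins for illustration; not the classes from this PR.
final class PairReuseSketch {
  /** Plain value/time holder that deliberately defines no ordering. */
  static final class DoubleLongPair {
    final double _value;
    final long _time;

    DoubleLongPair(double value, long time) {
      _value = value;
      _time = time;
    }
  }

  /** LAST semantics: the larger time wins; on a tie the candidate replaces the current pair. */
  static DoubleLongPair mergeLast(DoubleLongPair current, DoubleLongPair candidate) {
    return candidate._time >= current._time ? candidate : current;
  }

  /** FIRST semantics: the smaller time wins; the tie rule here is an assumption. */
  static DoubleLongPair mergeFirst(DoubleLongPair current, DoubleLongPair candidate) {
    return candidate._time <= current._time ? candidate : current;
  }
}
```

With the ordering kept out of the pair, the same holder could back both functions, and only the merge method would differ.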

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
##########
@@ -330,6 +351,101 @@ public MinMaxRangePair deserialize(ByteBuffer byteBuffer) {
     }
   };
 
+  public static final ObjectSerDe<? extends ValueTimePair<Integer>> INT_VAL_TIME_PAIR_SER_DE
+          = new ObjectSerDe<IntValueTimePair>() {

Review comment:
       Please follow the Pinot code style and reformat the changes: https://docs.pinot.apache.org/developers/developers-and-contributors/code-setup#setup-ide

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
##########
@@ -156,6 +158,45 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
             return new AvgAggregationFunction(firstArgument);
           case MODE:
             return new ModeAggregationFunction(arguments);
+          case LASTWITHTIME:
+            if (arguments.size() > 1) {
+              ExpressionContext timeCol = arguments.get(1);
+              String dataType = arguments.get(2).getIdentifier();

Review comment:
       The third argument should be a literal. Throw an exception if it is not.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastDoubleValueWithTimeAggregationFunction.java
##########
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.DoubleValueTimePair;
+import org.apache.pinot.segment.local.customobject.ValueTimePair;
+
+/**
+ * This function is used for LastWithTime calculations for data column with double type.
+ * <p>The function can be used as LastWithTime(dataExpression, timeExpression, 'double')
+ * <p>Following arguments are supported:
+ * <ul>
+ *   <li>dataExpression: expression that contains the double data column to be calculated last on</li>
+ *   <li>timeExpression: expression that contains the column to be used to decide which data is last, can be any
+ *   Numeric column</li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class LastDoubleValueWithTimeAggregationFunction extends LastWithTimeAggregationFunction<Double> {
+
+  private final static ValueTimePair<Double> DEFAULT_VALUE_TIME_PAIR
+          = new DoubleValueTimePair(Double.NaN, Long.MIN_VALUE);
+
+  public LastDoubleValueWithTimeAggregationFunction(
+          ExpressionContext dataCol,
+          ExpressionContext timeCol,
+          ObjectSerDeUtils.ObjectSerDe<? extends ValueTimePair<Double>> objectSerDe) {
+    super(dataCol, timeCol, objectSerDe);
+  }
+
+  @Override
+  public List<ExpressionContext> getInputExpressions() {
+    return Arrays.asList(_expression, _timeCol, ExpressionContext.forLiteral("Long"));
+  }
+
+  @Override
+  public ValueTimePair<Double> constructValueTimePair(Double value, long time) {
+    return new DoubleValueTimePair(value, time);
+  }
+
+  @Override
+  public ValueTimePair<Double> getDefaultValueTimePair() {
+    return DEFAULT_VALUE_TIME_PAIR;
+  }
+
+  @Override
+  public void updateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder,
+                                             BlockValSet blockValSet, BlockValSet timeValSet) {
+    ValueTimePair<Double> defaultValueTimePair = getDefaultValueTimePair();
+    Double lastData = defaultValueTimePair.getValue();
+    long lastTime = defaultValueTimePair.getTime();
+    double [] doubleValues = blockValSet.getDoubleValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      double data = doubleValues[i];
+      long time = timeValues[i];
+      if (time >= lastTime) {
+        lastTime = time;
+        lastData = data;
+      }
+    }
+    setAggregationResult(aggregationResultHolder, lastData, lastTime);
+  }
+
+  @Override
+  public void updateGroupResultWithRawDataSv(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+                                             BlockValSet blockValSet, BlockValSet timeValSet) {
+    double[] doubleValues = blockValSet.getDoubleValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      double data = doubleValues[i];
+      long time = timeValues[i];
+      setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+    }
+  }
+
+  @Override
+  public void updateGroupResultWithRawDataMv(int length,
+                                             int[][] groupKeysArray,
+                                             GroupByResultHolder groupByResultHolder,
+                                             BlockValSet blockValSet,
+                                             BlockValSet timeValSet) {
+    double[] doubleValues = blockValSet.getDoubleValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      double value = doubleValues[i];
+      long time = timeValues[i];
+      for (int groupKey : groupKeysArray[i]) {
+        setGroupByResult(groupKey, groupByResultHolder, value, time);
+      }
+    }
+  }
+
+  @Override
+  public String getResultColumnName() {
+    return getType().getName().toLowerCase() + "(" + _expression + "," + _timeCol + ", Double)";

Review comment:
       Same for other places
   ```suggestion
       return getType().getName().toLowerCase() + "(" + _expression + "," + _timeCol + ",'DOUBLE')";
   ```

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
##########
@@ -156,6 +158,45 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
             return new AvgAggregationFunction(firstArgument);
           case MODE:
             return new ModeAggregationFunction(arguments);
+          case LASTWITHTIME:
+            if (arguments.size() > 1) {
+              ExpressionContext timeCol = arguments.get(1);
+              String dataType = arguments.get(2).getIdentifier();
+              DataSchema.ColumnDataType columnDataType = DataSchema.ColumnDataType.valueOf(dataType.toUpperCase());

Review comment:
       We should use `FieldSpec.DataType`. You can do `DataType.valueOf(dataType.toUpperCase()).getStoredType()`
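
To illustrate the idea of resolving a stored type before switching on it, here is a small self-contained sketch. The `DataType` enum below is a hypothetical stand-in, not Pinot's `FieldSpec.DataType`. The `BOOLEAN` to `INT` mapping follows the way the diff groups `BOOLEAN` with `INT`; the `TIMESTAMP` to `LONG` mapping is an assumption added for illustration.

```java
import java.util.Locale;

// Hypothetical stand-in; the real enum and its mappings live in Pinot and may differ.
final class StoredTypeSketch {
  enum DataType {
    INT, LONG, FLOAT, DOUBLE, STRING, BOOLEAN, TIMESTAMP;

    /** Maps a logical type to the type assumed to back it in storage. */
    DataType getStoredType() {
      switch (this) {
        case BOOLEAN:
          return INT;
        case TIMESTAMP:
          return LONG;
        default:
          return this;
      }
    }
  }

  /** Parses the user-supplied type name case-insensitively and resolves its stored type. */
  static DataType resolve(String typeName) {
    return DataType.valueOf(typeName.toUpperCase(Locale.ROOT)).getStoredType();
  }
}
```

Switching on the stored type means the factory needs one case per physical representation rather than one per logical type.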

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
##########
@@ -156,6 +158,45 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
             return new AvgAggregationFunction(firstArgument);
           case MODE:
             return new ModeAggregationFunction(arguments);
+          case LASTWITHTIME:
+            if (arguments.size() > 1) {

Review comment:
       Should it be `== 3`?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
##########
@@ -156,6 +158,45 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
             return new AvgAggregationFunction(firstArgument);
           case MODE:
             return new ModeAggregationFunction(arguments);
+          case LASTWITHTIME:
+            if (arguments.size() > 1) {
+              ExpressionContext timeCol = arguments.get(1);
+              String dataType = arguments.get(2).getIdentifier();
+              DataSchema.ColumnDataType columnDataType = DataSchema.ColumnDataType.valueOf(dataType.toUpperCase());
+              switch (columnDataType) {
+                case BOOLEAN:
+                case INT:
+                  return new LastIntValueWithTimeAggregationFunction(
+                          firstArgument,
+                          timeCol,
+                          ObjectSerDeUtils.INT_VAL_TIME_PAIR_SER_DE,

Review comment:
       (nit) No need to pass in the `ObjectSerDe` as they are fixed for each function.
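
One way to read this nit, sketched with hypothetical types (`SerDe`, `BaseFunction`, and `LongFunction` are illustrative names, not the PR's classes): each concrete function hands its own fixed serializer to the base class, so the factory only supplies the two column expressions.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-ins for illustration; not the classes from this PR.
final class FixedSerDeSketch {
  /** Minimal serializer interface assumed for this sketch. */
  interface SerDe<T> {
    byte[] serialize(T value);

    T deserialize(ByteBuffer buffer);
  }

  static final SerDe<Long> LONG_SER_DE = new SerDe<Long>() {
    @Override
    public byte[] serialize(Long value) {
      return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    @Override
    public Long deserialize(ByteBuffer buffer) {
      return buffer.getLong();
    }
  };

  abstract static class BaseFunction<T> {
    protected final String _dataCol;
    protected final String _timeCol;
    private final SerDe<T> _serDe;

    BaseFunction(String dataCol, String timeCol, SerDe<T> serDe) {
      _dataCol = dataCol;
      _timeCol = timeCol;
      _serDe = serDe;
    }

    SerDe<T> getSerDe() {
      return _serDe;
    }
  }

  /** The subclass fixes its serializer, so callers pass only the two columns. */
  static final class LongFunction extends BaseFunction<Long> {
    LongFunction(String dataCol, String timeCol) {
      super(dataCol, timeCol, LONG_SER_DE);
    }
  }
}
```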

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
##########
@@ -156,6 +158,45 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
             return new AvgAggregationFunction(firstArgument);
           case MODE:
             return new ModeAggregationFunction(arguments);
+          case LASTWITHTIME:
+            if (arguments.size() > 1) {
+              ExpressionContext timeCol = arguments.get(1);
+              String dataType = arguments.get(2).getIdentifier();
+              DataSchema.ColumnDataType columnDataType = DataSchema.ColumnDataType.valueOf(dataType.toUpperCase());
+              switch (columnDataType) {
+                case BOOLEAN:
+                case INT:
+                  return new LastIntValueWithTimeAggregationFunction(
+                          firstArgument,
+                          timeCol,
+                          ObjectSerDeUtils.INT_VAL_TIME_PAIR_SER_DE,
+                          columnDataType == DataSchema.ColumnDataType.BOOLEAN);
+                case LONG:
+                  return new LastLongValueWithTimeAggregationFunction(
+                          firstArgument,
+                          timeCol,
+                          ObjectSerDeUtils.LONG_VAL_TIME_PAIR_SER_DE);
+                case FLOAT:
+                  return new LastFloatValueWithTimeAggregationFunction(
+                          firstArgument,
+                          timeCol,
+                          ObjectSerDeUtils.FLOAT_VAL_TIME_PAIR_SER_DE);
+                case DOUBLE:
+                  return new LastDoubleValueWithTimeAggregationFunction(
+                          firstArgument,
+                          timeCol,
+                          ObjectSerDeUtils.DOUBLE_VAL_TIME_PAIR_SER_DE);
+                case STRING:
+                  return new LastStringValueWithTimeAggregationFunction(
+                          firstArgument,
+                          timeCol,
+                          ObjectSerDeUtils.STRING_VAL_TIME_PAIR_SER_DE);
+                default:
+                  throw new IllegalArgumentException("Unsupported Value Type for LastWithTime Function:" + dataType);
+              }
+            } else {
+              throw new IllegalArgumentException("Two arguments are required for LastWithTime Function.");

Review comment:
       It seems to require 3 arguments. Can you please also include the expected arguments in the error message?
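
The argument checks raised in this review (exactly three arguments, a literal third argument, and an error message that names the expected arguments) could be combined roughly as follows. `Expr` is a hypothetical stand-in for the parsed expression type, and the message wording is only a suggestion.

```java
import java.util.List;
import java.util.Locale;

// Hypothetical stand-ins for illustration; not Pinot's ExpressionContext.
final class LastWithTimeArgsSketch {
  /** A parsed argument: either a column identifier or a quoted literal. */
  static final class Expr {
    enum Kind { IDENTIFIER, LITERAL }

    final Kind _kind;
    final String _text;

    private Expr(Kind kind, String text) {
      _kind = kind;
      _text = text;
    }

    static Expr identifier(String name) {
      return new Expr(Kind.IDENTIFIER, name);
    }

    static Expr literal(String value) {
      return new Expr(Kind.LITERAL, value);
    }
  }

  /** Validates the argument list and returns the upper-cased data type name. */
  static String validateAndGetDataType(List<Expr> arguments) {
    if (arguments.size() != 3) {
      throw new IllegalArgumentException(
          "LASTWITHTIME expects 3 arguments (dataColumn, timeColumn, 'dataType'), got: " + arguments.size());
    }
    Expr dataType = arguments.get(2);
    if (dataType._kind != Expr.Kind.LITERAL) {
      throw new IllegalArgumentException(
          "The third argument of LASTWITHTIME must be a literal data type such as 'DOUBLE', got: "
              + dataType._text);
    }
    return dataType._text.toUpperCase(Locale.ROOT);
  }
}
```

Checking the count first keeps `arguments.get(2)` from failing with an index error when only two arguments are supplied.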

##########
File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ValueTimePair.java
##########
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+public abstract class ValueTimePair<V extends Comparable<V>> implements Comparable<ValueTimePair<V>> {

Review comment:
       Let's rename it to `ValueLongPair` so that it's more generic and can be reused in the future.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunction.java
##########
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.ValueTimePair;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+/**
+ * This function is used for LastWithTime calculations.
+ * <p>The function can be used as LastWithTime(dataExpression, timeExpression, 'dataType')
+ * <p>Following arguments are supported:
+ * <ul>
+ *   <li>dataExpression: expression that contains the column to be calculated last on</li>
+ *   <li>timeExpression: expression that contains the column to be used to decide which data is last, can be any
+ *   Numeric column</li>
+ *   <li>dataType: the data type of data column</li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public abstract class LastWithTimeAggregationFunction<V extends Comparable<V>>
+        extends BaseSingleInputAggregationFunction<ValueTimePair<V>, V> {
+  protected final ExpressionContext _timeCol;
+  private final ObjectSerDeUtils.ObjectSerDe<? extends ValueTimePair<V>> _objectSerDe;
+
+  public LastWithTimeAggregationFunction(ExpressionContext dataCol,
+                                         ExpressionContext timeCol,
+                                         ObjectSerDeUtils.ObjectSerDe<? extends ValueTimePair<V>> objectSerDe) {
+    super(dataCol);
+    _timeCol = timeCol;
+    _objectSerDe = objectSerDe;
+  }
+
+  public abstract ValueTimePair<V> constructValueTimePair(V value, long time);
+
+  public abstract ValueTimePair<V> getDefaultValueTimePair();
+
+  public abstract void updateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder,

Review comment:
       (nit) Suggest renaming it to `aggregate` for readability. Same for group-by

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastDoubleValueWithTimeAggregationFunction.java
##########
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.DoubleValueTimePair;
+import org.apache.pinot.segment.local.customobject.ValueTimePair;
+
+/**
+ * This function is used for LastWithTime calculations for data column with double type.
+ * <p>The function can be used as LastWithTime(dataExpression, timeExpression, 'double')
+ * <p>Following arguments are supported:
+ * <ul>
+ *   <li>dataExpression: expression that contains the double data column to be calculated last on</li>
+ *   <li>timeExpression: expression that contains the column to be used to decide which data is last, can be any
+ *   Numeric column</li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class LastDoubleValueWithTimeAggregationFunction extends LastWithTimeAggregationFunction<Double> {
+
+  private final static ValueTimePair<Double> DEFAULT_VALUE_TIME_PAIR
+          = new DoubleValueTimePair(Double.NaN, Long.MIN_VALUE);
+
+  public LastDoubleValueWithTimeAggregationFunction(
+          ExpressionContext dataCol,
+          ExpressionContext timeCol,
+          ObjectSerDeUtils.ObjectSerDe<? extends ValueTimePair<Double>> objectSerDe) {
+    super(dataCol, timeCol, objectSerDe);
+  }
+
+  @Override
+  public List<ExpressionContext> getInputExpressions() {
+    return Arrays.asList(_expression, _timeCol, ExpressionContext.forLiteral("Long"));
+  }
+
+  @Override
+  public ValueTimePair<Double> constructValueTimePair(Double value, long time) {
+    return new DoubleValueTimePair(value, time);
+  }
+
+  @Override
+  public ValueTimePair<Double> getDefaultValueTimePair() {
+    return DEFAULT_VALUE_TIME_PAIR;
+  }
+
+  @Override
+  public void updateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder,
+                                             BlockValSet blockValSet, BlockValSet timeValSet) {
+    ValueTimePair<Double> defaultValueTimePair = getDefaultValueTimePair();
+    Double lastData = defaultValueTimePair.getValue();
+    long lastTime = defaultValueTimePair.getTime();
+    double [] doubleValues = blockValSet.getDoubleValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      double data = doubleValues[i];
+      long time = timeValues[i];
+      if (time >= lastTime) {
+        lastTime = time;
+        lastData = data;
+      }
+    }
+    setAggregationResult(aggregationResultHolder, lastData, lastTime);
+  }
+
+  @Override
+  public void updateGroupResultWithRawDataSv(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+                                             BlockValSet blockValSet, BlockValSet timeValSet) {
+    double[] doubleValues = blockValSet.getDoubleValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      double data = doubleValues[i];
+      long time = timeValues[i];
+      setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+    }
+  }
+
+  @Override
+  public void updateGroupResultWithRawDataMv(int length,
+                                             int[][] groupKeysArray,
+                                             GroupByResultHolder groupByResultHolder,
+                                             BlockValSet blockValSet,
+                                             BlockValSet timeValSet) {
+    double[] doubleValues = blockValSet.getDoubleValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      double value = doubleValues[i];
+      long time = timeValues[i];
+      for (int groupKey : groupKeysArray[i]) {
+        setGroupByResult(groupKey, groupByResultHolder, value, time);
+      }
+    }
+  }
+
+  @Override
+  public String getResultColumnName() {

Review comment:
       Also need to override `getColumnName()`

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastDoubleValueWithTimeAggregationFunction.java
##########
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.DoubleValueTimePair;
+import org.apache.pinot.segment.local.customobject.ValueTimePair;
+
+/**
+ * This function is used for LastWithTime calculations for data column with double type.
+ * <p>The function can be used as LastWithTime(dataExpression, timeExpression, 'double')
+ * <p>Following arguments are supported:
+ * <ul>
+ *   <li>dataExpression: expression that contains the double data column to be calculated last on</li>
+ *   <li>timeExpression: expression that contains the column to be used to decide which data is last, can be any
+ *   Numeric column</li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class LastDoubleValueWithTimeAggregationFunction extends LastWithTimeAggregationFunction<Double> {
+
+  private final static ValueTimePair<Double> DEFAULT_VALUE_TIME_PAIR
+          = new DoubleValueTimePair(Double.NaN, Long.MIN_VALUE);
+
+  public LastDoubleValueWithTimeAggregationFunction(
+          ExpressionContext dataCol,
+          ExpressionContext timeCol,
+          ObjectSerDeUtils.ObjectSerDe<? extends ValueTimePair<Double>> objectSerDe) {
+    super(dataCol, timeCol, objectSerDe);
+  }
+
+  @Override
+  public List<ExpressionContext> getInputExpressions() {
+    return Arrays.asList(_expression, _timeCol, ExpressionContext.forLiteral("Long"));

Review comment:
       Don't read the literal as an input; it adds overhead. Just keep `_expression` and `_timeCol`.






[GitHub] [pinot] Jackie-Jiang merged pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang merged pull request #7584:
URL: https://github.com/apache/pinot/pull/7584


   




[GitHub] [pinot] weixiangsun commented on a change in pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
weixiangsun commented on a change in pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#discussion_r731100282



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
##########
@@ -156,6 +156,13 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
             return new AvgAggregationFunction(firstArgument);
           case MODE:
             return new ModeAggregationFunction(arguments);
+          case LASTWITHTIME:
+            if (arguments.size() > 1) {
+              ExpressionContext secondArgument = arguments.get(1);
+              return new LastWithTimeAggregationFunction(firstArgument, secondArgument);
+            } else {
+              throw new IllegalArgumentException();

Review comment:
       Done






[GitHub] [pinot] weixiangsun commented on pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
weixiangsun commented on pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#issuecomment-948038722


   I addressed all the comments and finished the testing. Please take another look! @Jackie-Jiang @lakshmanan-v @pjpringle 




[GitHub] [pinot] weixiangsun commented on a change in pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
weixiangsun commented on a change in pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#discussion_r733944248



##########
File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ValueTimePair.java
##########
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+public abstract class ValueTimePair<V extends Comparable<V>> implements Comparable<ValueTimePair<V>> {

Review comment:
       Done






[GitHub] [pinot] weixiangsun commented on a change in pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
weixiangsun commented on a change in pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#discussion_r733947866



##########
File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/ValueTimePair.java
##########
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+public abstract class ValueTimePair<V extends Comparable<V>> implements Comparable<ValueTimePair<V>> {
+  protected V _value;
+  protected long _time;
+
+  public ValueTimePair(V value, long time) {
+    _value = value;
+    _time = time;
+  }
+
+  public V getValue() {
+    return _value;
+  }
+
+  public long getTime() {
+    return _time;
+  }
+
+  abstract public byte[] toBytes();
+
+  @Override
+  public int compareTo(ValueTimePair<V> o) {

Review comment:
       The aggregation function does not use compareTo. The method is here only so that the class implements Comparable.
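
For readers following along, a minimal sketch of what such a Comparable pair could look like. The body of the PR's `compareTo` is not quoted in this thread, so the class name `SketchValueTimePair` and the ordering (time first, then value) are assumptions made purely for illustration.

```java
// Hypothetical stand-in for the PR's ValueTimePair; the ordering below is an assumption.
class SketchValueTimePair<V extends Comparable<V>> implements Comparable<SketchValueTimePair<V>> {
  private final V _value;
  private final long _time;

  SketchValueTimePair(V value, long time) {
    _value = value;
    _time = time;
  }

  V getValue() {
    return _value;
  }

  long getTime() {
    return _time;
  }

  @Override
  public int compareTo(SketchValueTimePair<V> o) {
    // Compare by time first; fall back to the value to break ties.
    int byTime = Long.compare(_time, o._time);
    return byTime != 0 ? byTime : _value.compareTo(o._value);
  }
}
```

Ordering by time first keeps the comparison aligned with what "last" means for the aggregation, even if the aggregation itself never calls it.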






[GitHub] [pinot] weixiangsun commented on a change in pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
weixiangsun commented on a change in pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#discussion_r733939548



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastDoubleValueWithTimeAggregationFunction.java
##########
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.DoubleValueTimePair;
+import org.apache.pinot.segment.local.customobject.ValueTimePair;
+
+/**
+ * This function is used for LastWithTime calculations for data column with double type.
+ * <p>The function can be used as LastWithTime(dataExpression, timeExpression, 'double')
+ * <p>Following arguments are supported:
+ * <ul>
+ *   <li>dataExpression: expression that contains the double data column to be calculated last on</li>
+ *   <li>timeExpression: expression that contains the column to be used to decide which data is last, can be any
+ *   Numeric column</li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class LastDoubleValueWithTimeAggregationFunction extends LastWithTimeAggregationFunction<Double> {
+
+  private final static ValueTimePair<Double> DEFAULT_VALUE_TIME_PAIR
+          = new DoubleValueTimePair(Double.NaN, Long.MIN_VALUE);
+
+  public LastDoubleValueWithTimeAggregationFunction(
+          ExpressionContext dataCol,
+          ExpressionContext timeCol,
+          ObjectSerDeUtils.ObjectSerDe<? extends ValueTimePair<Double>> objectSerDe) {
+    super(dataCol, timeCol, objectSerDe);
+  }
+
+  @Override
+  public List<ExpressionContext> getInputExpressions() {
+    return Arrays.asList(_expression, _timeCol, ExpressionContext.forLiteral("Long"));
+  }
+
+  @Override
+  public ValueTimePair<Double> constructValueTimePair(Double value, long time) {
+    return new DoubleValueTimePair(value, time);
+  }
+
+  @Override
+  public ValueTimePair<Double> getDefaultValueTimePair() {
+    return DEFAULT_VALUE_TIME_PAIR;
+  }
+
+  @Override
+  public void updateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder,
+                                             BlockValSet blockValSet, BlockValSet timeValSet) {
+    ValueTimePair<Double> defaultValueTimePair = getDefaultValueTimePair();
+    Double lastData = defaultValueTimePair.getValue();
+    long lastTime = defaultValueTimePair.getTime();
+    double [] doubleValues = blockValSet.getDoubleValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      double data = doubleValues[i];
+      long time = timeValues[i];
+      if (time >= lastTime) {
+        lastTime = time;
+        lastData = data;
+      }
+    }
+    setAggregationResult(aggregationResultHolder, lastData, lastTime);
+  }
+
+  @Override
+  public void updateGroupResultWithRawDataSv(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+                                             BlockValSet blockValSet, BlockValSet timeValSet) {
+    double[] doubleValues = blockValSet.getDoubleValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      double data = doubleValues[i];
+      long time = timeValues[i];
+      setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+    }
+  }
+
+  @Override
+  public void updateGroupResultWithRawDataMv(int length,
+                                             int[][] groupKeysArray,
+                                             GroupByResultHolder groupByResultHolder,
+                                             BlockValSet blockValSet,
+                                             BlockValSet timeValSet) {
+    double[] doubleValues = blockValSet.getDoubleValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      double value = doubleValues[i];
+      long time = timeValues[i];
+      for (int groupKey : groupKeysArray[i]) {
+        setGroupByResult(groupKey, groupByResultHolder, value, time);
+      }
+    }
+  }
+
+  @Override
+  public String getResultColumnName() {
+    return getType().getName().toLowerCase() + "(" + _expression + "," + _timeCol + ", Double)";

Review comment:
       Done






[GitHub] [pinot] weixiangsun commented on a change in pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
weixiangsun commented on a change in pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#discussion_r734793772



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
##########
@@ -156,6 +157,46 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
             return new AvgAggregationFunction(firstArgument);
           case MODE:
             return new ModeAggregationFunction(arguments);
+          case LASTWITHTIME:
+            if (arguments.size() == 3) {
+              ExpressionContext timeCol = arguments.get(1);
+              ExpressionContext dataType = arguments.get(2);
+              if (dataType.getType() != ExpressionContext.Type.LITERAL) {
+                throw new IllegalArgumentException("Third argument of lastWithTime Function should be literal."
+                    + " The function can be used as lastWithTime(dataColumn, timeColumn, 'dataType')");
+              }
+              FieldSpec.DataType fieldDataType
+                  = FieldSpec.DataType.valueOf(dataType.getLiteral().toUpperCase());
+              switch (fieldDataType) {
+                case BOOLEAN:

Review comment:
       Will do this as a separate change.






[GitHub] [pinot] weixiangsun commented on a change in pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
weixiangsun commented on a change in pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#discussion_r736070344



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
##########
@@ -109,7 +115,12 @@ private ObjectSerDeUtils() {
     Int2LongMap(23),
     Long2LongMap(24),
     Float2LongMap(25),
-    Double2LongMap(26);
+    Double2LongMap(26),
+    IntValueTimePair(27),

Review comment:
       Done
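
As context for the new serde type ids and the abstract `toBytes()` on the pair class, a minimal sketch of how a double/long pair might round-trip through bytes. The class name `DoubleTimePairSketch` and the layout (8-byte double followed by 8-byte long) are assumptions for illustration, not the PR's actual wire format.

```java
import java.nio.ByteBuffer;

// Hypothetical pair; the byte layout (8-byte double, then 8-byte long) is an assumption.
final class DoubleTimePairSketch {
  final double value;
  final long time;

  DoubleTimePairSketch(double value, long time) {
    this.value = value;
    this.time = time;
  }

  // Serialize the value followed by the time into a fixed 16-byte array.
  byte[] toBytes() {
    ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES + Long.BYTES);
    buffer.putDouble(value);
    buffer.putLong(time);
    return buffer.array();
  }

  // Read the fields back in the same order they were written.
  static DoubleTimePairSketch fromBytes(byte[] bytes) {
    ByteBuffer buffer = ByteBuffer.wrap(bytes);
    double value = buffer.getDouble();
    long time = buffer.getLong();
    return new DoubleTimePairSketch(value, time);
  }
}
```

A fixed-width layout like this keeps deserialization trivial; a variable-width value type such as a string would need a length prefix.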






[GitHub] [pinot] Jackie-Jiang commented on a change in pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#discussion_r736061696



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastDoubleValueWithTimeAggregationFunction.java
##########
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.DoubleLongPair;
+import org.apache.pinot.segment.local.customobject.ValueLongPair;
+
+
+/**
+ * This function is used for LastWithTime calculations for data column with double type.
+ * <p>The function can be used as LastWithTime(dataExpression, timeExpression, 'double')
+ * <p>Following arguments are supported:
+ * <ul>
+ *   <li>dataExpression: expression that contains the double data column to be calculated last on</li>
+ *   <li>timeExpression: expression that contains the column to be used to decide which data is last, can be any
+ *   Numeric column</li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class LastDoubleValueWithTimeAggregationFunction extends LastWithTimeAggregationFunction<Double> {
+
+  private final static ValueLongPair<Double> DEFAULT_VALUE_TIME_PAIR
+      = new DoubleLongPair(Double.NaN, Long.MIN_VALUE);
+
+  public LastDoubleValueWithTimeAggregationFunction(
+      ExpressionContext dataCol,
+      ExpressionContext timeCol) {
+    super(dataCol, timeCol, ObjectSerDeUtils.DOUBLE_LONG_PAIR_SER_DE);
+  }
+
+  @Override
+  public ValueLongPair<Double> constructValueLongPair(Double value, long time) {
+    return new DoubleLongPair(value, time);
+  }
+
+  @Override
+  public ValueLongPair<Double> getDefaultValueTimePair() {
+    return DEFAULT_VALUE_TIME_PAIR;
+  }
+
+  @Override
+  public void aggregateResultWithRawData(int length, AggregationResultHolder aggregationResultHolder,
+      BlockValSet blockValSet, BlockValSet timeValSet) {
+    ValueLongPair<Double> defaultValueLongPair = getDefaultValueTimePair();
+    Double lastData = defaultValueLongPair.getValue();
+    long lastTime = defaultValueLongPair.getTime();
+    double[] doubleValues = blockValSet.getDoubleValuesSV();
+    long[] timeValues = timeValSet.getLongValuesSV();
+    for (int i = 0; i < length; i++) {
+      double data = doubleValues[i];
+      long time = timeValues[i];
+      if (time >= lastTime) {
+        lastTime = time;
+        lastData = data;
+      }
+    }
+    setAggregationResult(aggregationResultHolder, lastData, lastTime);
+  }
+
+  @Override
+  public void aggregateGroupResultWithRawDataSv(int length, int[] groupKeyArray,

Review comment:
       The current code style won't reformat explicit line breaks. There are two ways to reformat:
   1. Uncheck `Code Style` -> `Java` -> `Wrapping and Braces` -> `Keep when reformatting` -> `Line breaks` in the IDE preferences (this is my setting)
   2. Manually remove the line breaks and then reformat






[GitHub] [pinot] codecov-commenter edited a comment on pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#issuecomment-948138425


   # [Codecov](https://codecov.io/gh/apache/pinot/pull/7584?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#7584](https://codecov.io/gh/apache/pinot/pull/7584?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (81a1f58) into [master](https://codecov.io/gh/apache/pinot/commit/5eee297dbea7b3b73ae853f2479a89e9ce017363?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (5eee297) will **decrease** coverage by `3.01%`.
   > The diff coverage is `10.53%`.
   
   > :exclamation: Current head 81a1f58 differs from pull request most recent head e8df187. Consider uploading reports for the commit e8df187 to get more accurate results
   [![Impacted file tree graph](https://codecov.io/gh/apache/pinot/pull/7584/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/pinot/pull/7584?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #7584      +/-   ##
   ==========================================
   - Coverage   32.16%   29.14%   -3.02%     
   ==========================================
     Files        1513     1562      +49     
     Lines       75557    79054    +3497     
     Branches    11055    11713     +658     
   ==========================================
   - Hits        24305    23044    -1261     
   - Misses      49156    53979    +4823     
   + Partials     2096     2031      -65     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `29.14% <10.53%> (-1.39%)` | :arrow_down: |
   | integration2 | `?` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/7584?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...e/pinot/common/minion/MergeRollupTaskMetadata.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWluaW9uL01lcmdlUm9sbHVwVGFza01ldGFkYXRhLmphdmE=) | `94.73% <ø> (+16.47%)` | :arrow_up: |
   | [.../minion/RealtimeToOfflineSegmentsTaskMetadata.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWluaW9uL1JlYWx0aW1lVG9PZmZsaW5lU2VnbWVudHNUYXNrTWV0YWRhdGEuamF2YQ==) | `100.00% <ø> (+26.66%)` | :arrow_up: |
   | [...e/pinot/common/utils/FileUploadDownloadClient.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvRmlsZVVwbG9hZERvd25sb2FkQ2xpZW50LmphdmE=) | `48.48% <0.00%> (-10.83%)` | :arrow_down: |
   | [...java/org/apache/pinot/common/utils/StringUtil.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvU3RyaW5nVXRpbC5qYXZh) | `60.00% <ø> (-4.71%)` | :arrow_down: |
   | [...apache/pinot/common/utils/config/TagNameUtils.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvY29uZmlnL1RhZ05hbWVVdGlscy5qYXZh) | `85.71% <0.00%> (-3.18%)` | :arrow_down: |
   | [...g/apache/pinot/common/utils/helix/HelixHelper.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvaGVsaXgvSGVsaXhIZWxwZXIuamF2YQ==) | `44.66% <0.00%> (-3.13%)` | :arrow_down: |
   | [...ller/api/resources/PinotTenantRestletResource.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9hcGkvcmVzb3VyY2VzL1Bpbm90VGVuYW50UmVzdGxldFJlc291cmNlLmphdmE=) | `4.21% <0.00%> (-0.39%)` | :arrow_down: |
   | [...e/pinot/core/common/datatable/DataTableImplV2.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9jb21tb24vZGF0YXRhYmxlL0RhdGFUYWJsZUltcGxWMi5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [.../transform/function/ConstructFromTextFunction.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9nZW9zcGF0aWFsL3RyYW5zZm9ybS9mdW5jdGlvbi9Db25zdHJ1Y3RGcm9tVGV4dEZ1bmN0aW9uLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...l/transform/function/ConstructFromWKBFunction.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9nZW9zcGF0aWFsL3RyYW5zZm9ybS9mdW5jdGlvbi9Db25zdHJ1Y3RGcm9tV0tCRnVuY3Rpb24uamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | ... and [274 more](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/pinot/pull/7584?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/pinot/pull/7584?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [5eee297...e8df187](https://codecov.io/gh/apache/pinot/pull/7584?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   




[GitHub] [pinot] pjpringle commented on a change in pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
pjpringle commented on a change in pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#discussion_r730186776



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunction.java
##########
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.LastWithTimePair;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class LastWithTimeAggregationFunction extends BaseSingleInputAggregationFunction<LastWithTimePair, Double> {
+  private final ExpressionContext _timeCol;
+
+  public LastWithTimeAggregationFunction(ExpressionContext dataCol, ExpressionContext timeCol) {
+    super(dataCol);
+    _timeCol = timeCol;
+  }
+
+  @Override
+  public List<ExpressionContext> getInputExpressions() {
+    return Arrays.asList(_expression, _timeCol);
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.LASTWITHTIME;
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    double lastData = Double.POSITIVE_INFINITY;
+    long lastTime = Long.MIN_VALUE;
+
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    BlockValSet blockTimeSet = blockValSetMap.get(_timeCol);
+    if (blockValSet.getValueType() != DataType.BYTES) {
+      double[] doubleValues = blockValSet.getDoubleValuesSV();
+      long[] timeValues = blockTimeSet.getLongValuesSV();
+      for (int i = 0; i < length; i++) {

Review comment:
       Are `blockValSet` and `blockTimeSet` sorted at all? If they were, we could just take the last value in the array.
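
To make the trade-off concrete, a minimal sketch that is not taken from the PR: without an ordering guarantee a full scan is needed, whereas with times sorted ascending the last element would be enough. The class and method names are hypothetical, and whether Pinot guarantees any ordering here is exactly the open question.

```java
// Hypothetical helper, not part of Pinot.
final class LastValueSketch {
  private LastValueSketch() {
  }

  // Works for any ordering: scan all rows and keep the value with the largest time.
  // Uses >= so that, on equal times, the later row wins, mirroring the
  // `time >= lastTime` check in the Double variant quoted elsewhere in this thread.
  static double lastByScan(double[] values, long[] times, int length) {
    double lastData = Double.NaN;
    long lastTime = Long.MIN_VALUE;
    for (int i = 0; i < length; i++) {
      if (times[i] >= lastTime) {
        lastTime = times[i];
        lastData = values[i];
      }
    }
    return lastData;
  }

  // Only valid when the times for rows [0, length) are sorted ascending.
  static double lastIfSorted(double[] values, int length) {
    return length == 0 ? Double.NaN : values[length - 1];
  }
}
```

The scan is O(n) per block; the shortcut is O(1) but silently returns the wrong row if the sortedness assumption does not hold.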






[GitHub] [pinot] weixiangsun commented on a change in pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
weixiangsun commented on a change in pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#discussion_r733147102



##########
File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/LastWithTimePair.java
##########
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import java.nio.ByteBuffer;
+import javax.annotation.Nonnull;
+
+public class LastWithTimePair implements Comparable<LastWithTimePair> {

Review comment:
       Done






[GitHub] [pinot] weixiangsun commented on a change in pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
weixiangsun commented on a change in pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#discussion_r733914596



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
##########
@@ -156,6 +158,45 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
             return new AvgAggregationFunction(firstArgument);
           case MODE:
             return new ModeAggregationFunction(arguments);
+          case LASTWITHTIME:
+            if (arguments.size() > 1) {

Review comment:
       Done






[GitHub] [pinot] codecov-commenter edited a comment on pull request #7584: Add LASTWITHTIME aggregate function support #7315

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#issuecomment-948138425


   # [Codecov](https://codecov.io/gh/apache/pinot/pull/7584?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#7584](https://codecov.io/gh/apache/pinot/pull/7584?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6d60f1e) into [master](https://codecov.io/gh/apache/pinot/commit/5eee297dbea7b3b73ae853f2479a89e9ce017363?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (5eee297) will **increase** coverage by `33.08%`.
   > The diff coverage is `85.97%`.
   
   ```diff
   @@              Coverage Diff              @@
   ##             master    #7584       +/-   ##
   =============================================
   + Coverage     32.16%   65.25%   +33.08%     
   - Complexity        0     3998     +3998     
   =============================================
     Files          1513     1529       +16     
     Lines         75557    78075     +2518     
     Branches      11055    11633      +578     
   =============================================
   + Hits          24305    50945    +26640     
   + Misses        49156    23513    -25643     
   - Partials       2096     3617     +1521     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `?` | |
   | integration2 | `?` | |
   | unittests1 | `68.71% <85.97%> (?)` | |
   | unittests2 | `14.53% <0.00%> (?)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/7584?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...tion/function/LastWithTimeAggregationFunction.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9hZ2dyZWdhdGlvbi9mdW5jdGlvbi9MYXN0V2l0aFRpbWVBZ2dyZWdhdGlvbkZ1bmN0aW9uLmphdmE=) | `47.14% <47.14%> (ø)` | |
   | [...gregation/function/AggregationFunctionFactory.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9hZ2dyZWdhdGlvbi9mdW5jdGlvbi9BZ2dyZWdhdGlvbkZ1bmN0aW9uRmFjdG9yeS5qYXZh) | `83.33% <60.00%> (+67.17%)` | :arrow_up: |
   | [...inot/segment/local/customobject/ValueLongPair.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC9jdXN0b21vYmplY3QvVmFsdWVMb25nUGFpci5qYXZh) | `63.63% <63.63%> (ø)` | |
   | [...ion/LastFloatValueWithTimeAggregationFunction.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9hZ2dyZWdhdGlvbi9mdW5jdGlvbi9MYXN0RmxvYXRWYWx1ZVdpdGhUaW1lQWdncmVnYXRpb25GdW5jdGlvbi5qYXZh) | `97.22% <97.22%> (ø)` | |
   | [...on/LastStringValueWithTimeAggregationFunction.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9hZ2dyZWdhdGlvbi9mdW5jdGlvbi9MYXN0U3RyaW5nVmFsdWVXaXRoVGltZUFnZ3JlZ2F0aW9uRnVuY3Rpb24uamF2YQ==) | `97.22% <97.22%> (ø)` | |
   | [...on/LastDoubleValueWithTimeAggregationFunction.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9hZ2dyZWdhdGlvbi9mdW5jdGlvbi9MYXN0RG91YmxlVmFsdWVXaXRoVGltZUFnZ3JlZ2F0aW9uRnVuY3Rpb24uamF2YQ==) | `97.29% <97.29%> (ø)` | |
   | [...tion/LastLongValueWithTimeAggregationFunction.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9hZ2dyZWdhdGlvbi9mdW5jdGlvbi9MYXN0TG9uZ1ZhbHVlV2l0aFRpbWVBZ2dyZWdhdGlvbkZ1bmN0aW9uLmphdmE=) | `97.29% <97.29%> (ø)` | |
   | [...org/apache/pinot/core/common/ObjectSerDeUtils.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9jb21tb24vT2JqZWN0U2VyRGVVdGlscy5qYXZh) | `90.59% <97.56%> (+43.15%)` | :arrow_up: |
   | [...ction/LastIntValueWithTimeAggregationFunction.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9hZ2dyZWdhdGlvbi9mdW5jdGlvbi9MYXN0SW50VmFsdWVXaXRoVGltZUFnZ3JlZ2F0aW9uRnVuY3Rpb24uamF2YQ==) | `97.72% <97.72%> (ø)` | |
   | [...xt/utils/BrokerRequestToQueryContextConverter.java](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9yZXF1ZXN0L2NvbnRleHQvdXRpbHMvQnJva2VyUmVxdWVzdFRvUXVlcnlDb250ZXh0Q29udmVydGVyLmphdmE=) | `98.21% <100.00%> (+0.08%)` | :arrow_up: |
   | ... and [1293 more](https://codecov.io/gh/apache/pinot/pull/7584/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/pinot/pull/7584?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [5eee297...6d60f1e](https://codecov.io/gh/apache/pinot/pull/7584?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   

