You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/10/17 00:35:42 UTC

[GitHub] [pinot] Jackie-Jiang commented on a change in pull request #7584: Add LASTWITHTIME aggregate function support #7315

Jackie-Jiang commented on a change in pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#discussion_r730144239



##########
File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/LastWithTimePair.java
##########
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import java.nio.ByteBuffer;
+import javax.annotation.Nonnull;
+
+public class LastWithTimePair implements Comparable<LastWithTimePair> {
+  private double _data;
+  private long _time;
+
+  public LastWithTimePair(double data, long time) {
+    _data = data;
+    _time = time;
+  }
+
+  public void apply(double data, long time) {

Review comment:
       Handle the merge logic within the function, and we can make the Pair itself immutable

##########
File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/LastWithTimePair.java
##########
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import java.nio.ByteBuffer;
+import javax.annotation.Nonnull;
+
+public class LastWithTimePair implements Comparable<LastWithTimePair> {
+  private double _data;
+  private long _time;
+
+  public LastWithTimePair(double data, long time) {
+    _data = data;
+    _time = time;
+  }
+
+  public void apply(double data, long time) {
+    if (time >= _time) {
+      _data = data;
+      _time = time;
+    }
+  }
+
+  public void apply(@Nonnull LastWithTimePair lastWithTimePair) {

Review comment:
       (Convention) we don't usually put nonnull annotation, but only the nullable because they look quite similar, and assume everything not annotated as nonnull. 

##########
File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/LastWithTimePair.java
##########
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.customobject;
+
+import java.nio.ByteBuffer;
+import javax.annotation.Nonnull;
+
+public class LastWithTimePair implements Comparable<LastWithTimePair> {

Review comment:
       Let's make it `ValueTimePair` where value can be of different type (declare value as `Comparable`)? The pari doesn't need to be `Comparable`, and we can handle the merge logic within the function. The `FIRST` function can reuse the same pair.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunction.java
##########
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.LastWithTimePair;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class LastWithTimeAggregationFunction extends BaseSingleInputAggregationFunction<LastWithTimePair, Double> {
+  private final ExpressionContext _timeCol;
+
+  public LastWithTimeAggregationFunction(ExpressionContext dataCol, ExpressionContext timeCol) {
+    super(dataCol);
+    _timeCol = timeCol;
+  }
+
+  @Override
+  public List<ExpressionContext> getInputExpressions() {
+    return Arrays.asList(_expression, _timeCol);
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.LASTWITHTIME;
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    double lastData = Double.POSITIVE_INFINITY;
+    long lastTime = Long.MIN_VALUE;
+
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    BlockValSet blockTimeSet = blockValSetMap.get(_timeCol);
+    if (blockValSet.getValueType() != DataType.BYTES) {
+      double[] doubleValues = blockValSet.getDoubleValuesSV();
+      long[] timeValues = blockTimeSet.getLongValuesSV();
+      for (int i = 0; i < length; i++) {
+        double data = doubleValues[i];
+        long time = timeValues[i];
+        if (time >= lastTime) {
+          lastTime = time;
+          lastData = data;
+        }
+      }
+    } else {
+      // Serialized LastPair
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      for (int i = 0; i < length; i++) {
+        LastWithTimePair lastWithTimePair = ObjectSerDeUtils.LAST_WITH_TIME_PAIR_SER_DE.deserialize(bytesValues[i]);
+        double data = lastWithTimePair.getData();
+        long time = lastWithTimePair.getTime();
+        if (time >= lastTime) {
+          lastTime = time;
+          lastData = data;
+        }
+      }
+    }
+    setAggregationResult(aggregationResultHolder, lastData, lastTime);
+  }
+
+  protected void setAggregationResult(AggregationResultHolder aggregationResultHolder, double data, long time) {
+    LastWithTimePair lastWithTimePair = aggregationResultHolder.getResult();
+    if (lastWithTimePair == null) {
+      aggregationResultHolder.setValue(new LastWithTimePair(data, time));
+    } else {
+      lastWithTimePair.apply(data, time);
+    }
+  }
+
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    BlockValSet timeValSet = blockValSetMap.get(_timeCol);
+    if (blockValSet.getValueType() != DataType.BYTES) {
+      double[] doubleValues = blockValSet.getDoubleValuesSV();
+      long[] timeValues = timeValSet.getLongValuesSV();
+      for (int i = 0; i < length; i++) {
+        double data = doubleValues[i];
+        long time = timeValues[i];
+        setGroupByResult(groupKeyArray[i], groupByResultHolder, data, time);
+      }
+    } else {
+      // Serialized LastPair
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      for (int i = 0; i < length; i++) {
+        LastWithTimePair lastWithTimePair = ObjectSerDeUtils.LAST_WITH_TIME_PAIR_SER_DE.deserialize(bytesValues[i]);
+        setGroupByResult(groupKeyArray[i], groupByResultHolder, lastWithTimePair.getData(), lastWithTimePair.getTime());
+      }
+    }
+  }
+
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    BlockValSet timeValSet = blockValSetMap.get(_timeCol);
+    if (blockValSet.getValueType() != DataType.BYTES) {
+      double[] doubleValues = blockValSet.getDoubleValuesSV();
+      long[] timeValues = timeValSet.getLongValuesSV();
+      for (int i = 0; i < length; i++) {
+        double value = doubleValues[i];
+        long time = timeValues[i];
+        for (int groupKey : groupKeysArray[i]) {
+          setGroupByResult(groupKey, groupByResultHolder, value, time);
+        }
+      }
+    } else {
+      // Serialized LastWithTimePair
+      byte[][] bytesValues = blockValSet.getBytesValuesSV();
+      for (int i = 0; i < length; i++) {
+        LastWithTimePair lastWithTimePair = ObjectSerDeUtils.LAST_WITH_TIME_PAIR_SER_DE.deserialize(bytesValues[i]);
+        double data = lastWithTimePair.getData();
+        long time = lastWithTimePair.getTime();
+        for (int groupKey : groupKeysArray[i]) {
+          setGroupByResult(groupKey, groupByResultHolder, data, time);
+        }
+      }
+    }
+  }
+
+  protected void setGroupByResult(int groupKey, GroupByResultHolder groupByResultHolder, double data, long time) {
+    LastWithTimePair lastWithTimePair = groupByResultHolder.getResult(groupKey);
+    if (lastWithTimePair == null) {
+      groupByResultHolder.setValueForKey(groupKey, new LastWithTimePair(data, time));
+    } else {
+      lastWithTimePair.apply(data, time);
+    }
+  }
+
+  @Override
+  public LastWithTimePair extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    LastWithTimePair lastWithTimePair = aggregationResultHolder.getResult();
+    if (lastWithTimePair == null) {
+      return new LastWithTimePair(Double.POSITIVE_INFINITY, Long.MIN_VALUE);

Review comment:
       Put a constant for the default, and I feel `Double.NaN` might be a better default?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java
##########
@@ -42,7 +42,6 @@
 
 @SuppressWarnings({"rawtypes", "unchecked"})
 public class DistinctCountAggregationFunction extends BaseSingleInputAggregationFunction<Set, Integer> {
-

Review comment:
       (nit) Seems unrelated?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org