Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/12/12 00:08:08 UTC

[GitHub] [iceberg] rdblue opened a new pull request, #6405: API: Add Aggregate expression evaluation

rdblue opened a new pull request, #6405:
URL: https://github.com/apache/iceberg/pull/6405

   This PR has classes for implementing aggregation expressions in the API module.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #6405: API: Add Aggregate expression evaluation

Posted by GitBox <gi...@apache.org>.
rdblue commented on PR #6405:
URL: https://github.com/apache/iceberg/pull/6405#issuecomment-1345703757

   @huaxingao, I was looking at #6252 and wanted to try implementing aggregation in the core or API module so that most of the logic can be shared instead of being reimplemented in every processing engine.
   
   Could you please take a look at this and see if it seems reasonable?
   
   The basic idea is to use `BoundAggregate` to do two things:
   1. Extract a value to aggregate in `eval(StructLike)` or `eval(DataFile)`, which is similar to how `eval` is used for other expressions
   2. Create an `Aggregator` that keeps track of the aggregate state
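The two roles of `BoundAggregate` described above can be illustrated with a minimal, self-contained sketch. It does not use Iceberg's classes: a row is modeled as an `Object[]`, and `SketchAggregator` and `SketchMaxAggregate` are hypothetical names. `eval` extracts the value from one row, and `newAggregator` returns an object that holds the running maximum. Null values are skipped here, as SQL `max` does.

```java
// Hypothetical stand-ins: a row is modeled as Object[] instead of StructLike.
interface SketchAggregator<R> {
  void update(Object[] row);

  R result();
}

final class SketchMaxAggregate {
  private final int position;

  SketchMaxAggregate(int position) {
    this.position = position;
  }

  // Role 1: extract the value to aggregate from a single row.
  Integer eval(Object[] row) {
    return (Integer) row[position];
  }

  // Role 2: create an aggregator that keeps the running state.
  SketchAggregator<Integer> newAggregator() {
    return new SketchAggregator<Integer>() {
      private Integer max = null;

      @Override
      public void update(Object[] row) {
        Integer value = eval(row);
        // Skip null values, as SQL max does.
        if (value != null && (max == null || value > max)) {
          max = value;
        }
      }

      @Override
      public Integer result() {
        return max;
      }
    };
  }
}
```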
   
   Then this also adds `AggregateEvaluator` that operates on a list of aggregate expressions
   * `aggEval = AggregateEvaluator.create(tableSchema, expressions)` binds the expressions and creates aggregators for each one
   * `aggEval.update(StructLike)` and `aggEval.update(DataFile)` update each expression's aggregator
   * `aggEval.result()` returns a `StructLike` with the aggregated values
   * `aggEval.resultType()` returns a `StructType` for the aggregated values
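A simplified sketch of how an evaluator like this might fan each row out to a list of aggregators and collect one result per expression. `RowAggregator`, `CountNonNullAggregator`, and `SketchAggregateEvaluator` are hypothetical stand-ins: rows are `Object[]` rather than `StructLike`, and the result is an `Object[]` rather than a `StructLike` described by a `StructType`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for an aggregator over Object[] rows.
interface RowAggregator {
  void update(Object[] row);

  Object result();
}

// Counts non-null values at one column position.
final class CountNonNullAggregator implements RowAggregator {
  private final int position;
  private long count = 0L;

  CountNonNullAggregator(int position) {
    this.position = position;
  }

  @Override
  public void update(Object[] row) {
    if (row[position] != null) {
      count += 1;
    }
  }

  @Override
  public Object result() {
    return count;
  }
}

final class SketchAggregateEvaluator {
  private final List<RowAggregator> aggregators;

  SketchAggregateEvaluator(List<RowAggregator> aggregators) {
    this.aggregators = new ArrayList<>(aggregators);
  }

  // Analogous to aggEval.update(StructLike): every aggregator sees every row.
  void update(Object[] row) {
    for (RowAggregator aggregator : aggregators) {
      aggregator.update(row);
    }
  }

  // Analogous to aggEval.result(): one value per aggregate, in expression order.
  Object[] result() {
    Object[] values = new Object[aggregators.size()];
    for (int pos = 0; pos < values.length; pos += 1) {
      values[pos] = aggregators.get(pos).result();
    }
    return values;
  }
}
```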
   
   This is based on #6252, but tries to keep as much logic as possible in core/API. What do you think? Could we incorporate this into #6252?




[GitHub] [iceberg] rdblue commented on a diff in pull request #6405: API: Add Aggregate expression evaluation

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #6405:
URL: https://github.com/apache/iceberg/pull/6405#discussion_r1093833996


##########
api/src/main/java/org/apache/iceberg/expressions/CountNonNull.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.expressions;
+
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.types.Types;
+
+public class CountNonNull<T> extends CountAggregate<T> {
+  private final int fieldId;
+
+  protected CountNonNull(BoundTerm<T> term) {
+    super(Operation.COUNT, term);
+    Types.NestedField field = term.ref().field();
+    this.fieldId = field.fieldId();
+  }
+
+  @Override
+  protected Long countFor(StructLike row) {
+    return term().eval(row) != null ? 1L : 0L;
+  }
+
+  @Override
+  protected Long countFor(DataFile file) {
+    // NaN value counts were not required in v1 and were included in value counts
+    return safeAdd(safeGet(file.valueCounts(), fieldId), safeGet(file.nanValueCounts(), fieldId, 0L));

Review Comment:
   Yes, you're right. This will include NaN and null values:
   > Map from column id to number of values in the column (including null and NaN values)
   
   That means we should not add the NaN count.
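Assuming value counts follow the spec text quoted above (nulls and NaNs included), a non-null count for one column could be derived by subtracting the null count and adding nothing for NaN, since NaN is a non-null value. The helper below is a hypothetical sketch, not Iceberg's API: the two maps stand in for `DataFile.valueCounts()` and `DataFile.nullValueCounts()`, and it returns null when either metric is missing so a caller can treat the answer as unknown.

```java
import java.util.Map;

// Hypothetical helper, not part of Iceberg's API. The maps stand in for
// DataFile.valueCounts() and DataFile.nullValueCounts(), keyed by field id.
final class NonNullCountSketch {
  private NonNullCountSketch() {}

  // Returns null when a required metric is missing, meaning "unknown".
  static Long countNonNull(
      Map<Integer, Long> valueCounts, Map<Integer, Long> nullValueCounts, int fieldId) {
    if (valueCounts == null || nullValueCounts == null) {
      return null;
    }

    Long values = valueCounts.get(fieldId);
    Long nulls = nullValueCounts.get(fieldId);
    if (values == null || nulls == null) {
      return null;
    }

    // Value counts already include null and NaN values: NaN is a non-null
    // value, so nothing is added for it; only nulls are subtracted.
    return values - nulls;
  }
}
```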





[GitHub] [iceberg] huaxingao commented on pull request #6405: API: Add Aggregate expression evaluation

Posted by GitBox <gi...@apache.org>.
huaxingao commented on PR #6405:
URL: https://github.com/apache/iceberg/pull/6405#issuecomment-1356946191

   @rdblue Thank you very much for the PR! The changes are much cleaner and more generic now, and they can be wrapped cleanly in Spark. Once your PR is in, I will make the Spark changes on top of it. Thanks a lot!




[GitHub] [iceberg] huaxingao commented on pull request #6405: API: Add Aggregate expression evaluation

Posted by GitBox <gi...@apache.org>.
huaxingao commented on PR #6405:
URL: https://github.com/apache/iceberg/pull/6405#issuecomment-1347229885

   @rdblue Thank you very much for the PR! I will pull your code locally and work on integrating my changes into yours.




[GitHub] [iceberg] huaxingao commented on a diff in pull request #6405: API: Add Aggregate expression evaluation

Posted by GitBox <gi...@apache.org>.
huaxingao commented on code in PR #6405:
URL: https://github.com/apache/iceberg/pull/6405#discussion_r1051704651


##########
api/src/main/java/org/apache/iceberg/expressions/BoundAggregate.java:
##########
@@ -44,4 +57,85 @@ public Type type() {
       return term().type();
     }
   }
+
+  public String describe() {
+    switch (op()) {
+      case COUNT_STAR:
+        return "count(*)";
+      case COUNT:
+        return "count(" + ExpressionUtil.describe(term()) + ")";
+      case MAX:
+        return "max(" + ExpressionUtil.describe(term()) + ")";
+      case MIN:
+        return "min(" + ExpressionUtil.describe(term()) + ")";
+      default:
+        throw new UnsupportedOperationException("Unsupported aggregate type: " + op());
+    }
+  }
+
+  <V> V safeGet(Map<Integer, V> map, int key) {
+    return safeGet(map, key, null);
+  }
+
+  <V> V safeGet(Map<Integer, V> map, int key, V defaultValue) {
+    if (map != null) {
+      return map.getOrDefault(key, defaultValue);
+    }
+
+    return null;
+  }
+
+  interface Aggregator<R> {
+    void update(StructLike struct);
+
+    void update(DataFile file);
+
+    R result();
+  }
+
+  abstract static class NullSafeAggregator<T, R> implements Aggregator<R> {
+    private final BoundAggregate<T, R> aggregate;
+    private boolean isNull = false;
+
+    NullSafeAggregator(BoundAggregate<T, R> aggregate) {
+      this.aggregate = aggregate;
+    }
+
+    protected abstract void update(R value);
+
+    protected abstract R current();
+
+    @Override
+    public void update(StructLike struct) {
+      if (!isNull) {
+        R value = aggregate.eval(struct);
+        if (value == null) {
+          this.isNull = true;
+        } else {
+          update(value);
+        }
+      }
+    }
+
+    @Override
+    public void update(DataFile file) {
+      if (!isNull) {
+        R value = aggregate.eval(file);
+        if (value == null) {
+          this.isNull = true;
+        } else {
+          update(value);
+        }
+      }
+    }
+
+    @Override
+    public R result() {
+      if (isNull) {
+        return null;
+      }
+
+      return result();

Review Comment:
   Do you mean `return current();`?
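For illustration, a stripped-down version of the null-safe pattern with the suggested fix applied: `result()` delegates to `current()` instead of calling itself, which would recurse until a `StackOverflowError`. `NullSafeAggregatorSketch` and `SumSketch` are hypothetical; the class in the diff takes `StructLike` or `DataFile` inputs and evaluates them through the bound aggregate, whereas this sketch receives already-extracted values.

```java
// Hypothetical, simplified version of the null-safe aggregator pattern.
abstract class NullSafeAggregatorSketch<R> {
  private boolean isNull = false;

  // Folds a known, non-null value into the running state.
  protected abstract void accumulate(R value);

  // Returns the running state; only used when no unknown value was seen.
  protected abstract R current();

  // A null value means "unknown" (for example, missing metrics), so the
  // aggregate can no longer produce an exact answer.
  public void update(R value) {
    if (!isNull) {
      if (value == null) {
        this.isNull = true;
      } else {
        accumulate(value);
      }
    }
  }

  public R result() {
    if (isNull) {
      return null;
    }

    // Calling result() here instead would recurse forever.
    return current();
  }
}

// Example subclass: sums Long values.
final class SumSketch extends NullSafeAggregatorSketch<Long> {
  private long sum = 0L;

  @Override
  protected void accumulate(Long value) {
    sum += value;
  }

  @Override
  protected Long current() {
    return sum;
  }
}
```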



##########
api/src/main/java/org/apache/iceberg/expressions/CountNonNull.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.expressions;
+
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.types.Types;
+
+public class CountNonNull<T> extends CountAggregate<T> {
+  private final int fieldId;
+
+  protected CountNonNull(BoundTerm<T> term) {
+    super(Operation.COUNT, term);
+    Types.NestedField field = term.ref().field();
+    this.fieldId = field.fieldId();
+  }
+
+  @Override
+  protected Long countFor(StructLike row) {
+    return term().eval(row) != null ? 1L : 0L;
+  }
+
+  @Override
+  protected Long countFor(DataFile file) {
+    // NaN value counts were not required in v1 and were included in value counts
+    return safeAdd(safeGet(file.valueCounts(), fieldId), safeGet(file.nanValueCounts(), fieldId, 0L));

Review Comment:
   Shall we subtract the `nullValueCounts`?





[GitHub] [iceberg] zinking commented on a diff in pull request #6405: API: Add Aggregate expression evaluation

Posted by GitBox <gi...@apache.org>.
zinking commented on code in PR #6405:
URL: https://github.com/apache/iceberg/pull/6405#discussion_r1045358208


##########
api/src/main/java/org/apache/iceberg/expressions/BoundAggregate.java:
##########
@@ -44,4 +57,85 @@ public Type type() {
       return term().type();
     }
   }
+
+  public String describe() {
+    switch (op()) {
+      case COUNT_STAR:
+        return "count(*)";
+      case COUNT:
+        return "count(" + ExpressionUtil.describe(term()) + ")";
+      case MAX:
+        return "max(" + ExpressionUtil.describe(term()) + ")";
+      case MIN:
+        return "min(" + ExpressionUtil.describe(term()) + ")";
+      default:
+        throw new UnsupportedOperationException("Unsupported aggregate type: " + op());
+    }
+  }
+
+  <V> V safeGet(Map<Integer, V> map, int key) {
+    return safeGet(map, key, null);
+  }
+
+  <V> V safeGet(Map<Integer, V> map, int key, V defaultValue) {

Review Comment:
   Should this belong in a util class? Or is null possibly not allowed here?
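One possible shape for such a utility, sketched with a hypothetical `MapUtilSketch` class that treats a null map the same as a map without the key. Note that the `safeGet` in the diff appears to return `null` rather than `defaultValue` when the map itself is null, so a call like `safeGet(file.nanValueCounts(), fieldId, 0L)` would not fall back to `0L` for a file that has no NaN counts.

```java
import java.util.Map;

// Hypothetical utility class; the name is illustrative only.
final class MapUtilSketch {
  private MapUtilSketch() {}

  // Metrics maps may be null when metrics were not collected, so a null map
  // is handled like a map that lacks the key.
  static <V> V safeGet(Map<Integer, V> map, int key, V defaultValue) {
    if (map == null) {
      return defaultValue;
    }

    return map.getOrDefault(key, defaultValue);
  }

  static <V> V safeGet(Map<Integer, V> map, int key) {
    return safeGet(map, key, null);
  }
}
```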





[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #6405: API: Add Aggregate expression evaluation

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #6405:
URL: https://github.com/apache/iceberg/pull/6405#discussion_r1046618283


##########
api/src/main/java/org/apache/iceberg/expressions/CountStar.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.expressions;
+
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.StructLike;
+
+public class CountStar<T> extends CountAggregate<T> {
+  protected CountStar(BoundTerm<T> term) {
+    super(Operation.COUNT_STAR, term);
+  }
+
+  @Override
+  protected Long countFor(StructLike row) {
+    return 1L;
+  }
+
+  @Override
+  protected Long countFor(DataFile file) {
+    long count = file.recordCount();
+    if (count < 0) {
+      return null;
+    }

Review Comment:
   Curious: when would this ever be negative? Or is this logic just defensive against bad metadata?



##########
api/src/main/java/org/apache/iceberg/expressions/AggregateEvaluator.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.expressions;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+
+public class AggregateEvaluator {
+  public static AggregateEvaluator create(Schema schema, Expression... aggregates) {
+    return create(schema, aggregates);
+  }
+
+  public static AggregateEvaluator create(Schema schema, List<Expression> aggregates) {
+    return create(schema.asStruct(), aggregates);
+  }
+
+  private static AggregateEvaluator create(Types.StructType struct, List<Expression> aggregates) {
+    List<BoundAggregate<?, ?>> boundAggregates =
+        aggregates.stream()
+            .map(expr -> Binder.bind(struct, expr))
+            .map(bound -> (BoundAggregate<?, ?>) bound)
+            .collect(Collectors.toList());
+
+    return new AggregateEvaluator(boundAggregates);
+  }
+
+  private final List<BoundAggregate.Aggregator<?>> aggregators;
+  private final Types.StructType resultType;
+
+  private AggregateEvaluator(List<BoundAggregate<?, ?>> aggregates) {
+    ImmutableList.Builder<BoundAggregate.Aggregator<?>> aggregatorsBuilder =
+        ImmutableList.builder();
+    List<Types.NestedField> resultFields = Lists.newArrayList();
+    for (int pos = 0; pos < aggregates.size(); pos += 1) {
+      BoundAggregate<?, ?> aggregate = aggregates.get(pos);
+      aggregatorsBuilder.add(aggregates.get(pos).newAggregator());

Review Comment:
   I guess here we could also reuse `aggregate`?
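A sketch of the constructor loop with that suggestion applied, plus one related detail. In the diff, `create(Schema, Expression...)` calls `create(schema, aggregates)` with the array itself; Java resolves that call to the same varargs overload, so it would recurse instead of reaching the `List` overload. Converting with `Arrays.asList` avoids that. `EvaluatorFactorySketch` is hypothetical, and each `Supplier` stands in for `BoundAggregate.newAggregator()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in: each Supplier plays the role of a bound aggregate
// whose get() creates a fresh aggregator.
final class EvaluatorFactorySketch {
  private final List<Object> aggregators;

  private EvaluatorFactorySketch(List<Object> aggregators) {
    this.aggregators = aggregators;
  }

  // Passing the array directly would select this same overload again;
  // wrapping it in a List selects the List overload instead.
  @SafeVarargs
  static EvaluatorFactorySketch create(Supplier<Object>... aggregates) {
    return create(Arrays.asList(aggregates));
  }

  static EvaluatorFactorySketch create(List<Supplier<Object>> aggregates) {
    List<Object> built = new ArrayList<>();
    for (int pos = 0; pos < aggregates.size(); pos += 1) {
      Supplier<Object> aggregate = aggregates.get(pos);
      // Reuse the local variable rather than calling aggregates.get(pos) again.
      built.add(aggregate.get());
    }

    return new EvaluatorFactorySketch(Collections.unmodifiableList(built));
  }

  int size() {
    return aggregators.size();
  }
}
```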





[GitHub] [iceberg] rdblue commented on a diff in pull request #6405: API: Add Aggregate expression evaluation

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #6405:
URL: https://github.com/apache/iceberg/pull/6405#discussion_r1048806645


##########
api/src/main/java/org/apache/iceberg/expressions/CountStar.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.expressions;
+
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.StructLike;
+
+public class CountStar<T> extends CountAggregate<T> {
+  protected CountStar(BoundTerm<T> term) {
+    super(Operation.COUNT_STAR, term);
+  }
+
+  @Override
+  protected Long countFor(StructLike row) {
+    return 1L;
+  }
+
+  @Override
+  protected Long countFor(DataFile file) {
+    long count = file.recordCount();
+    if (count < 0) {
+      return null;
+    }

Review Comment:
   Some imported Avro files had incorrect metadata several versions ago. I don't think it is widespread, but it is good to handle it.
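A minimal sketch of how that defensive check might compose with a running total, assuming an unknown (null) count from any one file should make the whole `count(*)` unknown rather than silently undercount. `CountStarSketch` is hypothetical and takes raw record counts in place of `DataFile` objects.

```java
// Hypothetical sketch: record counts are passed directly instead of DataFile objects.
final class CountStarSketch {
  // null means the total is unknown.
  private Long total = 0L;

  // Mirrors the defensive check in the diff: a negative count is invalid metadata.
  static Long countFor(long recordCount) {
    if (recordCount < 0) {
      return null;
    }
    return recordCount;
  }

  void update(long recordCount) {
    Long count = countFor(recordCount);
    if (total == null || count == null) {
      // Once unknown, the total stays unknown.
      total = null;
    } else {
      total = total + count;
    }
  }

  Long result() {
    return total;
  }
}
```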


