Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/12/19 05:56:49 UTC

[GitHub] [iceberg] yyanyy opened a new pull request #1963: Track metrics in Avro value writers

yyanyy opened a new pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963


   This change is a smaller PR split out from #1935. 
   
   This change adds a field ID to the constructors of the Avro primitive value writers, makes these writers track stats such as value count, min, and max, and exposes a `metrics` method that can be called to collect `FieldMetrics`. However, nothing calls this method yet. 
   This change doesn't include tests; they will be added in the next PR, once end-to-end integration is set up. 
   
   Please note: regarding the change to the signature of `FieldMetrics`, the alternative would be to keep `ByteBuffer` as the type of the lower/upper bounds in `FieldMetrics` and pass each field's metrics mode to each leaf value writer during construction, so that truncation and conversion to byte buffers could happen when metrics are collected from these writers. I think that's doable, but it would touch a lot of method signatures, including adding the metrics mode to the constructor of every leaf writer and adding a metrics config to every datum writer (e.g. `DataWriter`, `GenericAppenderFactory`). On the other hand, it would let us skip computing min/max for fields that don't need them. Please let me know if you are interested, and I'll post a new commit to this PR so the two implementations can be compared.  
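   The writer behavior described above can be sketched without dependencies. This is a minimal illustration, not the PR's code: `SimpleFieldMetrics`, `MetricsTrackingWriter`, and `RecordingIntWriter` are hypothetical names, and the real writers take an Avro `Encoder` and report Iceberg's `FieldMetrics`. Each leaf writer is built with its field ID, updates its counters and bounds on every write, and exposes them through `metrics()`.

```java
import java.util.Comparator;

// Hypothetical, simplified stand-in for Iceberg's FieldMetrics (not its real API):
// only the counters and bounds mentioned in the PR description.
final class SimpleFieldMetrics<T> {
  final int id;
  final long valueCount;
  final long nullValueCount;
  final T lowerBound;
  final T upperBound;

  SimpleFieldMetrics(int id, long valueCount, long nullValueCount, T lowerBound, T upperBound) {
    this.id = id;
    this.valueCount = valueCount;
    this.nullValueCount = nullValueCount;
    this.lowerBound = lowerBound;
    this.upperBound = upperBound;
  }
}

// A leaf writer constructed with its field ID that updates stats on every write.
// Encoding is delegated to writeVal, mirroring the write/writeVal split in the diffs in this thread.
abstract class MetricsTrackingWriter<T> {
  private final int id;
  private final Comparator<T> comparator;
  private long valueCount = 0;
  private long nullValueCount = 0;
  private T min = null;
  private T max = null;

  MetricsTrackingWriter(int id, Comparator<T> comparator) {
    this.id = id;
    this.comparator = comparator;
  }

  final void write(T datum) {
    valueCount++;
    if (datum == null) {
      nullValueCount++;
    } else {
      if (min == null || comparator.compare(datum, min) < 0) {
        min = datum;
      }
      if (max == null || comparator.compare(datum, max) > 0) {
        max = datum;
      }
    }
    writeVal(datum);
  }

  // Subclasses encode the value; an Avro writer would call an Encoder method here.
  protected abstract void writeVal(T datum);

  // Reports what has been written so far; nothing forces a caller to collect it.
  SimpleFieldMetrics<T> metrics() {
    return new SimpleFieldMetrics<>(id, valueCount, nullValueCount, min, max);
  }
}

// Example leaf writer that records values as text instead of encoding them.
final class RecordingIntWriter extends MetricsTrackingWriter<Integer> {
  final StringBuilder out = new StringBuilder();

  RecordingIntWriter(int id) {
    super(id, Comparator.naturalOrder());
  }

  @Override
  protected void writeVal(Integer datum) {
    out.append(datum).append(',');
  }
}
```

   Writing `5, null, -2, 9` to `new RecordingIntWriter(1)` yields a value count of 4, a null count of 1, and bounds of -2 and 9.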
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] yyanyy commented on a change in pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
yyanyy commented on a change in pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#discussion_r582545871



##########
File path: core/src/main/java/org/apache/iceberg/avro/ValueWriters.java
##########
@@ -492,4 +561,153 @@ protected Object get(IndexedRecord struct, int pos) {
       return struct.get(pos);
     }
   }
+
+  private abstract static class FloatingPointWriter<T extends Comparable<T>>
+      extends ComparableWriter<T> {
+    private long nanValueCount;
+
+    FloatingPointWriter(int id) {
+      super(id);
+    }
+
+    @Override
+    public void write(T datum, Encoder encoder) throws IOException {
+      valueCount++;
+
+      if (datum == null) {
+        nullValueCount++;
+      } else if (NaNUtil.isNaN(datum)) {
+        nanValueCount++;
+      } else {
+        if (max == null || datum.compareTo(max) > 0) {
+          this.max = datum;
+        }
+        if (min == null || datum.compareTo(min) < 0) {
+          this.min = datum;
+        }
+      }
+
+      writeVal(datum, encoder);
+    }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      return Stream.of(new FieldMetrics<>(id, valueCount, nullValueCount, nanValueCount, min, max));
+    }
+  }
+
+  public abstract static class MetricsAwareStringWriter<T extends Comparable<T>> extends ComparableWriter<T> {

Review comment:
       Flink's StringData writer also extends it. 
   






[GitHub] [iceberg] rdblue commented on a change in pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#discussion_r579889665



##########
File path: core/src/main/java/org/apache/iceberg/avro/AvroWithPartnerByStructureVisitor.java
##########
@@ -161,4 +186,17 @@ public T map(P sMap, Schema map, T value) {
   public T primitive(P type, Schema primitive) {
     return null;
   }
+
+  // ---------------------------------- Helpers ---------------------------------------------
+
+  private Deque<String> fieldNames = Lists.newLinkedList();
+  private Deque<Schema> parentSchemas = Lists.newLinkedList();

Review comment:
       What about creating a fake `Schema.Field` to pass to the before/after methods instead? Another alternative is to pass the field information to the methods, like this:
   
   ```java
     // no-op hooks by default; visitors override only the ones they need
     public void beforeField(int fieldId, String name, Schema type) {}
     public void afterField(int fieldId, String name, Schema type) {}
     public void beforeListElement(int elementId, Schema elementType) {}
     public void beforeMapKey(int keyId, Schema keyType) {}
     public void beforeMapValue(int valueId, Schema valueType) {}
   ```
   
   I think some variation on this would be better. We want to avoid keeping additional state in all visitors.






[GitHub] [iceberg] rdblue commented on a change in pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#discussion_r569624421



##########
File path: core/src/main/java/org/apache/iceberg/avro/ValueWriters.java
##########
@@ -369,17 +393,42 @@ private OptionWriter(int nullIndex, ValueWriter<T> valueWriter) {
         throw new IllegalArgumentException("Invalid option index: " + nullIndex);
       }
       this.valueWriter = valueWriter;
+      this.type = type;
+      this.nullValueCount = 0;
     }
 
     @Override
     public void write(T option, Encoder encoder) throws IOException {
       if (option == null) {
         encoder.writeIndex(nullIndex);
+        nullValueCount++;
       } else {
         encoder.writeIndex(valueIndex);
         valueWriter.write(option, encoder);
       }
     }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      if (AvroSchemaUtil.isMetricSupportedType(type)) {
+        return mergeNullCountIntoMetric();
+      } else {
+        return valueWriter.metrics();
+      }
+    }
+
+    private Stream<FieldMetrics> mergeNullCountIntoMetric() {
+      List<FieldMetrics> fieldMetricsFromWriter = valueWriter.metrics().collect(Collectors.toList());
+      Preconditions.checkState(fieldMetricsFromWriter.size() == 1,
+          "Optional field for type % shouldn't vend more than one field metrics", type);

Review comment:
       Typo: should be `%s` instead of `%`.
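   A dependency-free sketch of what a merge like `mergeNullCountIntoMetric` could look like with the corrected `%s` placeholder. The `checkState` helper is a hypothetical stand-in built on `String.format` (Guava's `Preconditions.checkState` also substitutes `%s`), `ChildMetrics` and `OptionMetricsMerge` are hypothetical names, and adding the option writer's null count to the child's value count assumes value counts include nulls, as Iceberg's `value_counts` do. Per the diff above, nulls are written by the option writer and never reach the child writer.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical per-field counters reported by a child writer.
final class ChildMetrics {
  final int id;
  final long valueCount;
  final long nullValueCount;

  ChildMetrics(int id, long valueCount, long nullValueCount) {
    this.id = id;
    this.valueCount = valueCount;
    this.nullValueCount = nullValueCount;
  }
}

final class OptionMetricsMerge {
  private OptionMetricsMerge() {}

  // Stand-in for a Guava-style checkState: the message template uses %s placeholders.
  static void checkState(boolean expression, String template, Object... args) {
    if (!expression) {
      throw new IllegalStateException(String.format(template, args));
    }
  }

  // The child writer never sees nulls, so the option's null count is added
  // to both of the child's counters (assuming value counts include nulls).
  static Stream<ChildMetrics> mergeNullCount(
      Stream<ChildMetrics> fromChild, long optionNullCount, Object type) {
    List<ChildMetrics> metrics = fromChild.collect(Collectors.toList());
    checkState(metrics.size() == 1,
        "Optional field for type %s shouldn't vend more than one field metrics", type);
    ChildMetrics child = metrics.get(0);
    return Stream.of(new ChildMetrics(
        child.id, child.valueCount + optionNullCount, child.nullValueCount + optionNullCount));
  }
}
```

   With `%s`, a failed check reads "Optional field for type INT shouldn't vend more than one field metrics".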






[GitHub] [iceberg] rdblue commented on a change in pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#discussion_r569620243



##########
File path: core/src/main/java/org/apache/iceberg/avro/AvroWithPartnerByStructureVisitor.java
##########
@@ -161,4 +186,17 @@ public T map(P sMap, Schema map, T value) {
   public T primitive(P type, Schema primitive) {
     return null;
   }
+
+  // ---------------------------------- Helpers ---------------------------------------------
+
+  private Deque<String> fieldNames = Lists.newLinkedList();
+  private Deque<Schema> parentSchemas = Lists.newLinkedList();

Review comment:
       Instead of updating all visitors, why not add extra callbacks like the visitors for Iceberg schemas? I think that supporting `beforeField` and `afterField` would be a better way to handle this than passing the parent and name around. The implementation to get a field's ID from its parent and name seems a bit awkward compared to adding an ID stack in one visitor.
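   A dependency-free sketch of the callback approach, assuming a toy schema model in place of Avro's `Schema`: `Node`, `FieldCallbackVisitor`, and `IdTrackingVisitor` are hypothetical names, not Iceberg classes. The traversal brackets each field with `beforeField`/`afterField`, the hooks default to no-ops, and only the one visitor that needs field IDs keeps a stack, so its primitive callback can read the current ID without being handed the parent schema and field name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical toy schema node standing in for Avro's Schema and Schema.Field.
final class Node {
  final String name;
  final int fieldId;
  final String primitiveType; // null for a struct
  final List<Node> children;

  private Node(String name, int fieldId, String primitiveType, List<Node> children) {
    this.name = name;
    this.fieldId = fieldId;
    this.primitiveType = primitiveType;
    this.children = children;
  }

  static Node primitive(String name, int fieldId, String type) {
    return new Node(name, fieldId, type, List.of());
  }

  static Node struct(String name, int fieldId, Node... children) {
    return new Node(name, fieldId, null, List.of(children));
  }
}

abstract class FieldCallbackVisitor<T> {
  // No-op defaults: visitors that don't need field context ignore these hooks.
  void beforeField(Node field) {}

  void afterField(Node field) {}

  // Like a primitive(Schema) callback, this sees only the type, not the enclosing field.
  abstract T primitive(String type);

  // Depth-first traversal that brackets every field with beforeField/afterField.
  static <T> void visit(Node struct, FieldCallbackVisitor<T> visitor, List<T> results) {
    for (Node field : struct.children) {
      visitor.beforeField(field);
      try {
        if (field.primitiveType == null) {
          visit(field, visitor, results);
        } else {
          results.add(visitor.primitive(field.primitiveType));
        }
      } finally {
        visitor.afterField(field);
      }
    }
  }
}

// The only visitor with extra state: a stack of the field IDs currently being visited.
final class IdTrackingVisitor extends FieldCallbackVisitor<String> {
  private final Deque<Integer> fieldIds = new ArrayDeque<>();

  @Override
  void beforeField(Node field) {
    fieldIds.push(field.fieldId);
  }

  @Override
  void afterField(Node field) {
    fieldIds.pop();
  }

  @Override
  String primitive(String type) {
    return type + "#" + fieldIds.peek();
  }
}
```

   For a struct with fields `a` (ID 1, int) and `s` (ID 2) containing `b` (ID 3, long) and `c` (ID 4, string), the visitor produces `int#1`, `long#3`, `string#4`.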






[GitHub] [iceberg] rdblue commented on pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#issuecomment-892920161


   Thanks, @yyanyy! No rush, I just wanted to check in.




[GitHub] [iceberg] rdblue commented on a change in pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#discussion_r579888939



##########
File path: core/src/main/java/org/apache/iceberg/FloatFieldMetrics.java
##########
@@ -19,16 +19,14 @@
 
 package org.apache.iceberg;
 
-import java.nio.ByteBuffer;
-
 /**
  * Iceberg internally tracked field level metrics, used by Parquet and ORC writers only.
  * <p>
  * Parquet/ORC keeps track of most metrics in file statistics, and only NaN counter is actually tracked by writers.
  * This wrapper ensures that metrics not being updated by those writers will not be incorrectly used, by throwing
  * exceptions when they are accessed.
  */
-public class FloatFieldMetrics extends FieldMetrics {
+public class FloatFieldMetrics extends FieldMetrics<Object> {

Review comment:
       Number works for me.






[GitHub] [iceberg] yyanyy commented on pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
yyanyy commented on pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#issuecomment-785617395


   Thank you @rdblue for reviewing this PR again! I think I have addressed all the feedback. Because the visitor change is non-trivial and the original change didn't contain tests, I added all the necessary tests to this PR to make sure the new `beforeField`/`afterField` pattern doesn't affect normal Avro writes. This means #1935 will be complete once these changes are merged, but please feel free to suggest removing some classes to make the review easier.
   
   One thing I'd like to mention is a behavior difference among file formats when handling an optional struct field with required and optional subfields. I think this is fine for now, because I don't think we use those statistics, but I want to capture the current status here:
   
   For example, if the schema is `someField, optional struct<required int1, optional int2>` and there are 50 records in total, of which 6 have a null struct and 3 have a non-null struct but a null int2, then each file format produces the following counts:
   
   | Format  | int1 valueCount | int1 nullCount | int2 valueCount | int2 nullCount |
   |---------|-----------------|----------------|-----------------|----------------|
   | Parquet | 50              | 6              | 50              | 9              |
   | ORC     | 44              | 0              | 50              | 9              |
   | Avro    | 44              | 0              | 44              | 3              |
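   The arithmetic behind the Parquet and Avro rows can be reproduced with a small simulation. This is an illustrative model of the two counting conventions as they appear in the table, not code from either library; `StructCountExample` is a hypothetical name, and the ORC row, which follows neither convention for both fields, is not modeled.

```java
import java.util.ArrayList;
import java.util.List;

final class StructCountExample {
  enum Leaf { INT1, INT2 }

  // One record of `optional struct<required int1, optional int2>`.
  static final class Row {
    final boolean structPresent;
    final boolean int2Present;

    Row(boolean structPresent, boolean int2Present) {
      this.structPresent = structPresent;
      this.int2Present = int2Present;
    }
  }

  // The 50 records from the example: 6 null structs, 3 null int2 values, 41 fully populated.
  static List<Row> exampleRows() {
    List<Row> rows = new ArrayList<>();
    for (int i = 0; i < 6; i++) {
      rows.add(new Row(false, false));
    }
    for (int i = 0; i < 3; i++) {
      rows.add(new Row(true, false));
    }
    for (int i = 0; i < 41; i++) {
      rows.add(new Row(true, true));
    }
    return rows;
  }

  // Returns {valueCount, nullCount} for one leaf.
  // countNullParents = true: every record contributes a (possibly null) leaf value, like the Parquet row.
  // countNullParents = false: leaf writers only run inside non-null structs, like the Avro row.
  static long[] counts(List<Row> rows, Leaf leaf, boolean countNullParents) {
    long values = 0;
    long nulls = 0;
    for (Row row : rows) {
      if (!row.structPresent) {
        if (countNullParents) {
          values++;
          nulls++;
        }
        continue;
      }
      values++;
      // int1 is required, so it is only null when the whole struct is null.
      if (leaf == Leaf.INT2 && !row.int2Present) {
        nulls++;
      }
    }
    return new long[] {values, nulls};
  }
}
```

   Counting null parents gives Parquet's 50/6 and 50/9 (6 null structs plus 3 null int2 values); skipping them gives Avro's 44/0 and 44/3.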
   




[GitHub] [iceberg] rdblue commented on a change in pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#discussion_r579890007



##########
File path: core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java
##########
@@ -244,6 +245,28 @@ static boolean hasProperty(Schema schema, String propertyName) {
     return schema.getObjectProp(propertyName) != null;
   }
 
+  public static int fieldId(Schema currentSchema, Schema parentSchema, Supplier<String> fieldNameGetter) {

Review comment:
       I'd like to avoid needing this method because of the strange input types. I think those show that the way we're traversing the schema isn't quite right, which is why this is complicated.






[GitHub] [iceberg] rdblue commented on a change in pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#discussion_r579895706



##########
File path: core/src/main/java/org/apache/iceberg/avro/ValueWriters.java
##########
@@ -492,4 +561,153 @@ protected Object get(IndexedRecord struct, int pos) {
       return struct.get(pos);
     }
   }
+
+  private abstract static class FloatingPointWriter<T extends Comparable<T>>
+      extends ComparableWriter<T> {
+    private long nanValueCount;
+
+    FloatingPointWriter(int id) {
+      super(id);
+    }
+
+    @Override
+    public void write(T datum, Encoder encoder) throws IOException {
+      valueCount++;
+
+      if (datum == null) {
+        nullValueCount++;
+      } else if (NaNUtil.isNaN(datum)) {
+        nanValueCount++;
+      } else {
+        if (max == null || datum.compareTo(max) > 0) {
+          this.max = datum;
+        }
+        if (min == null || datum.compareTo(min) < 0) {
+          this.min = datum;
+        }
+      }
+
+      writeVal(datum, encoder);
+    }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      return Stream.of(new FieldMetrics<>(id, valueCount, nullValueCount, nanValueCount, min, max));
+    }
+  }
+
+  public abstract static class MetricsAwareStringWriter<T extends Comparable<T>> extends ComparableWriter<T> {
+    public MetricsAwareStringWriter(int id) {
+      super(id);
+    }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      // convert min/max to string to allow upper/lower bound truncation when gathering metrics,
+      // as in different implementations there's no guarantee that input to string writer will be char sequence
+      return metrics(Object::toString);
+    }
+  }
+
+  private abstract static class MetricsAwareByteArrayWriter extends MetricsAwareWriter<byte[]> {
+    MetricsAwareByteArrayWriter(int id) {
+      super(id, Comparators.unsignedByteArrays());
+    }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      // convert min/max to byte buffer to allow upper/lower bound truncation when gathering metrics.
+      return metrics(ByteBuffer::wrap);
+    }
+  }
+
+  public abstract static class ComparableWriter<T extends Comparable<T>> extends MetricsAwareWriter<T> {
+    public ComparableWriter(int id) {
+      super(id, Comparable::compareTo);
+    }
+  }
+
+  /**
+   * A value writer wrapper that keeps track of column statistics (metrics) during writing.
+   *
+   * @param <T> Input type
+   */
+  public abstract static class MetricsAwareWriter<T> extends MetricsAwareTransformWriter<T, T> {
+    public MetricsAwareWriter(int id, Comparator<T> comparator) {
+      super(id, comparator, Function.identity());
+    }
+
+    /**
+     * Helper class to transform the input type when collecting metrics.
+     * The transform function converts the stats information from the specific type that the underlying writer
+     * understands to a more general type that could be transformed to binary following iceberg single-value
+     * serialization spec.
+     *
+     * @param func tranformation function
+     * @return a stream of field metrics with bounds converted by the given transformation
+     */
+    protected Stream<FieldMetrics> metrics(Function<T, ?> func) {
+      return Stream.of(new FieldMetrics<>(id, valueCount, nullValueCount, 0,
+          updateBound(min, func), updateBound(max, func)));
+    }
+
+    private <T3, T4> T4 updateBound(T3 bound, Function<T3, T4> func) {
+      return bound == null ? null : func.apply(bound);
+    }
+  }
+
+  /**
+   * A value writer wrapper that keeps track of column statistics (metrics) during writing, and accepts a
+   * transformation in its constructor.
+   * The transformation will apply to the input data to produce the type that the underlying writer accepts.
+   * Stats will also be tracked with the type after transformation.
+   *
+   * @param <T1> Input type
+   * @param <T2> Type after transformation
+   */
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  public abstract static class MetricsAwareTransformWriter<T1, T2> implements ValueWriter<T1> {
+    protected final int id;
+    protected long valueCount;
+    protected long nullValueCount;
+    protected T2 max;
+    protected T2 min;
+    protected final Function<T1, T2> transformation;
+
+    private final Comparator<T2> comparator;
+
+    public MetricsAwareTransformWriter(int id, Comparator<T2> comparator, Function<T1, T2> func) {
+      this.id = id;
+      this.comparator = comparator;
+      this.transformation = func;
+    }
+
+    @Override
+    public void write(T1 datum, Encoder encoder) throws IOException {
+      valueCount++;
+      if (datum == null) {
+        nullValueCount++;
+        writeVal(null, encoder);
+
+      } else {
+        T2 transformedDatum = transformation.apply(datum);
+        if (max == null || comparator.compare(transformedDatum, max) > 0) {
+          max = transformedDatum;
+        }
+        if (min == null || comparator.compare(transformedDatum, min) < 0) {
+          min = transformedDatum;
+        }
+        writeVal(transformedDatum, encoder);

Review comment:
       After removing the null handling from this method, this is the main block of code provided by this class. I think the class structure here tries too hard to avoid duplicating these 8 lines and ends up causing an unnecessary performance slowdown.
   
   Most of the subclasses don't actually provide a transform, so it doesn't make much sense to provide one. In addition, the cases that do provide a transform only produce `int` and `long` values. With this inheritance structure, every writer applies a function, and the `int` and `long` cases box the result of that function call.
   
   I think separating this into `StoredAsIntWriter`, `StoredAsLongWriter`, and `MetricsAwareWriter` would duplicate these lines but avoid the function call and boxing in many cases. It would also avoid needing to implement `writeVal(Integer, Encoder)` in every subclass. Instead, you'd add an `int convert(T)` method that converts to the int value, and then write to the encoder directly in `write`.
   
   Could you update to use that structure?
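   A dependency-free sketch of the suggested `StoredAsIntWriter` shape. `IntSink` is a hypothetical stand-in for Avro's `Encoder`, `ByteWriter` is a hypothetical subclass, and null handling is assumed to live outside this class, as the comment describes. Subclasses only implement `int convert(T)`, and the bounds stay primitive `int`s, so no `Function` is applied and its result is not boxed on each write.

```java
// Hypothetical sink standing in for Avro's Encoder so the sketch has no dependencies.
interface IntSink {
  void writeInt(int value);
}

// Metrics-aware writer for values stored as Avro ints (e.g. byte and short).
// Subclasses only implement convert(T); bounds are primitive ints, so each write
// avoids applying a separate Function object and boxing its result.
abstract class StoredAsIntWriter<T> {
  private final int id;
  private long valueCount = 0;
  private boolean hasBounds = false;
  private int min;
  private int max;

  StoredAsIntWriter(int id) {
    this.id = id;
  }

  protected abstract int convert(T datum);

  // Null handling is assumed to live elsewhere (e.g. an option writer), so datum is non-null.
  final void write(T datum, IntSink sink) {
    valueCount++;
    int value = convert(datum);
    if (!hasBounds) {
      min = value;
      max = value;
      hasBounds = true;
    } else {
      if (value < min) {
        min = value;
      }
      if (value > max) {
        max = value;
      }
    }
    sink.writeInt(value);
  }

  int id() {
    return id;
  }

  long valueCount() {
    return valueCount;
  }

  // Boxing happens only when metrics are read, not on every write.
  Integer min() {
    return hasBounds ? min : null;
  }

  Integer max() {
    return hasBounds ? max : null;
  }
}

final class ByteWriter extends StoredAsIntWriter<Byte> {
  ByteWriter(int id) {
    super(id);
  }

  @Override
  protected int convert(Byte datum) {
    return datum.intValue();
  }
}
```

   A `StoredAsLongWriter` would follow the same pattern with `long convert(T)` and primitive `long` bounds.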






[GitHub] [iceberg] rdblue commented on a change in pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#discussion_r579892821



##########
File path: core/src/main/java/org/apache/iceberg/avro/ValueWriters.java
##########
@@ -135,145 +150,145 @@ private NullWriter() {
 
     @Override
     public void write(Void ignored, Encoder encoder) throws IOException {
-      encoder.writeNull();
+      throw new IllegalStateException("[BUG] NullWriter shouldn't be used for writing nulls for Avro");
     }
-  }
 
-  private static class BooleanWriter implements ValueWriter<Boolean> {
-    private static final BooleanWriter INSTANCE = new BooleanWriter();
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      throw new IllegalStateException("[BUG] NullWriter shouldn't be used for writing nulls for Avro");
+    }
+  }
 
-    private BooleanWriter() {
+  private static class BooleanWriter extends ComparableWriter<Boolean> {
+    private BooleanWriter(int id) {
+      super(id);
     }
 
     @Override
-    public void write(Boolean bool, Encoder encoder) throws IOException {
+    protected void writeVal(Boolean bool, Encoder encoder) throws IOException {
       encoder.writeBoolean(bool);
     }
   }
 
-  private static class ByteToIntegerWriter implements ValueWriter<Byte> {
-    private static final ByteToIntegerWriter INSTANCE = new ByteToIntegerWriter();
-
-    private ByteToIntegerWriter() {
+  private static class ByteToIntegerWriter extends MetricsAwareTransformWriter<Byte, Integer> {
+    private ByteToIntegerWriter(int id) {
+      super(id, Integer::compareTo, Byte::intValue);
     }
 
     @Override
-    public void write(Byte b, Encoder encoder) throws IOException {
-      encoder.writeInt(b.intValue());
+    protected void writeVal(Integer intVal, Encoder encoder) throws IOException {
+      encoder.writeInt(intVal);
     }
   }
 
-  private static class ShortToIntegerWriter implements ValueWriter<Short> {
-    private static final ShortToIntegerWriter INSTANCE = new ShortToIntegerWriter();
-
-    private ShortToIntegerWriter() {
+  private static class ShortToIntegerWriter extends MetricsAwareTransformWriter<Short, Integer> {
+    private ShortToIntegerWriter(int id) {
+      super(id, Integer::compareTo, Short::intValue);
     }
 
     @Override
-    public void write(Short s, Encoder encoder) throws IOException {
-      encoder.writeInt(s.intValue());
+    protected void writeVal(Integer intValue, Encoder encoder) throws IOException {
+      encoder.writeInt(intValue);
     }
   }
 
-  private static class IntegerWriter implements ValueWriter<Integer> {
-    private static final IntegerWriter INSTANCE = new IntegerWriter();
-
-    private IntegerWriter() {
+  private static class IntegerWriter extends ComparableWriter<Integer> {
+    private IntegerWriter(int id) {
+      super(id);
     }
 
     @Override
-    public void write(Integer i, Encoder encoder) throws IOException {
+    protected void writeVal(Integer i, Encoder encoder) throws IOException {
       encoder.writeInt(i);
     }
   }
 
-  private static class LongWriter implements ValueWriter<Long> {
-    private static final LongWriter INSTANCE = new LongWriter();
-
-    private LongWriter() {
+  private static class LongWriter extends ComparableWriter<Long> {
+    private LongWriter(int id) {
+      super(id);
     }
 
     @Override
-    public void write(Long l, Encoder encoder) throws IOException {
+    protected void writeVal(Long l, Encoder encoder) throws IOException {
       encoder.writeLong(l);
     }
   }
 
-  private static class FloatWriter implements ValueWriter<Float> {
-    private static final FloatWriter INSTANCE = new FloatWriter();
-
-    private FloatWriter() {
+  private static class FloatWriter extends FloatingPointWriter<Float> {
+    private FloatWriter(int id) {
+      super(id);
     }
 
     @Override
-    public void write(Float f, Encoder encoder) throws IOException {
+    protected void writeVal(Float f, Encoder encoder) throws IOException {
       encoder.writeFloat(f);
     }
   }
 
-  private static class DoubleWriter implements ValueWriter<Double> {
-    private static final DoubleWriter INSTANCE = new DoubleWriter();
-
-    private DoubleWriter() {
+  private static class DoubleWriter extends FloatingPointWriter<Double> {
+    private DoubleWriter(int id) {
+      super(id);
     }
 
     @Override
-    public void write(Double d, Encoder encoder) throws IOException {
+    protected void writeVal(Double d, Encoder encoder) throws IOException {
       encoder.writeDouble(d);
     }
   }
 
-  private static class StringWriter implements ValueWriter<Object> {
-    private static final StringWriter INSTANCE = new StringWriter();
-
-    private StringWriter() {
+  private static class StringWriter extends MetricsAwareWriter<CharSequence> {
+    private StringWriter(int id) {
+      super(id, Comparators.charSequences());
     }
 
     @Override
-    public void write(Object s, Encoder encoder) throws IOException {
+    public void write(CharSequence s, Encoder encoder) throws IOException {
       // use getBytes because it may return the backing byte array if available.
       // otherwise, it copies to a new byte array, which is still cheaper than Avro
       // calling toString, which incurs encoding costs
       if (s instanceof Utf8) {
-        encoder.writeString((Utf8) s);
+        super.write(s, encoder);
       } else if (s instanceof String) {
-        encoder.writeString(new Utf8((String) s));
+        super.write(new Utf8((String) s), encoder);
       } else if (s == null) {
         throw new IllegalArgumentException("Cannot write null to required string column");
       } else {
         throw new IllegalArgumentException(
             "Cannot write unknown string type: " + s.getClass().getName() + ": " + s.toString());
       }
     }
-  }
 
-  private static class Utf8Writer implements ValueWriter<Utf8> {
-    private static final Utf8Writer INSTANCE = new Utf8Writer();
+    @Override
+    protected void writeVal(CharSequence s, Encoder encoder) throws IOException {

Review comment:
       Never mind, `write` delegates to `super.write`.






[GitHub] [iceberg] rdblue commented on a change in pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#discussion_r579898254



##########
File path: core/src/main/java/org/apache/iceberg/avro/ValueWriters.java
##########
@@ -358,8 +380,10 @@ public void write(BigDecimal decimal, Encoder encoder) throws IOException {
     private final int nullIndex;
     private final int valueIndex;
     private final ValueWriter<T> valueWriter;
+    private final Schema.Type type;
+    private long nullValueCount;
 
-    private OptionWriter(int nullIndex, ValueWriter<T> valueWriter) {
+    private OptionWriter(int nullIndex, ValueWriter<T> valueWriter, Schema.Type type) {

Review comment:
       I think that we should not pass the type here. Instead, let's have a `MetricsOptionWriter` and a non-metrics version and choose which one to use ahead of time. That way, we don't keep a count that won't be used. There's no need to check whether the null count should be used when `metrics` is called. That can be determined ahead of time.
   
   If we do that, then there is no need to pass the type here or into the `option` method. We can either add a boolean (`collectMetrics`) or have a separate factory method. I think that would be a bit cleaner.
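   A dependency-free sketch of that suggestion, assuming a simplified writer interface: `SimpleWriter`, `OptionWriters`, and `PlainOptionWriter` are hypothetical names, and only `MetricsOptionWriter` and the `collectMetrics` flag come from the comment. The choice between the two writers is made once, in the factory method, so the plain writer carries no counter and neither writer checks a type when metrics are requested.

```java
import java.util.OptionalLong;

// Hypothetical minimal writer interface; the real Avro ValueWriter also takes an Encoder.
interface SimpleWriter<T> {
  void write(T value);

  // Null count for writers that track one; empty otherwise.
  default OptionalLong nullCount() {
    return OptionalLong.empty();
  }
}

final class OptionWriters {
  private OptionWriters() {}

  // Decided once, when the writer tree is built, instead of on every metrics() call.
  static <T> SimpleWriter<T> option(SimpleWriter<T> valueWriter, boolean collectMetrics) {
    if (collectMetrics) {
      return new MetricsOptionWriter<T>(valueWriter);
    }
    return new PlainOptionWriter<T>(valueWriter);
  }

  // No counter at all, so nothing extra is maintained per value.
  private static final class PlainOptionWriter<T> implements SimpleWriter<T> {
    private final SimpleWriter<T> valueWriter;

    PlainOptionWriter(SimpleWriter<T> valueWriter) {
      this.valueWriter = valueWriter;
    }

    @Override
    public void write(T value) {
      // A real option writer would write the union branch index before the value.
      if (value != null) {
        valueWriter.write(value);
      }
    }
  }

  private static final class MetricsOptionWriter<T> implements SimpleWriter<T> {
    private final SimpleWriter<T> valueWriter;
    private long nullValueCount = 0;

    MetricsOptionWriter(SimpleWriter<T> valueWriter) {
      this.valueWriter = valueWriter;
    }

    @Override
    public void write(T value) {
      if (value == null) {
        nullValueCount++;
      } else {
        valueWriter.write(value);
      }
    }

    @Override
    public OptionalLong nullCount() {
      return OptionalLong.of(nullValueCount);
    }
  }
}
```

   The same choice could instead be exposed as two factory methods; the boolean flag is just the shorter of the two options mentioned in the comment.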






[GitHub] [iceberg] rdblue commented on a change in pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#discussion_r579891808



##########
File path: core/src/main/java/org/apache/iceberg/avro/ValueWriters.java
##########
@@ -492,4 +561,153 @@ protected Object get(IndexedRecord struct, int pos) {
       return struct.get(pos);
     }
   }
+
+  private abstract static class FloatingPointWriter<T extends Comparable<T>>
+      extends ComparableWriter<T> {
+    private long nanValueCount;
+
+    FloatingPointWriter(int id) {
+      super(id);
+    }
+
+    @Override
+    public void write(T datum, Encoder encoder) throws IOException {
+      valueCount++;
+
+      if (datum == null) {
+        nullValueCount++;
+      } else if (NaNUtil.isNaN(datum)) {
+        nanValueCount++;
+      } else {
+        if (max == null || datum.compareTo(max) > 0) {
+          this.max = datum;
+        }
+        if (min == null || datum.compareTo(min) < 0) {
+          this.min = datum;
+        }
+      }
+
+      writeVal(datum, encoder);
+    }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      return Stream.of(new FieldMetrics<>(id, valueCount, nullValueCount, nanValueCount, min, max));
+    }
+  }
+
+  public abstract static class MetricsAwareStringWriter<T extends Comparable<T>> extends ComparableWriter<T> {
+    public MetricsAwareStringWriter(int id) {
+      super(id);
+    }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      // convert min/max to string to allow upper/lower bound truncation when gathering metrics,
+      // as in different implementations there's no guarantee that input to string writer will be char sequence
+      return metrics(Object::toString);
+    }
+  }
+
+  private abstract static class MetricsAwareByteArrayWriter extends MetricsAwareWriter<byte[]> {
+    MetricsAwareByteArrayWriter(int id) {
+      super(id, Comparators.unsignedByteArrays());
+    }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      // convert min/max to byte buffer to allow upper/lower bound truncation when gathering metrics.
+      return metrics(ByteBuffer::wrap);
+    }
+  }
+
+  public abstract static class ComparableWriter<T extends Comparable<T>> extends MetricsAwareWriter<T> {
+    public ComparableWriter(int id) {
+      super(id, Comparable::compareTo);
+    }
+  }
+
+  /**
+   * A value writer wrapper that keeps track of column statistics (metrics) during writing.
+   *
+   * @param <T> Input type
+   */
+  public abstract static class MetricsAwareWriter<T> extends MetricsAwareTransformWriter<T, T> {
+    public MetricsAwareWriter(int id, Comparator<T> comparator) {
+      super(id, comparator, Function.identity());
+    }
+
+    /**
+     * Helper class to transform the input type when collecting metrics.
+     * The transform function converts the stats information from the specific type that the underlying writer
+     * understands to a more general type that could be transformed to binary following iceberg single-value
+     * serialization spec.
+     *
+     * @param func transformation function
+     * @return a stream of field metrics with bounds converted by the given transformation
+     */
+    protected Stream<FieldMetrics> metrics(Function<T, ?> func) {
+      return Stream.of(new FieldMetrics<>(id, valueCount, nullValueCount, 0,
+          updateBound(min, func), updateBound(max, func)));
+    }
+
+    private <T3, T4> T4 updateBound(T3 bound, Function<T3, T4> func) {
+      return bound == null ? null : func.apply(bound);
+    }
+  }
+
+  /**
+   * A value writer wrapper that keeps track of column statistics (metrics) during writing, and accepts a
+   * transformation in its constructor.
+   * The transformation will apply to the input data to produce the type that the underlying writer accepts.
+   * Stats will also be tracked with the type after transformation.
+   *
+   * @param <T1> Input type
+   * @param <T2> Type after transformation
+   */
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  public abstract static class MetricsAwareTransformWriter<T1, T2> implements ValueWriter<T1> {

Review comment:
       We typically use `S` and `T` for input and output types in other transform classes.
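For illustration, here is a simplified transform writer that follows the `S`/`T` convention. `S` is the datum type passed to `write`, and `T` is the type that is encoded and used for stats.

`TransformWriter` and `ShortAsIntWriter` are hypothetical names, and a `List<Object>` stands in for the Avro `Encoder`. The sketch only shows where each type parameter is used.

```java
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch: S = input datum type, T = stored/encoded type used for min/max.
abstract class TransformWriter<S, T> {
  private final Function<S, T> transform;
  private final Comparator<T> comparator;
  protected long valueCount = 0L;
  protected T min = null;
  protected T max = null;

  TransformWriter(Function<S, T> transform, Comparator<T> comparator) {
    this.transform = transform;
    this.comparator = comparator;
  }

  public final void write(S datum, List<Object> out) {
    T value = transform.apply(datum); // transform first, so bounds are in the stored type
    valueCount++;
    if (max == null || comparator.compare(value, max) > 0) {
      max = value;
    }
    if (min == null || comparator.compare(value, min) < 0) {
      min = value;
    }
    encode(value, out);
  }

  protected abstract void encode(T value, List<Object> out);
}

// Example: Short input (S) stored and tracked as Integer (T).
final class ShortAsIntWriter extends TransformWriter<Short, Integer> {
  ShortAsIntWriter() {
    super(Short::intValue, Comparator.naturalOrder());
  }

  @Override
  protected void encode(Integer value, List<Object> out) {
    out.add(value); // stands in for encoder.writeInt(value)
  }
}
```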




[GitHub] [iceberg] rdblue commented on a change in pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#discussion_r653862129



##########
File path: core/src/main/java/org/apache/iceberg/avro/ValueWriters.java
##########
@@ -137,143 +156,143 @@ private NullWriter() {
     public void write(Void ignored, Encoder encoder) throws IOException {
       encoder.writeNull();
     }
-  }
 
-  private static class BooleanWriter implements ValueWriter<Boolean> {
-    private static final BooleanWriter INSTANCE = new BooleanWriter();
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      return Stream.empty();
+    }
+  }
 
-    private BooleanWriter() {
+  private static class BooleanWriter extends ComparableWriter<Boolean> {
+    private BooleanWriter(int id) {
+      super(id);
     }
 
     @Override
-    public void write(Boolean bool, Encoder encoder) throws IOException {
+    protected void writeVal(Boolean bool, Encoder encoder) throws IOException {
       encoder.writeBoolean(bool);
     }
   }
 
-  private static class ByteToIntegerWriter implements ValueWriter<Byte> {
-    private static final ByteToIntegerWriter INSTANCE = new ByteToIntegerWriter();
-
-    private ByteToIntegerWriter() {
+  private static class ByteToIntegerWriter extends StoredAsIntWriter<Byte> {
+    private ByteToIntegerWriter(int id) {
+      super(id);
     }
 
     @Override
-    public void write(Byte b, Encoder encoder) throws IOException {
-      encoder.writeInt(b.intValue());
+    protected int convert(Byte from) {
+      return from.intValue();
     }
   }
 
-  private static class ShortToIntegerWriter implements ValueWriter<Short> {
-    private static final ShortToIntegerWriter INSTANCE = new ShortToIntegerWriter();
-
-    private ShortToIntegerWriter() {
+  private static class ShortToIntegerWriter extends StoredAsIntWriter<Short> {
+    private ShortToIntegerWriter(int id) {
+      super(id);
     }
 
     @Override
-    public void write(Short s, Encoder encoder) throws IOException {
-      encoder.writeInt(s.intValue());
+    protected int convert(Short from) {
+      return from.intValue();
     }
   }
 
-  private static class IntegerWriter implements ValueWriter<Integer> {
-    private static final IntegerWriter INSTANCE = new IntegerWriter();
-
-    private IntegerWriter() {
+  private static class IntegerWriter extends ComparableWriter<Integer> {
+    private IntegerWriter(int id) {
+      super(id);
     }
 
     @Override
-    public void write(Integer i, Encoder encoder) throws IOException {
+    protected void writeVal(Integer i, Encoder encoder) throws IOException {
       encoder.writeInt(i);
     }
   }
 
-  private static class LongWriter implements ValueWriter<Long> {
-    private static final LongWriter INSTANCE = new LongWriter();
-
-    private LongWriter() {
+  private static class LongWriter extends ComparableWriter<Long> {

Review comment:
      I would expect `LongWriter` to extend `StoredAsLongWriter` instead of `ComparableWriter`, and likewise `IntegerWriter` to extend `StoredAsIntWriter`. Could you update those?
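Here is a sketch of what the suggested hierarchy could look like. It assumes that `StoredAsLongWriter` mirrors the `convert` hook that `StoredAsIntWriter` exposes in this diff.

The class bodies are hypothetical and simplified:

- A `List<Object>` replaces the `Encoder`.
- `InstantMicrosWriter` is only an example of a converting subclass.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;

// Hypothetical sketch: the base class keeps stats on the stored long; subclasses only
// supply convert(). StoredAsIntWriter would follow the same pattern with int.
abstract class StoredAsLongWriter<T> {
  protected long valueCount = 0L;
  protected Long min = null;
  protected Long max = null;

  protected abstract long convert(T from);

  public final void write(T datum, List<Object> out) {
    long value = convert(datum);
    valueCount++;
    if (max == null || value > max) {
      max = value;
    }
    if (min == null || value < min) {
      min = value;
    }
    out.add(value); // stands in for encoder.writeLong(value)
  }
}

// LongWriter reduces to an identity conversion.
final class LongWriter extends StoredAsLongWriter<Long> {
  @Override
  protected long convert(Long from) {
    return from;
  }
}

// Hypothetical converting subclass: Instant stored as microseconds since the epoch.
final class InstantMicrosWriter extends StoredAsLongWriter<Instant> {
  @Override
  protected long convert(Instant from) {
    return ChronoUnit.MICROS.between(Instant.EPOCH, from);
  }
}
```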

##########
File path: core/src/main/java/org/apache/iceberg/avro/ValueWriter.java
##########
@@ -27,7 +27,5 @@
 public interface ValueWriter<D> {
   void write(D datum, Encoder encoder) throws IOException;
 
-  default Stream<FieldMetrics> metrics() {
-    return Stream.empty(); // TODO will populate in following PRs
-  }
+  Stream<FieldMetrics> metrics();

Review comment:
       `FieldMetrics` is parameterized, but this is a bare reference. Could you update it? I think it should be `FieldMetrics<?>` since the metrics are not necessarily for the written value type, `D`.
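The following self-contained sketch shows why a wildcard fits. It uses local stand-ins for `FieldMetrics` and `ValueWriter`, simplified so that `write` is omitted and the field set is reduced.

A writer of `StringBuilder` values reports `String` bounds, so `FieldMetrics<D>` would be the wrong type. `FieldMetrics<?>` lets metrics from different writers be collected into one list without raw types.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical stand-in for Iceberg's FieldMetrics<T>; T is the type of the bounds.
final class FieldMetrics<T> {
  final int id;
  final T lowerBound;
  final T upperBound;

  FieldMetrics(int id, T lowerBound, T upperBound) {
    this.id = id;
    this.lowerBound = lowerBound;
    this.upperBound = upperBound;
  }
}

// Hypothetical writer interface (write() omitted). The bound type is independent of D.
interface ValueWriter<D> {
  Stream<FieldMetrics<?>> metrics();
}

final class IntWriter implements ValueWriter<Integer> {
  @Override
  public Stream<FieldMetrics<?>> metrics() {
    FieldMetrics<Integer> m = new FieldMetrics<>(1, 3, 9);
    return Stream.of(m);
  }
}

// Writes StringBuilder values but reports String bounds, similar to MetricsAwareStringWriter.
final class BuilderWriter implements ValueWriter<StringBuilder> {
  @Override
  public Stream<FieldMetrics<?>> metrics() {
    FieldMetrics<String> m = new FieldMetrics<>(2, "a", "z");
    return Stream.of(m);
  }
}

final class MetricsCollector {
  // No raw types and no unchecked warnings: every element is a FieldMetrics<?>.
  static List<FieldMetrics<?>> collect(List<ValueWriter<?>> writers) {
    return writers.stream().flatMap(w -> w.metrics()).collect(Collectors.toList());
  }
}
```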

##########
File path: core/src/main/java/org/apache/iceberg/avro/ValueWriters.java
##########
@@ -492,4 +576,215 @@ protected Object get(IndexedRecord struct, int pos) {
       return struct.get(pos);
     }
   }
+
+  private abstract static class FloatingPointWriter<T extends Comparable<T>>
+      extends MetricsAwareWriter<T> {
+    private long nanValueCount;
+
+    FloatingPointWriter(int id) {
+      // pass null to comparator since we override write() method that uses comparator in this class.
+      super(id, null);
+    }
+
+    @Override
+    public void write(T datum, Encoder encoder) throws IOException {
+      valueCount++;
+
+      // null value should be handled by option writer, thus assume datum will not be null here.
+      if (NaNUtil.isNaN(datum)) {
+        nanValueCount++;
+      } else {
+        if (max == null || datum.compareTo(max) > 0) {
+          this.max = datum;
+        }
+
+        if (min == null || datum.compareTo(min) < 0) {
+          this.min = datum;
+        }
+      }
+
+      writeVal(datum, encoder);
+    }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      return Stream.of(new FieldMetrics<>(id, valueCount, 0, nanValueCount, min, max));
+    }
+  }
+
+  public abstract static class MetricsAwareStringWriter<T extends Comparable<T>> extends ComparableWriter<T> {
+    public MetricsAwareStringWriter(int id) {
+      super(id);
+    }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      // convert min/max to string to allow upper/lower bound truncation when gathering metrics,
+      // as in different implementations there's no guarantee that input to string writer will be char sequence
+      return metrics(Object::toString);
+    }
+  }
+
+  private abstract static class MetricsAwareByteArrayWriter extends MetricsAwareWriter<byte[]> {
+    MetricsAwareByteArrayWriter(int id) {
+      super(id, Comparators.unsignedByteArrays());
+    }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      // convert min/max to byte buffer to allow upper/lower bound truncation when gathering metrics.
+      return metrics(ByteBuffer::wrap);
+    }
+  }
+
+  public abstract static class ComparableWriter<T extends Comparable<T>> extends MetricsAwareWriter<T> {
+    public ComparableWriter(int id) {
+      super(id, Comparable::compareTo);
+    }
+  }
+
+  /**
+   * A value writer wrapper that keeps track of column statistics (metrics) during writing.
+   *
+   * @param <T> Input type
+   */
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  public abstract static class MetricsAwareWriter<T> implements ValueWriter<T> {
+    protected final int id;
+    protected long valueCount;
+    protected T max;
+    protected T min;
+
+    private final Comparator<T> comparator;
+
+    public MetricsAwareWriter(int id, Comparator<T> comparator) {
+      this.id = id;
+      this.comparator = comparator;
+    }
+
+    @Override
+    public void write(T datum, Encoder encoder) throws IOException {
+      valueCount++;
+
+      // null value should be handled by option writer, thus assume datum will not be null here.
+      if (max == null || comparator.compare(datum, max) > 0) {
+        max = datum;
+      }
+
+      if (min == null || comparator.compare(datum, min) < 0) {
+        min = datum;
+      }
+
+      writeVal(datum, encoder);
+    }
+
+    protected abstract void writeVal(T datum, Encoder encoder) throws IOException;

Review comment:
       I would probably name this `encode` rather than `writeVal` so that there is less confusion with the `write` method.
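Below is a simplified sketch of the base class with the hook renamed to `encode`. It assumes `write` is made `final`, so subclasses cannot skip the bookkeeping.

The unsigned comparison is a local stand-in for `Comparators.unsignedByteArrays()`, and `BytesWriter` here is illustrative, not the Iceberg class. A `List<Object>` replaces the `Encoder`.

```java
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: write() does the stats bookkeeping and is final; subclasses only
// implement encode(), so the two methods cannot be confused.
abstract class MetricsAwareWriter<T> {
  private final Comparator<T> comparator;
  protected long valueCount = 0L;
  protected T min = null;
  protected T max = null;

  MetricsAwareWriter(Comparator<T> comparator) {
    this.comparator = comparator;
  }

  public final void write(T datum, List<Object> out) {
    valueCount++;
    if (max == null || comparator.compare(datum, max) > 0) {
      max = datum;
    }
    if (min == null || comparator.compare(datum, min) < 0) {
      min = datum;
    }
    encode(datum, out);
  }

  protected abstract void encode(T datum, List<Object> out);
}

// Binary writer with an unsigned lexicographic comparison: 0xFF sorts after 0x01.
final class BytesWriter extends MetricsAwareWriter<byte[]> {
  BytesWriter() {
    super(BytesWriter::compareUnsigned);
  }

  static int compareUnsigned(byte[] a, byte[] b) {
    int len = Math.min(a.length, b.length);
    for (int i = 0; i < len; i++) {
      int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.length, b.length); // a shorter prefix sorts first
  }

  @Override
  protected void encode(byte[] datum, List<Object> out) {
    out.add(datum); // stands in for encoder.writeBytes(datum)
  }
}
```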

##########
File path: core/src/main/java/org/apache/iceberg/avro/ValueWriters.java
##########
@@ -492,4 +576,215 @@ protected Object get(IndexedRecord struct, int pos) {
       return struct.get(pos);
     }
   }
+
+  private abstract static class FloatingPointWriter<T extends Comparable<T>>
+      extends MetricsAwareWriter<T> {
+    private long nanValueCount;
+
+    FloatingPointWriter(int id) {
+      // pass null to comparator since we override write() method that uses comparator in this class.
+      super(id, null);
+    }
+
+    @Override
+    public void write(T datum, Encoder encoder) throws IOException {
+      valueCount++;
+
+      // null value should be handled by option writer, thus assume datum will not be null here.
+      if (NaNUtil.isNaN(datum)) {
+        nanValueCount++;
+      } else {
+        if (max == null || datum.compareTo(max) > 0) {
+          this.max = datum;
+        }
+
+        if (min == null || datum.compareTo(min) < 0) {
+          this.min = datum;
+        }
+      }
+
+      writeVal(datum, encoder);
+    }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      return Stream.of(new FieldMetrics<>(id, valueCount, 0, nanValueCount, min, max));
+    }
+  }
+
+  public abstract static class MetricsAwareStringWriter<T extends Comparable<T>> extends ComparableWriter<T> {
+    public MetricsAwareStringWriter(int id) {
+      super(id);
+    }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      // convert min/max to string to allow upper/lower bound truncation when gathering metrics,
+      // as in different implementations there's no guarantee that input to string writer will be char sequence
+      return metrics(Object::toString);
+    }
+  }
+
+  private abstract static class MetricsAwareByteArrayWriter extends MetricsAwareWriter<byte[]> {
+    MetricsAwareByteArrayWriter(int id) {
+      super(id, Comparators.unsignedByteArrays());
+    }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      // convert min/max to byte buffer to allow upper/lower bound truncation when gathering metrics.
+      return metrics(ByteBuffer::wrap);
+    }
+  }
+
+  public abstract static class ComparableWriter<T extends Comparable<T>> extends MetricsAwareWriter<T> {
+    public ComparableWriter(int id) {
+      super(id, Comparable::compareTo);
+    }
+  }
+
+  /**
+   * A value writer wrapper that keeps track of column statistics (metrics) during writing.
+   *
+   * @param <T> Input type
+   */
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  public abstract static class MetricsAwareWriter<T> implements ValueWriter<T> {
+    protected final int id;
+    protected long valueCount;
+    protected T max;
+    protected T min;
+
+    private final Comparator<T> comparator;
+
+    public MetricsAwareWriter(int id, Comparator<T> comparator) {
+      this.id = id;
+      this.comparator = comparator;
+    }
+
+    @Override
+    public void write(T datum, Encoder encoder) throws IOException {
+      valueCount++;
+
+      // null value should be handled by option writer, thus assume datum will not be null here.
+      if (max == null || comparator.compare(datum, max) > 0) {
+        max = datum;
+      }
+
+      if (min == null || comparator.compare(datum, min) < 0) {
+        min = datum;
+      }
+
+      writeVal(datum, encoder);
+    }
+
+    protected abstract void writeVal(T datum, Encoder encoder) throws IOException;
+
+    @Override
+    public Stream<FieldMetrics> metrics() {

Review comment:
       I'd like to fix all of the references that don't parameterize `FieldMetrics`. I think they should be `FieldMetrics<?>`.

##########
File path: core/src/test/java/org/apache/iceberg/TestMetrics.java
##########
@@ -274,7 +282,11 @@ public void testMetricsForNestedStructFields() throws IOException {
     assertBounds(6, BinaryType.get(),
         ByteBuffer.wrap("A".getBytes()), ByteBuffer.wrap("A".getBytes()), metrics);
     assertCounts(7, 1L, 0L, 1L, metrics);
-    assertBounds(7, DoubleType.get(), Double.NaN, Double.NaN, metrics);
+    if (fileFormat() == FileFormat.AVRO) {

Review comment:
       Is this needed if #2464 goes in first?

##########
File path: core/src/test/java/org/apache/iceberg/TestMetrics.java
##########
@@ -643,4 +747,10 @@ protected void assertCounts(int fieldId, Long valueCount, Long nullValueCount, L
         upperBounds.containsKey(fieldId) ? fromByteBuffer(type, upperBounds.get(fieldId)) : null);
   }
 
+  private void assertNonNullColumnSizes(Metrics metrics) {
+    if (fileFormat() != FileFormat.AVRO) {
+      Assert.assertTrue(metrics.columnSizes().values().stream().allMatch(Objects::nonNull));
+    }

Review comment:
      Can you add an assertion for Avro? I think the column sizes map should be null; is that correct?
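Here is a plain-Java sketch of the extra assertion. It is written against a local `Format` enum instead of Iceberg's `FileFormat`, and it does not use JUnit.

It encodes the expectation raised in the comment: Avro metrics carry a null column-sizes map. Treat that expectation as an assumption until it is confirmed.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for Iceberg's FileFormat enum.
enum Format { AVRO, ORC, PARQUET }

final class ColumnSizeChecks {
  private ColumnSizeChecks() {
  }

  // Assumption: Avro metrics carry no column sizes, so the map is expected to be null.
  // Other formats must report a non-null size for every column.
  static void assertColumnSizes(Format format, Map<Integer, Long> columnSizes) {
    if (format == Format.AVRO) {
      if (columnSizes != null) {
        throw new AssertionError("Avro metrics should not report column sizes");
      }
    } else if (columnSizes == null || !columnSizes.values().stream().allMatch(Objects::nonNull)) {
      throw new AssertionError("Expected a non-null size for every column");
    }
  }
}
```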

##########
File path: data/src/test/java/org/apache/iceberg/avro/TestGenericAvroMetrics.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.avro;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+public class TestGenericAvroMetrics extends TestAvroMetrics {
+
+  protected Metrics getMetrics(Schema schema, OutputFile file, Map<String, String> properties,
+                                        MetricsConfig metricsConfig, Record... records) throws IOException {

Review comment:
       Nit: indentation is off.

##########
File path: core/src/main/java/org/apache/iceberg/avro/ValueWriters.java
##########
@@ -45,68 +55,72 @@ private ValueWriters() {
     return NullWriter.INSTANCE;
   }
 
-  public static ValueWriter<Boolean> booleans() {
-    return BooleanWriter.INSTANCE;
+  public static ValueWriter<Boolean> booleans(int id) {
+    return new BooleanWriter(id);
   }
 
-  public static ValueWriter<Byte> tinyints() {
-    return ByteToIntegerWriter.INSTANCE;
+  public static ValueWriter<Byte> tinyints(int id) {
+    return new ByteToIntegerWriter(id);
   }
 
-  public static ValueWriter<Short> shorts() {
-    return ShortToIntegerWriter.INSTANCE;
+  public static ValueWriter<Short> shorts(int id) {
+    return new ShortToIntegerWriter(id);
   }
 
-  public static ValueWriter<Integer> ints() {
-    return IntegerWriter.INSTANCE;
+  public static ValueWriter<Integer> ints(int id) {
+    return new IntegerWriter(id);
   }
 
-  public static ValueWriter<Long> longs() {
-    return LongWriter.INSTANCE;
+  public static ValueWriter<Long> longs(int id) {
+    return new LongWriter(id);
   }
 
-  public static ValueWriter<Float> floats() {
-    return FloatWriter.INSTANCE;
+  public static ValueWriter<Float> floats(int id) {
+    return new FloatWriter(id);
   }
 
-  public static ValueWriter<Double> doubles() {
-    return DoubleWriter.INSTANCE;
+  public static ValueWriter<Double> doubles(int id) {
+    return new DoubleWriter(id);
   }
 
-  public static ValueWriter<Object> strings() {
-    return StringWriter.INSTANCE;
+  public static ValueWriter<CharSequence> strings(int id) {
+    return new StringWriter(id);
   }
 
-  public static ValueWriter<Utf8> utf8s() {
-    return Utf8Writer.INSTANCE;
+  public static ValueWriter<Utf8> utf8s(int id) {
+    return new Utf8Writer(id);
   }
 
-  public static ValueWriter<UUID> uuids() {
-    return UUIDWriter.INSTANCE;
+  public static ValueWriter<UUID> uuids(int id) {
+    return new UUIDWriter(id);
   }
 
-  public static ValueWriter<byte[]> fixed(int length) {
-    return new FixedWriter(length);
+  public static ValueWriter<byte[]> fixed(int id, int length) {
+    return new FixedWriter(id, length);
   }
 
-  public static ValueWriter<GenericData.Fixed> genericFixed(int length) {
-    return new GenericFixedWriter(length);
+  public static ValueWriter<GenericData.Fixed> genericFixed(int id, int length) {
+    return new GenericFixedWriter(id, length);
   }
 
-  public static ValueWriter<byte[]> bytes() {
-    return BytesWriter.INSTANCE;
+  public static ValueWriter<byte[]> bytes(int id) {
+    return new BytesWriter(id);
   }
 
-  public static ValueWriter<ByteBuffer> byteBuffers() {
-    return ByteBufferWriter.INSTANCE;
+  public static ValueWriter<ByteBuffer> byteBuffers(int id) {
+    return new ByteBufferWriter(id);
   }
 
-  public static ValueWriter<BigDecimal> decimal(int precision, int scale) {
-    return new DecimalWriter(precision, scale);
+  public static ValueWriter<BigDecimal> decimal(int id, int precision, int scale) {
+    return new DecimalWriter(id, precision, scale);
   }
 
-  public static <T> ValueWriter<T> option(int nullIndex, ValueWriter<T> writer) {
-    return new OptionWriter<>(nullIndex, writer);
+  public static <T> ValueWriter<T> option(int nullIndex, ValueWriter<T> writer, Schema.Type type) {
+    if (AvroSchemaUtil.supportsMetrics(type)) {

Review comment:
      Rather than introducing `supportsMetrics`, why not check whether the value writer is a `MetricsAwareWriter`? I know that not all of the writers extend that class, but you could either introduce a `MetricsWriter` interface, implemented by every writer that supports metrics, to signal that the inner writer tracks metrics, or alter the hierarchy a little so that `StoredAsIntWriter` and `StoredAsLongWriter` also extend `MetricsAwareWriter`. Then you wouldn't need changes to `AvroSchemaUtil` or so many changes to option handling.
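This is a sketch of the marker-interface option. It rests on a few assumptions:

- `MetricsWriter`, `Options` and `CountingOption` are hypothetical names.
- The writers write into a `List<Object>` rather than an Avro `Encoder`.
- Metrics are reduced to counts.

The option factory inspects the inner writer with `instanceof`, so no schema-type lookup is needed.

```java
import java.util.List;

// Hypothetical, simplified ValueWriter: a List<Object> stands in for the Avro Encoder.
interface ValueWriter<D> {
  void write(D datum, List<Object> out);
}

// Hypothetical marker interface: implemented by every writer that tracks metrics.
interface MetricsWriter<D> extends ValueWriter<D> {
  long valueCount();
}

final class Options {
  private Options() {
  }

  // Decide from the inner writer itself; no schema-type check such as supportsMetrics.
  static <T> ValueWriter<T> option(int nullIndex, ValueWriter<T> inner) {
    if (inner instanceof MetricsWriter) {
      return new CountingOption<>(nullIndex, inner);
    }
    return (datum, out) -> writeOption(nullIndex, inner, datum, out);
  }

  static <T> void writeOption(int nullIndex, ValueWriter<T> inner, T datum, List<Object> out) {
    if (datum == null) {
      out.add(nullIndex);
    } else {
      out.add(1 - nullIndex); // two-branch union: the value is the other branch
      inner.write(datum, out);
    }
  }

  // Option wrapper that also counts nulls; only built around metrics-aware writers.
  static final class CountingOption<T> implements ValueWriter<T> {
    private final int nullIndex;
    private final ValueWriter<T> inner;
    long nullValueCount = 0L;

    CountingOption(int nullIndex, ValueWriter<T> inner) {
      this.nullIndex = nullIndex;
      this.inner = inner;
    }

    @Override
    public void write(T datum, List<Object> out) {
      if (datum == null) {
        nullValueCount++;
      }
      writeOption(nullIndex, inner, datum, out);
    }
  }
}

final class IntWriter implements MetricsWriter<Integer> {
  private long valueCount = 0L;

  @Override
  public void write(Integer datum, List<Object> out) {
    valueCount++;
    out.add(datum);
  }

  @Override
  public long valueCount() {
    return valueCount;
  }
}
```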

##########
File path: flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueWriters.java
##########
@@ -74,52 +78,60 @@ private FlinkValueWriters() {
     return new RowWriter(writers, types);
   }
 
-  private static class StringWriter implements ValueWriter<StringData> {
-    private static final StringWriter INSTANCE = new StringWriter();
-
-    private StringWriter() {
+  private static class StringWriter extends ValueWriters.MetricsAwareStringWriter<StringData> {
+    private StringWriter(int id) {
+      super(id);
     }
 
     @Override
-    public void write(StringData s, Encoder encoder) throws IOException {
+    protected void writeVal(StringData s, Encoder encoder) throws IOException {
       // toBytes is cheaper than Avro calling toString, which incurs encoding costs
       encoder.writeString(new Utf8(s.toBytes()));
     }
   }
 
-  private static class DecimalWriter implements ValueWriter<DecimalData> {
+  private static class DecimalWriter extends ValueWriters.ComparableWriter<DecimalData> {
     private final int precision;
     private final int scale;
     private final ThreadLocal<byte[]> bytes;
 
-    private DecimalWriter(int precision, int scale) {
+    private DecimalWriter(int id, int precision, int scale) {
+      super(id);
       this.precision = precision;
       this.scale = scale;
       this.bytes = ThreadLocal.withInitial(() -> new byte[TypeUtil.decimalRequiredBytes(precision)]);
     }
 
     @Override
-    public void write(DecimalData d, Encoder encoder) throws IOException {
+    protected void writeVal(DecimalData d, Encoder encoder) throws IOException {
       encoder.writeFixed(DecimalUtil.toReusedFixLengthBytes(precision, scale, d.toBigDecimal(), bytes.get()));
     }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      return metrics(DecimalData::toBigDecimal);

Review comment:
       Does this return a Java `BigDecimal`? I know we've had problems with this method in Spark because it actually produces a Scala `BigDecimal`.
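One way to turn this kind of bug into a compile error is to type the bound conversion as `Function<D, java.math.BigDecimal>` rather than `Function<T, ?>`. A method that returns `scala.math.BigDecimal` then cannot be passed in.

The sketch below uses hypothetical types. `ScaledDecimal` and `DecimalBounds` are illustrative stand-ins, not Flink or Iceberg classes.

```java
import java.math.BigDecimal;
import java.util.function.Function;

// Hypothetical engine-side decimal (unscaled value plus scale).
final class ScaledDecimal {
  final long unscaled;
  final int scale;

  ScaledDecimal(long unscaled, int scale) {
    this.unscaled = unscaled;
    this.scale = scale;
  }

  BigDecimal toJavaBigDecimal() {
    return BigDecimal.valueOf(unscaled, scale);
  }
}

// Hypothetical bounds tracker: only accepts conversions that yield java.math.BigDecimal.
final class DecimalBounds<D> {
  private final Function<D, BigDecimal> toJava;
  private BigDecimal min = null;
  private BigDecimal max = null;

  DecimalBounds(Function<D, BigDecimal> toJava) {
    this.toJava = toJava;
  }

  void update(D datum) {
    BigDecimal value = toJava.apply(datum);
    if (max == null || value.compareTo(max) > 0) {
      max = value;
    }
    if (min == null || value.compareTo(min) < 0) {
      min = value;
    }
  }

  BigDecimal min() {
    return min;
  }

  BigDecimal max() {
    return max;
  }
}
```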

##########
File path: spark/src/main/java/org/apache/iceberg/spark/data/SparkValueWriters.java
##########
@@ -72,61 +71,41 @@ private SparkValueWriters() {
     return new StructWriter(writers, types);
   }
 
-  private static class StringWriter implements ValueWriter<UTF8String> {
-    private static final StringWriter INSTANCE = new StringWriter();
-
-    private StringWriter() {
+  private static class StringWriter extends ValueWriters.MetricsAwareStringWriter<UTF8String> {
+    private StringWriter(int id) {
+      super(id);
     }
 
     @Override
-    public void write(UTF8String s, Encoder encoder) throws IOException {
+    protected void writeVal(UTF8String s, Encoder encoder) throws IOException {
       // use getBytes because it may return the backing byte array if available.
       // otherwise, it copies to a new byte array, which is still cheaper than Avro
       // calling toString, which incurs encoding costs
       encoder.writeString(new Utf8(s.getBytes()));
     }
   }
 
-  private static class UUIDWriter implements ValueWriter<UTF8String> {
-    private static final ThreadLocal<ByteBuffer> BUFFER = ThreadLocal.withInitial(() -> {
-      ByteBuffer buffer = ByteBuffer.allocate(16);
-      buffer.order(ByteOrder.BIG_ENDIAN);
-      return buffer;
-    });
-
-    private static final UUIDWriter INSTANCE = new UUIDWriter();
-
-    private UUIDWriter() {
-    }
-
-    @Override
-    @SuppressWarnings("ByteBufferBackingArray")
-    public void write(UTF8String s, Encoder encoder) throws IOException {
-      // TODO: direct conversion from string to byte buffer
-      UUID uuid = UUID.fromString(s.toString());
-      ByteBuffer buffer = BUFFER.get();
-      buffer.rewind();
-      buffer.putLong(uuid.getMostSignificantBits());
-      buffer.putLong(uuid.getLeastSignificantBits());
-      encoder.writeFixed(buffer.array());
-    }
-  }
-
-  private static class DecimalWriter implements ValueWriter<Decimal> {
+  private static class DecimalWriter extends ValueWriters.MetricsAwareWriter<Decimal> {
     private final int precision;
     private final int scale;
     private final ThreadLocal<byte[]> bytes;
 
-    private DecimalWriter(int precision, int scale) {
+    private DecimalWriter(int id, int precision, int scale) {
+      super(id, Comparators.forType(Types.DecimalType.of(precision, scale)));
       this.precision = precision;
       this.scale = scale;
       this.bytes = ThreadLocal.withInitial(() -> new byte[TypeUtil.decimalRequiredBytes(precision)]);
     }
 
     @Override
-    public void write(Decimal d, Encoder encoder) throws IOException {
+    protected void writeVal(Decimal d, Encoder encoder) throws IOException {
       encoder.writeFixed(DecimalUtil.toReusedFixLengthBytes(precision, scale, d.toJavaBigDecimal(), bytes.get()));
     }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      return metrics(Decimal::toJavaBigDecimal);

Review comment:
       Good to see you got the right one.




[GitHub] [iceberg] rdblue commented on a change in pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#discussion_r569628011



##########
File path: core/src/main/java/org/apache/iceberg/avro/ValueWriters.java
##########
@@ -358,8 +380,10 @@ public void write(BigDecimal decimal, Encoder encoder) throws IOException {
     private final int nullIndex;
     private final int valueIndex;
     private final ValueWriter<T> valueWriter;
+    private final Schema.Type type;
+    private long nullValueCount;
 
-    private OptionWriter(int nullIndex, ValueWriter<T> valueWriter) {
+    private OptionWriter(int nullIndex, ValueWriter<T> valueWriter, Schema.Type type) {

Review comment:
       It looks like the only place that `type` is used is in an error message. I'd prefer not to change the signature here just to print a type.




[GitHub] [iceberg] rdblue commented on a change in pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#discussion_r579892523



##########
File path: core/src/main/java/org/apache/iceberg/avro/ValueWriters.java
##########
@@ -492,4 +561,153 @@ protected Object get(IndexedRecord struct, int pos) {
       return struct.get(pos);
     }
   }
+
+  private abstract static class FloatingPointWriter<T extends Comparable<T>>
+      extends ComparableWriter<T> {

Review comment:
       I don't think this needs to extend `ComparableWriter` since it calls `compareTo` directly.
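Here is a simplified sketch of a floating-point writer that inherits no comparator and calls `compareTo` directly. NaN is counted separately and kept out of the bounds. This matters because `Double.compareTo` orders NaN above every other value, so a NaN would otherwise always become the max.

Two stand-ins are used: `isNaN` replaces `NaNUtil.isNaN`, and a `List<Object>` replaces the `Encoder`.

```java
import java.util.List;

// Hypothetical sketch: no comparator field, compareTo is called directly.
abstract class FloatingPointWriter<T extends Comparable<T>> {
  protected long valueCount = 0L;
  protected long nanValueCount = 0L;
  protected T min = null;
  protected T max = null;

  // Local stand-in for Iceberg's NaNUtil.isNaN.
  static boolean isNaN(Object value) {
    if (value instanceof Double) {
      return ((Double) value).isNaN();
    }
    if (value instanceof Float) {
      return ((Float) value).isNaN();
    }
    return false;
  }

  public final void write(T datum, List<Object> out) {
    valueCount++;
    if (isNaN(datum)) {
      nanValueCount++; // counted, but excluded from min/max
    } else {
      if (max == null || datum.compareTo(max) > 0) {
        max = datum;
      }
      if (min == null || datum.compareTo(min) < 0) {
        min = datum;
      }
    }
    encode(datum, out);
  }

  protected abstract void encode(T datum, List<Object> out);
}

final class DoubleWriter extends FloatingPointWriter<Double> {
  @Override
  protected void encode(Double datum, List<Object> out) {
    out.add(datum); // stands in for encoder.writeDouble(datum)
  }
}
```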




[GitHub] [iceberg] rdblue commented on a change in pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#discussion_r579890346



##########
File path: core/src/main/java/org/apache/iceberg/avro/ValueWriters.java
##########
@@ -135,145 +150,145 @@ private NullWriter() {
 
     @Override
     public void write(Void ignored, Encoder encoder) throws IOException {
-      encoder.writeNull();
+      throw new IllegalStateException("[BUG] NullWriter shouldn't be used for writing nulls for Avro");
     }
-  }
 
-  private static class BooleanWriter implements ValueWriter<Boolean> {
-    private static final BooleanWriter INSTANCE = new BooleanWriter();
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      throw new IllegalStateException("[BUG] NullWriter shouldn't be used for writing nulls for Avro");

Review comment:
       Let's default this to `Stream.empty()` so that it won't fail.
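This sketch shows the suggested behavior with simplified stand-ins:

- Metrics are reduced to field ids.
- A `List<Object>` replaces the `Encoder`.
- `IdWriter` is hypothetical.

Returning `Stream.empty()` lets a caller `flatMap` over every writer, including the null writer, without special cases.

```java
import java.util.List;
import java.util.stream.Stream;

// Hypothetical, simplified ValueWriter; metrics() is reduced to a stream of field ids.
interface ValueWriter<D> {
  void write(D datum, List<Object> out);

  Stream<Integer> metrics();
}

// There is nothing to measure for the Avro null type, so metrics() returns an empty stream
// instead of throwing.
final class NullWriter implements ValueWriter<Void> {
  static final NullWriter INSTANCE = new NullWriter();

  private NullWriter() {
  }

  @Override
  public void write(Void ignored, List<Object> out) {
    // nothing to write: Avro encodes null as zero bytes
  }

  @Override
  public Stream<Integer> metrics() {
    return Stream.empty();
  }
}

final class IdWriter implements ValueWriter<Integer> {
  private final int id;

  IdWriter(int id) {
    this.id = id;
  }

  @Override
  public void write(Integer datum, List<Object> out) {
    out.add(datum);
  }

  @Override
  public Stream<Integer> metrics() {
    return Stream.of(id);
  }
}
```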




[GitHub] [iceberg] rdblue commented on a change in pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#discussion_r579896146



##########
File path: core/src/main/java/org/apache/iceberg/avro/ValueWriters.java
##########
@@ -492,4 +561,153 @@ protected Object get(IndexedRecord struct, int pos) {
       return struct.get(pos);
     }
   }
+
+  private abstract static class FloatingPointWriter<T extends Comparable<T>>
+      extends ComparableWriter<T> {
+    private long nanValueCount;
+
+    FloatingPointWriter(int id) {
+      super(id);
+    }
+
+    @Override
+    public void write(T datum, Encoder encoder) throws IOException {
+      valueCount++;
+
+      if (datum == null) {
+        nullValueCount++;
+      } else if (NaNUtil.isNaN(datum)) {
+        nanValueCount++;
+      } else {
+        if (max == null || datum.compareTo(max) > 0) {
+          this.max = datum;
+        }
+        if (min == null || datum.compareTo(min) < 0) {
+          this.min = datum;
+        }
+      }
+
+      writeVal(datum, encoder);
+    }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      return Stream.of(new FieldMetrics<>(id, valueCount, nullValueCount, nanValueCount, min, max));
+    }
+  }
+
+  public abstract static class MetricsAwareStringWriter<T extends Comparable<T>> extends ComparableWriter<T> {

Review comment:
       Is this only used for Spark's UTF8String?






[GitHub] [iceberg] yyanyy commented on a change in pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
yyanyy commented on a change in pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#discussion_r572542885



##########
File path: core/src/main/java/org/apache/iceberg/FloatFieldMetrics.java
##########
@@ -19,16 +19,14 @@
 
 package org.apache.iceberg;
 
-import java.nio.ByteBuffer;
-
 /**
  * Iceberg internally tracked field level metrics, used by Parquet and ORC writers only.
  * <p>
  * Parquet/ORC keeps track of most metrics in file statistics, and only NaN counter is actually tracked by writers.
  * This wrapper ensures that metrics not being updated by those writers will not be incorrectly used, by throwing
  * exceptions when they are accessed.
  */
-public class FloatFieldMetrics extends FieldMetrics {
+public class FloatFieldMetrics extends FieldMetrics<Object> {

Review comment:
      Sorry, I should have double-checked this. But I think `Number` might fit better? The double writer also uses this class, and `Double` doesn't extend `Float` (although that shouldn't make a difference at runtime).
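The type-argument question can be checked with a trimmed-down, hypothetical holder. The sketch models only the bounds and is not Iceberg's `FieldMetrics`. It shows that a subclass bound to `Number` accepts both `Float` and `Double` bounds. A version bound to `Float` would reject `Double` values at compile time, because `Double` is not a subtype of `Float`.

```java
// Hypothetical generic holder: only the lower and upper bounds are modeled.
class BoundsSketch<T> {
  private final T lowerBound;
  private final T upperBound;

  BoundsSketch(T lowerBound, T upperBound) {
    this.lowerBound = lowerBound;
    this.upperBound = upperBound;
  }

  T lowerBound() {
    return lowerBound;
  }

  T upperBound() {
    return upperBound;
  }
}

// Bound to Number, so a single class can serve both the float and the double writer.
class FloatingBoundsSketch extends BoundsSketch<Number> {
  FloatingBoundsSketch(Number lowerBound, Number upperBound) {
    super(lowerBound, upperBound);
  }
}
```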
   






[GitHub] [iceberg] rdblue commented on a change in pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#discussion_r549473974



##########
File path: core/src/main/java/org/apache/iceberg/FieldMetrics.java
##########
@@ -30,15 +28,15 @@
   private final long valueCount;
   private final long nullValueCount;
   private final long nanValueCount;
-  private final ByteBuffer lowerBound;
-  private final ByteBuffer upperBound;
+  private final Object lowerBound;
+  private final Object upperBound;
 
   public FieldMetrics(int id,
                       long valueCount,
                       long nullValueCount,
                       long nanValueCount,
-                      ByteBuffer lowerBound,
-                      ByteBuffer upperBound) {
+                      Object lowerBound,
+                      Object upperBound) {

Review comment:
       If this is going to be the value instead of the serialized value, then the class should be parameterized by the value type: `FieldMetrics<T>` with `public T lowerBound()`.
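The suggested parameterization might look roughly like the sketch below. Field names follow the diff, but `TypedFieldMetrics` is a hypothetical name, not the class that was eventually merged. The benefit is that callers get bounds back with their static type instead of an `Object` that must be cast.

```java
// Hypothetical value-typed metrics holder, parameterized by the bound type T.
class TypedFieldMetrics<T> {
  private final int id;
  private final long valueCount;
  private final long nullValueCount;
  private final long nanValueCount;
  private final T lowerBound;
  private final T upperBound;

  TypedFieldMetrics(int id, long valueCount, long nullValueCount, long nanValueCount,
                    T lowerBound, T upperBound) {
    this.id = id;
    this.valueCount = valueCount;
    this.nullValueCount = nullValueCount;
    this.nanValueCount = nanValueCount;
    this.lowerBound = lowerBound;
    this.upperBound = upperBound;
  }

  public int id() {
    return id;
  }

  public long valueCount() {
    return valueCount;
  }

  public long nullValueCount() {
    return nullValueCount;
  }

  public long nanValueCount() {
    return nanValueCount;
  }

  // Bounds are returned as T, so no cast from Object is needed at the call site.
  public T lowerBound() {
    return lowerBound;
  }

  public T upperBound() {
    return upperBound;
  }
}
```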






[GitHub] [iceberg] yyanyy commented on a change in pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
yyanyy commented on a change in pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#discussion_r572552249



##########
File path: core/src/main/java/org/apache/iceberg/avro/ValueWriters.java
##########
@@ -358,8 +380,10 @@ public void write(BigDecimal decimal, Encoder encoder) throws IOException {
     private final int nullIndex;
     private final int valueIndex;
     private final ValueWriter<T> valueWriter;
+    private final Schema.Type type;
+    private long nullValueCount;
 
-    private OptionWriter(int nullIndex, ValueWriter<T> valueWriter) {
+    private OptionWriter(int nullIndex, ValueWriter<T> valueWriter, Schema.Type type) {

Review comment:
      Oh, I think this `type` is also used for the `isMetricSupportedType`/`supportsMetrics` check, so we still need it.






[GitHub] [iceberg] yyanyy commented on a change in pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
yyanyy commented on a change in pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#discussion_r582545976



##########
File path: core/src/main/java/org/apache/iceberg/avro/ValueWriters.java
##########
@@ -492,4 +561,153 @@ protected Object get(IndexedRecord struct, int pos) {
       return struct.get(pos);
     }
   }
+
+  private abstract static class FloatingPointWriter<T extends Comparable<T>>
+      extends ComparableWriter<T> {
+    private long nanValueCount;
+
+    FloatingPointWriter(int id) {
+      super(id);
+    }
+
+    @Override
+    public void write(T datum, Encoder encoder) throws IOException {
+      valueCount++;
+
+      if (datum == null) {
+        nullValueCount++;
+      } else if (NaNUtil.isNaN(datum)) {
+        nanValueCount++;
+      } else {
+        if (max == null || datum.compareTo(max) > 0) {
+          this.max = datum;
+        }
+        if (min == null || datum.compareTo(min) < 0) {
+          this.min = datum;
+        }
+      }
+
+      writeVal(datum, encoder);
+    }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      return Stream.of(new FieldMetrics<>(id, valueCount, nullValueCount, nanValueCount, min, max));
+    }
+  }
+
+  public abstract static class MetricsAwareStringWriter<T extends Comparable<T>> extends ComparableWriter<T> {
+    public MetricsAwareStringWriter(int id) {
+      super(id);
+    }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      // convert min/max to string to allow upper/lower bound truncation when gathering metrics,
+      // as in different implementations there's no guarantee that input to string writer will be char sequence
+      return metrics(Object::toString);
+    }
+  }
+
+  private abstract static class MetricsAwareByteArrayWriter extends MetricsAwareWriter<byte[]> {
+    MetricsAwareByteArrayWriter(int id) {
+      super(id, Comparators.unsignedByteArrays());
+    }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      // convert min/max to byte buffer to allow upper/lower bound truncation when gathering metrics.
+      return metrics(ByteBuffer::wrap);
+    }
+  }
+
+  public abstract static class ComparableWriter<T extends Comparable<T>> extends MetricsAwareWriter<T> {
+    public ComparableWriter(int id) {
+      super(id, Comparable::compareTo);
+    }
+  }
+
+  /**
+   * A value writer wrapper that keeps track of column statistics (metrics) during writing.
+   *
+   * @param <T> Input type
+   */
+  public abstract static class MetricsAwareWriter<T> extends MetricsAwareTransformWriter<T, T> {
+    public MetricsAwareWriter(int id, Comparator<T> comparator) {
+      super(id, comparator, Function.identity());
+    }
+
+    /**
+     * Helper class to transform the input type when collecting metrics.
+     * The transform function converts the stats information from the specific type that the underlying writer
+     * understands to a more general type that could be transformed to binary following iceberg single-value
+     * serialization spec.
+     *
+     * @param func tranformation function
+     * @return a stream of field metrics with bounds converted by the given transformation
+     */
+    protected Stream<FieldMetrics> metrics(Function<T, ?> func) {
+      return Stream.of(new FieldMetrics<>(id, valueCount, nullValueCount, 0,
+          updateBound(min, func), updateBound(max, func)));
+    }
+
+    private <T3, T4> T4 updateBound(T3 bound, Function<T3, T4> func) {
+      return bound == null ? null : func.apply(bound);
+    }
+  }
+
+  /**
+   * A value writer wrapper that keeps track of column statistics (metrics) during writing, and accepts a
+   * transformation in its constructor.
+   * The transformation will apply to the input data to produce the type that the underlying writer accepts.
+   * Stats will also be tracked with the type after transformation.
+   *
+   * @param <T1> Input type
+   * @param <T2> Type after transformation
+   */
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  public abstract static class MetricsAwareTransformWriter<T1, T2> implements ValueWriter<T1> {
+    protected final int id;
+    protected long valueCount;
+    protected long nullValueCount;
+    protected T2 max;
+    protected T2 min;
+    protected final Function<T1, T2> transformation;
+
+    private final Comparator<T2> comparator;
+
+    public MetricsAwareTransformWriter(int id, Comparator<T2> comparator, Function<T1, T2> func) {
+      this.id = id;
+      this.comparator = comparator;
+      this.transformation = func;
+    }
+
+    @Override
+    public void write(T1 datum, Encoder encoder) throws IOException {
+      valueCount++;
+      if (datum == null) {
+        nullValueCount++;
+        writeVal(null, encoder);
+
+      } else {
+        T2 transformedDatum = transformation.apply(datum);
+        if (max == null || comparator.compare(transformedDatum, max) > 0) {
+          max = transformedDatum;
+        }
+        if (min == null || comparator.compare(transformedDatum, min) < 0) {
+          min = transformedDatum;
+        }
+        writeVal(transformedDatum, encoder);

Review comment:
      Yes, this makes the code much cleaner than before. Thank you for the great suggestion!
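The comparator-plus-transformation tracking in `MetricsAwareTransformWriter` can be exercised on its own with a hypothetical, Avro-free `BoundsTracker`. The sketch follows the diff's logic: each non-null input is transformed once and then compared against the running bounds. It substitutes the JDK's `java.util.Arrays.compareUnsigned` (Java 9+) for Iceberg's `Comparators.unsignedByteArrays()`. Both compare byte arrays as unsigned values.

```java
import java.util.Comparator;
import java.util.function.Function;

// Hypothetical sketch of the bound-tracking part of a metrics-aware writer.
class BoundsTracker<T1, T2> {
  private final Comparator<T2> comparator;
  private final Function<T1, T2> transformation;
  private long valueCount;
  private long nullValueCount;
  private T2 min;
  private T2 max;

  BoundsTracker(Comparator<T2> comparator, Function<T1, T2> transformation) {
    this.comparator = comparator;
    this.transformation = transformation;
  }

  // Returns the transformed value, which a real writer would hand to writeVal.
  T2 track(T1 datum) {
    valueCount++;
    if (datum == null) {
      nullValueCount++;
      return null;
    }
    T2 transformed = transformation.apply(datum);
    if (max == null || comparator.compare(transformed, max) > 0) {
      max = transformed;
    }
    if (min == null || comparator.compare(transformed, min) < 0) {
      min = transformed;
    }
    return transformed;
  }

  long valueCount() {
    return valueCount;
  }

  long nullValueCount() {
    return nullValueCount;
  }

  T2 min() {
    return min;
  }

  T2 max() {
    return max;
  }
}
```

With an unsigned comparator, `{(byte) 0xFF}` is treated as 255 and becomes the maximum. A signed comparison would make it the minimum instead.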






[GitHub] [iceberg] rdblue commented on a change in pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#discussion_r579887454



##########
File path: core/src/main/java/org/apache/iceberg/avro/AvroFileAppender.java
##########
@@ -95,10 +95,10 @@ public void close() throws IOException {
 
   @SuppressWarnings("unchecked")
   private static <D> DataFileWriter<D> newAvroWriter(
-      Schema schema, PositionOutputStream stream, DatumWriter<?> metricsAwareDatumWriter,
+      Schema schema, PositionOutputStream stream, DatumWriter<?> datumWriter,
       CodecFactory codec, Map<String, String> metadata) throws IOException {
     DataFileWriter<D> writer = new DataFileWriter<>(
-        (DatumWriter<D>) metricsAwareDatumWriter);
+        (DatumWriter<D>) datumWriter);

Review comment:
       Nit: no need for the newline any more.






[GitHub] [iceberg] rdblue commented on a change in pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#discussion_r569069789



##########
File path: core/src/main/java/org/apache/iceberg/FloatFieldMetrics.java
##########
@@ -19,16 +19,14 @@
 
 package org.apache.iceberg;
 
-import java.nio.ByteBuffer;
-
 /**
  * Iceberg internally tracked field level metrics, used by Parquet and ORC writers only.
  * <p>
  * Parquet/ORC keeps track of most metrics in file statistics, and only NaN counter is actually tracked by writers.
  * This wrapper ensures that metrics not being updated by those writers will not be incorrectly used, by throwing
  * exceptions when they are accessed.
  */
-public class FloatFieldMetrics extends FieldMetrics {
+public class FloatFieldMetrics extends FieldMetrics<Object> {

Review comment:
       Why isn't this `FieldMetrics<Float>`?






[GitHub] [iceberg] rdblue commented on a change in pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#discussion_r569629070



##########
File path: core/src/main/java/org/apache/iceberg/avro/ValueWriters.java
##########
@@ -369,17 +393,42 @@ private OptionWriter(int nullIndex, ValueWriter<T> valueWriter) {
         throw new IllegalArgumentException("Invalid option index: " + nullIndex);
       }
       this.valueWriter = valueWriter;
+      this.type = type;
+      this.nullValueCount = 0;
     }
 
     @Override
     public void write(T option, Encoder encoder) throws IOException {
       if (option == null) {
         encoder.writeIndex(nullIndex);
+        nullValueCount++;
       } else {
         encoder.writeIndex(valueIndex);
         valueWriter.write(option, encoder);
       }
     }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      if (AvroSchemaUtil.isMetricSupportedType(type)) {
+        return mergeNullCountIntoMetric();
+      } else {
+        return valueWriter.metrics();
+      }
+    }
+
+    private Stream<FieldMetrics> mergeNullCountIntoMetric() {
+      List<FieldMetrics> fieldMetricsFromWriter = valueWriter.metrics().collect(Collectors.toList());
+      Preconditions.checkState(fieldMetricsFromWriter.size() == 1,
+          "Optional field for type % shouldn't vend more than one field metrics", type);

Review comment:
       Okay, thinking about this a bit more, I don't think this assumption holds.
   
   There are lots of times when a structure in the middle of a record might be optional. An optional map, for example, would have both key and value metrics. I think that we should only merge null in if we know that this is wrapping a leaf writer, not a map, list, or struct writer.
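Here is a minimal sketch of merging the null count only when the option wraps a leaf. It makes two assumptions. First, the option writer knows whether it wraps a leaf; how it learns that is left open here. Second, the option-level nulls are added to both the value count and the null count, because the wrapped writer never sees those values. `CountMetrics` and `OptionMetricsMerger` are hypothetical names.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical stand-in for FieldMetrics with only the counters used here.
final class CountMetrics {
  final int id;
  final long valueCount;
  final long nullValueCount;

  CountMetrics(int id, long valueCount, long nullValueCount) {
    this.id = id;
    this.valueCount = valueCount;
    this.nullValueCount = nullValueCount;
  }
}

final class OptionMetricsMerger {
  private OptionMetricsMerger() {
  }

  static Stream<CountMetrics> merge(boolean wrapsLeaf, long optionNullCount,
                                    Stream<CountMetrics> childMetrics) {
    if (!wrapsLeaf) {
      // Maps, lists, and structs can report several fields (a map reports keys and values),
      // so the option's nulls cannot be attributed to one of them; pass metrics through.
      return childMetrics;
    }
    List<CountMetrics> metrics = childMetrics.collect(Collectors.toList());
    if (metrics.size() != 1) {
      throw new IllegalStateException(
          "Optional leaf writer should not produce metrics for more than one field, got " + metrics.size());
    }
    CountMetrics leaf = metrics.get(0);
    // The leaf writer never received the nulls, so add them to both counts.
    return Stream.of(new CountMetrics(
        leaf.id, leaf.valueCount + optionNullCount, leaf.nullValueCount + optionNullCount));
  }
}
```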






[GitHub] [iceberg] rdblue commented on a change in pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#discussion_r569625353



##########
File path: core/src/main/java/org/apache/iceberg/avro/ValueWriters.java
##########
@@ -369,17 +393,42 @@ private OptionWriter(int nullIndex, ValueWriter<T> valueWriter) {
         throw new IllegalArgumentException("Invalid option index: " + nullIndex);
       }
       this.valueWriter = valueWriter;
+      this.type = type;
+      this.nullValueCount = 0;
     }
 
     @Override
     public void write(T option, Encoder encoder) throws IOException {
       if (option == null) {
         encoder.writeIndex(nullIndex);
+        nullValueCount++;
       } else {
         encoder.writeIndex(valueIndex);
         valueWriter.write(option, encoder);
       }
     }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      if (AvroSchemaUtil.isMetricSupportedType(type)) {
+        return mergeNullCountIntoMetric();
+      } else {
+        return valueWriter.metrics();
+      }
+    }
+
+    private Stream<FieldMetrics> mergeNullCountIntoMetric() {
+      List<FieldMetrics> fieldMetricsFromWriter = valueWriter.metrics().collect(Collectors.toList());
+      Preconditions.checkState(fieldMetricsFromWriter.size() == 1,
+          "Optional field for type % shouldn't vend more than one field metrics", type);

Review comment:
      Also, the use of "vend" is unusual. What about "Optional %s writer should not produce metrics for more than one field"?






[GitHub] [iceberg] rdblue commented on a change in pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#discussion_r569612916



##########
File path: core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java
##########
@@ -408,4 +431,11 @@ private static String sanitize(char character) {
     }
     return "_x" + Integer.toHexString(character).toUpperCase();
   }
+
+  static boolean isMetricSupportedType(Schema.Type type) {

Review comment:
      How about `supportsMetrics`? I think that would be a bit easier to reason about, and it flows in an `if`: `if (supportsMetrics(...)) {...}`
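To show how the suggested name reads at a call site, here is a sketch against a hypothetical enum that mirrors the constant names of `org.apache.avro.Schema.Type`, so it compiles without Avro. The set of supported types is an assumption made purely for illustration: complex types and `NULL` are treated as unsupported. The review does not say which types `AvroSchemaUtil` actually accepts.

```java
// Hypothetical mirror of org.apache.avro.Schema.Type constant names.
enum AvroTypeSketch {
  RECORD, ENUM, ARRAY, MAP, UNION, FIXED, STRING, BYTES, INT, LONG, FLOAT, DOUBLE, BOOLEAN, NULL
}

final class MetricsSupport {
  private MetricsSupport() {
  }

  // Assumed rule for this sketch only: complex types and NULL do not support metrics.
  static boolean supportsMetrics(AvroTypeSketch type) {
    switch (type) {
      case RECORD:
      case ARRAY:
      case MAP:
      case UNION:
      case NULL:
        return false;
      default:
        return true;
    }
  }
}
```

A call site then reads as a plain question: `if (MetricsSupport.supportsMetrics(type)) { ... }`.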






[GitHub] [iceberg] rdblue commented on a change in pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#discussion_r579890313



##########
File path: core/src/main/java/org/apache/iceberg/avro/ValueWriters.java
##########
@@ -135,145 +150,145 @@ private NullWriter() {
 
     @Override
     public void write(Void ignored, Encoder encoder) throws IOException {
-      encoder.writeNull();
+      throw new IllegalStateException("[BUG] NullWriter shouldn't be used for writing nulls for Avro");

Review comment:
      I think we should revert this so that `writeNull` is correctly called. Nulls shouldn't be written as anything, but the fact that the encoder has a `writeNull` method makes me think that it should be called. We don't know what the encoder might be using it for.
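A minimal sketch of the requested behavior is below: the null writer forwards to the encoder instead of throwing. `NullEncoder` is a hypothetical one-method slice, and `RecordingNullEncoder` is a test double. Avro's real `org.apache.avro.io.Encoder` is an abstract class, and its `writeNull()` is one of many methods. In Avro's binary encoding a null occupies zero bytes. Even so, other encoder implementations may still rely on receiving the call.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical one-method slice of an encoder.
interface NullEncoder {
  void writeNull() throws IOException;
}

// Test double that records each call so the delegation can be observed.
final class RecordingNullEncoder implements NullEncoder {
  final List<String> calls = new ArrayList<>();

  @Override
  public void writeNull() {
    calls.add("writeNull");
  }
}

// Always delegates to the encoder rather than throwing, leaving any handling of nulls to it.
final class ForwardingNullWriter {
  void write(Void ignored, NullEncoder encoder) throws IOException {
    encoder.writeNull();
  }
}
```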






[GitHub] [iceberg] yyanyy commented on a change in pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
yyanyy commented on a change in pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#discussion_r582545802



##########
File path: core/src/main/java/org/apache/iceberg/avro/ValueWriters.java
##########
@@ -492,4 +561,153 @@ protected Object get(IndexedRecord struct, int pos) {
       return struct.get(pos);
     }
   }
+
+  private abstract static class FloatingPointWriter<T extends Comparable<T>>
+      extends ComparableWriter<T> {

Review comment:
      I think this was mainly to save a constructor argument, but I can pass `null` and add a comment.
   






[GitHub] [iceberg] rdblue commented on pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#issuecomment-772152597


   @yyanyy, can you rebase this? I think merging #1946 caused conflicts.




[GitHub] [iceberg] rdblue commented on a change in pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#discussion_r579898395



##########
File path: core/src/main/java/org/apache/iceberg/avro/ValueWriters.java
##########
@@ -369,17 +393,42 @@ private OptionWriter(int nullIndex, ValueWriter<T> valueWriter) {
         throw new IllegalArgumentException("Invalid option index: " + nullIndex);
       }
       this.valueWriter = valueWriter;
+      this.type = type;
+      this.nullValueCount = 0;
     }
 
     @Override
     public void write(T option, Encoder encoder) throws IOException {
       if (option == null) {
         encoder.writeIndex(nullIndex);
+        nullValueCount++;
       } else {
         encoder.writeIndex(valueIndex);
         valueWriter.write(option, encoder);
       }
     }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      if (AvroSchemaUtil.isMetricSupportedType(type)) {
+        return mergeNullCountIntoMetric();
+      } else {
+        return valueWriter.metrics();
+      }
+    }
+
+    private Stream<FieldMetrics> mergeNullCountIntoMetric() {
+      List<FieldMetrics> fieldMetricsFromWriter = valueWriter.metrics().collect(Collectors.toList());
+      Preconditions.checkState(fieldMetricsFromWriter.size() == 1,
+          "Optional field for type % shouldn't vend more than one field metrics", type);

Review comment:
       Yes, you're right that it is handled there. I missed that originally. I commented on how to avoid passing the type in, but I agree that you do handle it.






[GitHub] [iceberg] rdblue commented on a change in pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#discussion_r579890994



##########
File path: core/src/main/java/org/apache/iceberg/avro/ValueWriters.java
##########
@@ -135,145 +150,145 @@ private NullWriter() {
 
     @Override
     public void write(Void ignored, Encoder encoder) throws IOException {
-      encoder.writeNull();
+      throw new IllegalStateException("[BUG] NullWriter shouldn't be used for writing nulls for Avro");
     }
-  }
 
-  private static class BooleanWriter implements ValueWriter<Boolean> {
-    private static final BooleanWriter INSTANCE = new BooleanWriter();
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      throw new IllegalStateException("[BUG] NullWriter shouldn't be used for writing nulls for Avro");
+    }
+  }
 
-    private BooleanWriter() {
+  private static class BooleanWriter extends ComparableWriter<Boolean> {
+    private BooleanWriter(int id) {
+      super(id);
     }
 
     @Override
-    public void write(Boolean bool, Encoder encoder) throws IOException {
+    protected void writeVal(Boolean bool, Encoder encoder) throws IOException {
       encoder.writeBoolean(bool);
     }
   }
 
-  private static class ByteToIntegerWriter implements ValueWriter<Byte> {
-    private static final ByteToIntegerWriter INSTANCE = new ByteToIntegerWriter();
-
-    private ByteToIntegerWriter() {
+  private static class ByteToIntegerWriter extends MetricsAwareTransformWriter<Byte, Integer> {
+    private ByteToIntegerWriter(int id) {
+      super(id, Integer::compareTo, Byte::intValue);

Review comment:
      This change will lead to unnecessary boxing of the converted value, because it needs to pass an Object to `writeVal`. I'd rather duplicate a little more code than have object creation in a tight loop. Can you update the byte and short writers to avoid this?
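One way to follow this suggestion is sketched below for the short writer; the byte writer would follow the same pattern. Bounds are kept as primitive `short` fields, and the value is widened to `int` only at the call site, so no `Integer` is produced per value. With a `Short`-to-`Integer` transform, boxing goes through `Integer.valueOf`, which is only guaranteed to cache -128 to 127. Most short values would therefore typically allocate. `IntEncoder` and `ShortAsIntWriter` are hypothetical names, and nulls are assumed to be handled by an enclosing option writer.

```java
import java.io.IOException;

// Hypothetical one-method slice of an encoder (Avro's Encoder declares writeInt(int) among many others).
interface IntEncoder {
  void writeInt(int value) throws IOException;
}

// Tracks bounds as primitives and writes via int widening, so no Integer is created per value.
final class ShortAsIntWriter {
  private final int id;
  private long valueCount;
  private boolean hasBounds;
  private short min;
  private short max;

  ShortAsIntWriter(int id) {
    this.id = id;
  }

  void write(Short datum, IntEncoder encoder) throws IOException {
    short value = datum; // unboxing only reads the primitive; nulls are assumed handled upstream
    valueCount++;
    if (!hasBounds || value > max) {
      max = value;
    }
    if (!hasBounds || value < min) {
      min = value;
    }
    hasBounds = true;
    encoder.writeInt(value); // implicit short-to-int widening, no boxing
  }

  int id() {
    return id;
  }

  long valueCount() {
    return valueCount;
  }

  short min() {
    return min;
  }

  short max() {
    return max;
  }
}
```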






[GitHub] [iceberg] rdblue commented on pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#issuecomment-892920161


   Thanks, @yyanyy! No rush, I just wanted to check in.




[GitHub] [iceberg] yyanyy commented on a change in pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
yyanyy commented on a change in pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#discussion_r572543822



##########
File path: core/src/main/java/org/apache/iceberg/avro/ValueWriters.java
##########
@@ -135,145 +150,145 @@ private NullWriter() {
 
     @Override
     public void write(Void ignored, Encoder encoder) throws IOException {
-      encoder.writeNull();
+      throw new IllegalStateException("[BUG] NullWriter shouldn't be used for writing nulls for Avro");

Review comment:
      I don't think it would. This was just an attempt to make it fail loudly instead of silently, and to avoid confusion about how it should be used in code. I can revert it if you think it's not necessary.




[GitHub] [iceberg] rdblue commented on a change in pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#discussion_r579891436



##########
File path: core/src/main/java/org/apache/iceberg/avro/ValueWriters.java
##########
@@ -135,145 +150,145 @@ private NullWriter() {
 
     @Override
     public void write(Void ignored, Encoder encoder) throws IOException {
-      encoder.writeNull();
+      throw new IllegalStateException("[BUG] NullWriter shouldn't be used for writing nulls for Avro");
     }
-  }
 
-  private static class BooleanWriter implements ValueWriter<Boolean> {
-    private static final BooleanWriter INSTANCE = new BooleanWriter();
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      throw new IllegalStateException("[BUG] NullWriter shouldn't be used for writing nulls for Avro");
+    }
+  }
 
-    private BooleanWriter() {
+  private static class BooleanWriter extends ComparableWriter<Boolean> {
+    private BooleanWriter(int id) {
+      super(id);
     }
 
     @Override
-    public void write(Boolean bool, Encoder encoder) throws IOException {
+    protected void writeVal(Boolean bool, Encoder encoder) throws IOException {
       encoder.writeBoolean(bool);
     }
   }
 
-  private static class ByteToIntegerWriter implements ValueWriter<Byte> {
-    private static final ByteToIntegerWriter INSTANCE = new ByteToIntegerWriter();
-
-    private ByteToIntegerWriter() {
+  private static class ByteToIntegerWriter extends MetricsAwareTransformWriter<Byte, Integer> {
+    private ByteToIntegerWriter(int id) {
+      super(id, Integer::compareTo, Byte::intValue);
     }
 
     @Override
-    public void write(Byte b, Encoder encoder) throws IOException {
-      encoder.writeInt(b.intValue());
+    protected void writeVal(Integer intVal, Encoder encoder) throws IOException {
+      encoder.writeInt(intVal);
     }
   }
 
-  private static class ShortToIntegerWriter implements ValueWriter<Short> {
-    private static final ShortToIntegerWriter INSTANCE = new ShortToIntegerWriter();
-
-    private ShortToIntegerWriter() {
+  private static class ShortToIntegerWriter extends MetricsAwareTransformWriter<Short, Integer> {
+    private ShortToIntegerWriter(int id) {
+      super(id, Integer::compareTo, Short::intValue);
     }
 
     @Override
-    public void write(Short s, Encoder encoder) throws IOException {
-      encoder.writeInt(s.intValue());
+    protected void writeVal(Integer intValue, Encoder encoder) throws IOException {
+      encoder.writeInt(intValue);
     }
   }
 
-  private static class IntegerWriter implements ValueWriter<Integer> {
-    private static final IntegerWriter INSTANCE = new IntegerWriter();
-
-    private IntegerWriter() {
+  private static class IntegerWriter extends ComparableWriter<Integer> {
+    private IntegerWriter(int id) {
+      super(id);
     }
 
     @Override
-    public void write(Integer i, Encoder encoder) throws IOException {
+    protected void writeVal(Integer i, Encoder encoder) throws IOException {
       encoder.writeInt(i);
     }
   }
 
-  private static class LongWriter implements ValueWriter<Long> {
-    private static final LongWriter INSTANCE = new LongWriter();
-
-    private LongWriter() {
+  private static class LongWriter extends ComparableWriter<Long> {
+    private LongWriter(int id) {
+      super(id);
     }
 
     @Override
-    public void write(Long l, Encoder encoder) throws IOException {
+    protected void writeVal(Long l, Encoder encoder) throws IOException {
       encoder.writeLong(l);
     }
   }
 
-  private static class FloatWriter implements ValueWriter<Float> {
-    private static final FloatWriter INSTANCE = new FloatWriter();
-
-    private FloatWriter() {
+  private static class FloatWriter extends FloatingPointWriter<Float> {
+    private FloatWriter(int id) {
+      super(id);
     }
 
     @Override
-    public void write(Float f, Encoder encoder) throws IOException {
+    protected void writeVal(Float f, Encoder encoder) throws IOException {
       encoder.writeFloat(f);
     }
   }
 
-  private static class DoubleWriter implements ValueWriter<Double> {
-    private static final DoubleWriter INSTANCE = new DoubleWriter();
-
-    private DoubleWriter() {
+  private static class DoubleWriter extends FloatingPointWriter<Double> {
+    private DoubleWriter(int id) {
+      super(id);
     }
 
     @Override
-    public void write(Double d, Encoder encoder) throws IOException {
+    protected void writeVal(Double d, Encoder encoder) throws IOException {
       encoder.writeDouble(d);
     }
   }
 
-  private static class StringWriter implements ValueWriter<Object> {
-    private static final StringWriter INSTANCE = new StringWriter();
-
-    private StringWriter() {
+  private static class StringWriter extends MetricsAwareWriter<CharSequence> {
+    private StringWriter(int id) {
+      super(id, Comparators.charSequences());
     }
 
     @Override
-    public void write(Object s, Encoder encoder) throws IOException {
+    public void write(CharSequence s, Encoder encoder) throws IOException {
       // use getBytes because it may return the backing byte array if available.
       // otherwise, it copies to a new byte array, which is still cheaper than Avro
       // calling toString, which incurs encoding costs
       if (s instanceof Utf8) {
-        encoder.writeString((Utf8) s);
+        super.write(s, encoder);
       } else if (s instanceof String) {
-        encoder.writeString(new Utf8((String) s));
+        super.write(new Utf8((String) s), encoder);
       } else if (s == null) {
         throw new IllegalArgumentException("Cannot write null to required string column");
       } else {
         throw new IllegalArgumentException(
             "Cannot write unknown string type: " + s.getClass().getName() + ": " + s.toString());
       }
     }
-  }
 
-  private static class Utf8Writer implements ValueWriter<Utf8> {
-    private static final Utf8Writer INSTANCE = new Utf8Writer();
+    @Override
+    protected void writeVal(CharSequence s, Encoder encoder) throws IOException {

Review comment:
       Is this called? Seems like it wouldn't be since `write` is overridden.
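
For reference, here is a minimal, self-contained model of the dispatch in the quoted hunk. It has no Avro or Iceberg dependency. `BaseTrackingWriter`, `ModelStringWriter` and the `StringBuilder` sink are hypothetical stand-ins for `MetricsAwareTransformWriter`, `StringWriter` and `Encoder`. The model shows that an overridden `write` still reaches `writeVal` as long as the override delegates to `super.write`, which the quoted `StringWriter.write` does for `Utf8` and `String` inputs. `writeVal` would only be dead code if the override stopped calling `super.write`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the writer hierarchy in the quoted hunk.
// StringBuilder stands in for Avro's Encoder; no Avro or Iceberg classes are used.
abstract class BaseTrackingWriter<T> {
  protected long valueCount;

  // Template method: update a metric, then delegate the actual encoding to writeVal.
  public void write(T datum, StringBuilder out) {
    valueCount++;
    writeVal(datum, out);
  }

  protected abstract void writeVal(T datum, StringBuilder out);
}

class ModelStringWriter extends BaseTrackingWriter<CharSequence> {
  final List<CharSequence> writeValCalls = new ArrayList<>();

  @Override
  public void write(CharSequence s, StringBuilder out) {
    // Normalize the input here, then hand off to super.write, which still calls writeVal.
    if (s == null) {
      throw new IllegalArgumentException("Cannot write null to required string column");
    }

    super.write(s.toString(), out);
  }

  @Override
  protected void writeVal(CharSequence s, StringBuilder out) {
    writeValCalls.add(s);
    out.append(s);
  }
}

class WriteValDispatchDemo {
  public static void main(String[] args) {
    ModelStringWriter writer = new ModelStringWriter();
    StringBuilder out = new StringBuilder();
    writer.write("a", out);
    writer.write(new StringBuilder("b"), out);
    // writeVal ran once per write call, reached through super.write.
    System.out.println(writer.writeValCalls.size() + " " + out);
  }
}
```

Running `WriteValDispatchDemo` should print `2 ab`.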




[GitHub] [iceberg] rdblue commented on a change in pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#discussion_r579892185



##########
File path: core/src/main/java/org/apache/iceberg/avro/ValueWriters.java
##########
@@ -492,4 +561,153 @@ protected Object get(IndexedRecord struct, int pos) {
       return struct.get(pos);
     }
   }
+
+  private abstract static class FloatingPointWriter<T extends Comparable<T>>
+      extends ComparableWriter<T> {
+    private long nanValueCount;
+
+    FloatingPointWriter(int id) {
+      super(id);
+    }
+
+    @Override
+    public void write(T datum, Encoder encoder) throws IOException {
+      valueCount++;
+
+      if (datum == null) {
+        nullValueCount++;
+      } else if (NaNUtil.isNaN(datum)) {
+        nanValueCount++;
+      } else {
+        if (max == null || datum.compareTo(max) > 0) {
+          this.max = datum;
+        }
+        if (min == null || datum.compareTo(min) < 0) {
+          this.min = datum;
+        }
+      }
+
+      writeVal(datum, encoder);
+    }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      return Stream.of(new FieldMetrics<>(id, valueCount, nullValueCount, nanValueCount, min, max));
+    }
+  }
+
+  public abstract static class MetricsAwareStringWriter<T extends Comparable<T>> extends ComparableWriter<T> {
+    public MetricsAwareStringWriter(int id) {
+      super(id);
+    }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      // convert min/max to string to allow upper/lower bound truncation when gathering metrics,
+      // as in different implementations there's no guarantee that input to string writer will be char sequence
+      return metrics(Object::toString);
+    }
+  }
+
+  private abstract static class MetricsAwareByteArrayWriter extends MetricsAwareWriter<byte[]> {
+    MetricsAwareByteArrayWriter(int id) {
+      super(id, Comparators.unsignedByteArrays());
+    }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      // convert min/max to byte buffer to allow upper/lower bound truncation when gathering metrics.
+      return metrics(ByteBuffer::wrap);
+    }
+  }
+
+  public abstract static class ComparableWriter<T extends Comparable<T>> extends MetricsAwareWriter<T> {
+    public ComparableWriter(int id) {
+      super(id, Comparable::compareTo);
+    }
+  }
+
+  /**
+   * A value writer wrapper that keeps track of column statistics (metrics) during writing.
+   *
+   * @param <T> Input type
+   */
+  public abstract static class MetricsAwareWriter<T> extends MetricsAwareTransformWriter<T, T> {
+    public MetricsAwareWriter(int id, Comparator<T> comparator) {
+      super(id, comparator, Function.identity());
+    }
+
+    /**
+     * Helper class to transform the input type when collecting metrics.
+     * The transform function converts the stats information from the specific type that the underlying writer
+     * understands to a more general type that could be transformed to binary following iceberg single-value
+     * serialization spec.
+     *
+     * @param func tranformation function
+     * @return a stream of field metrics with bounds converted by the given transformation
+     */
+    protected Stream<FieldMetrics> metrics(Function<T, ?> func) {
+      return Stream.of(new FieldMetrics<>(id, valueCount, nullValueCount, 0,
+          updateBound(min, func), updateBound(max, func)));
+    }
+
+    private <T3, T4> T4 updateBound(T3 bound, Function<T3, T4> func) {
+      return bound == null ? null : func.apply(bound);
+    }
+  }
+
+  /**
+   * A value writer wrapper that keeps track of column statistics (metrics) during writing, and accepts a
+   * transformation in its constructor.
+   * The transformation will apply to the input data to produce the type that the underlying writer accepts.
+   * Stats will also be tracked with the type after transformation.
+   *
+   * @param <T1> Input type
+   * @param <T2> Type after transformation
+   */
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  public abstract static class MetricsAwareTransformWriter<T1, T2> implements ValueWriter<T1> {
+    protected final int id;
+    protected long valueCount;
+    protected long nullValueCount;
+    protected T2 max;
+    protected T2 min;
+    protected final Function<T1, T2> transformation;
+
+    private final Comparator<T2> comparator;
+
+    public MetricsAwareTransformWriter(int id, Comparator<T2> comparator, Function<T1, T2> func) {
+      this.id = id;
+      this.comparator = comparator;
+      this.transformation = func;
+    }
+
+    @Override
+    public void write(T1 datum, Encoder encoder) throws IOException {
+      valueCount++;
+      if (datum == null) {

Review comment:
       Is this necessary? Before, all of the type-specific writers assumed that the input value was non-null because null isn't allowed unless the type is optional. If it is optional, the writer will be wrapped in an option writer, so there is no need to handle null.
   
   I think this should similarly assume that the option writer tracks null values and that all values will be non-null. That simplifies this class quite a bit.
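
Here is a minimal sketch of the split suggested above. It uses hypothetical simplified types (`SimpleWriter`, `RequiredLongWriter`, `OptionalWriter`), with a `List<Object>` in place of Avro's `Encoder`, rather than Iceberg's real classes. The type-specific writer assumes non-null input and tracks only the value count and bounds. The option wrapper is the only writer that can see a null, so it counts nulls.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins, not Iceberg's ValueWriter/Encoder API:
// a List<Object> records what would be sent to the encoder.
interface SimpleWriter<T> {
  void write(T datum, List<Object> out);
}

// Type-specific writer for a required column: every datum is assumed non-null,
// so it tracks only the value count and bounds.
class RequiredLongWriter implements SimpleWriter<Long> {
  long valueCount;
  Long min;
  Long max;

  @Override
  public void write(Long datum, List<Object> out) {
    valueCount++;
    if (min == null || datum < min) {
      min = datum;
    }

    if (max == null || datum > max) {
      max = datum;
    }

    out.add(datum);
  }
}

// Wrapper for an optional column: the only writer that ever sees null,
// so it is the natural owner of the null count.
class OptionalWriter<T> implements SimpleWriter<T> {
  private final SimpleWriter<T> valueWriter;
  long nullValueCount;

  OptionalWriter(SimpleWriter<T> valueWriter) {
    this.valueWriter = valueWriter;
  }

  @Override
  public void write(T option, List<Object> out) {
    if (option == null) {
      nullValueCount++;
      out.add("null-branch");   // stands in for encoder.writeIndex(nullIndex)
    } else {
      out.add("value-branch");  // stands in for encoder.writeIndex(valueIndex)
      valueWriter.write(option, out);
    }
  }
}

class OptionalWriterDemo {
  public static void main(String[] args) {
    RequiredLongWriter longs = new RequiredLongWriter();
    OptionalWriter<Long> optional = new OptionalWriter<>(longs);
    List<Object> out = new ArrayList<>();
    optional.write(5L, out);
    optional.write(null, out);
    optional.write(2L, out);
    System.out.println(longs.valueCount + " " + optional.nullValueCount + " " + longs.min + " " + longs.max);
  }
}
```

Running `OptionalWriterDemo` should print `2 1 2 5`. The inner writer never branches on null, which is the simplification the comment is after.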




[GitHub] [iceberg] yyanyy commented on pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
yyanyy commented on pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#issuecomment-892311444


   > Any update on this one, @yyanyy?
   
   Apologies, I haven't had a chance to update this. I'll make sure to set aside time to address the comments in the next two weeks!


[GitHub] [iceberg] rdblue commented on pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#issuecomment-890589132


   Any update on this one, @yyanyy?


[GitHub] [iceberg] yyanyy commented on a change in pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
yyanyy commented on a change in pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#discussion_r552241871



##########
File path: core/src/main/java/org/apache/iceberg/FieldMetrics.java
##########
@@ -30,15 +28,15 @@
   private final long valueCount;
   private final long nullValueCount;
   private final long nanValueCount;
-  private final ByteBuffer lowerBound;
-  private final ByteBuffer upperBound;
+  private final Object lowerBound;
+  private final Object upperBound;
 
   public FieldMetrics(int id,
                       long valueCount,
                       long nullValueCount,
                       long nanValueCount,
-                      ByteBuffer lowerBound,
-                      ByteBuffer upperBound) {
+                      Object lowerBound,
+                      Object upperBound) {

Review comment:
      Yes, sorry, I forgot to address that comment after trying the alternative approach (mentioned in the description). I have updated this PR to address it. Sorry for the delay!




[GitHub] [iceberg] yyanyy commented on a change in pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
yyanyy commented on a change in pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#discussion_r572543975



##########
File path: core/src/main/java/org/apache/iceberg/avro/ValueWriters.java
##########
@@ -369,17 +393,42 @@ private OptionWriter(int nullIndex, ValueWriter<T> valueWriter) {
         throw new IllegalArgumentException("Invalid option index: " + nullIndex);
       }
       this.valueWriter = valueWriter;
+      this.type = type;
+      this.nullValueCount = 0;
     }
 
     @Override
     public void write(T option, Encoder encoder) throws IOException {
       if (option == null) {
         encoder.writeIndex(nullIndex);
+        nullValueCount++;
       } else {
         encoder.writeIndex(valueIndex);
         valueWriter.write(option, encoder);
       }
     }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      if (AvroSchemaUtil.isMetricSupportedType(type)) {
+        return mergeNullCountIntoMetric();
+      } else {
+        return valueWriter.metrics();
+      }
+    }
+
+    private Stream<FieldMetrics> mergeNullCountIntoMetric() {
+      List<FieldMetrics> fieldMetricsFromWriter = valueWriter.metrics().collect(Collectors.toList());
+      Preconditions.checkState(fieldMetricsFromWriter.size() == 1,
+          "Optional field for type % shouldn't vend more than one field metrics", type);

Review comment:
      I think this is currently handled by its caller, which only calls this if `AvroSchemaUtil.isMetricSupportedType` is true, and `AvroSchemaUtil.isMetricSupportedType` only returns true for primitive types?
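
If only primitive types reach `mergeNullCountIntoMetric`, the wrapped writer vends a single metrics entry. The option writer then only has to fold its own null count into that entry. The sketch below shows that merge using a hypothetical `SimpleFieldMetrics` class that mirrors the `FieldMetrics` constructor arguments quoted in this thread. The merge rule is an assumption and is not taken from the PR. It supposes the inner writer counted only the non-null values it received. Because Iceberg's value counts include nulls, it adds the option writer's null count to both the value count and the null count.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical, simplified metrics holder mirroring the FieldMetrics constructor
// arguments shown in this thread; not Iceberg's actual class.
final class SimpleFieldMetrics {
  final int id;
  final long valueCount;
  final long nullValueCount;
  final long nanValueCount;
  final Object lowerBound;
  final Object upperBound;

  SimpleFieldMetrics(int id, long valueCount, long nullValueCount, long nanValueCount,
                     Object lowerBound, Object upperBound) {
    this.id = id;
    this.valueCount = valueCount;
    this.nullValueCount = nullValueCount;
    this.nanValueCount = nanValueCount;
    this.lowerBound = lowerBound;
    this.upperBound = upperBound;
  }
}

final class NullCountMerge {
  private NullCountMerge() {
  }

  // Assumes the wrapped writer handles a primitive type (so it vends exactly one entry)
  // and counted only the non-null values it received.
  static Stream<SimpleFieldMetrics> mergeNullCount(Stream<SimpleFieldMetrics> fromValueWriter, long nullCount) {
    List<SimpleFieldMetrics> metrics = fromValueWriter.collect(Collectors.toList());
    if (metrics.size() != 1) {
      throw new IllegalStateException(
          "Expected exactly one metrics entry for a primitive field, got " + metrics.size());
    }

    SimpleFieldMetrics m = metrics.get(0);
    // Value counts include nulls, so nulls seen by the option writer are added to both counts.
    return Stream.of(new SimpleFieldMetrics(m.id, m.valueCount + nullCount,
        m.nullValueCount + nullCount, m.nanValueCount, m.lowerBound, m.upperBound));
  }
}

class NullCountMergeDemo {
  public static void main(String[] args) {
    SimpleFieldMetrics inner = new SimpleFieldMetrics(1, 3, 0, 0, 1L, 9L);
    SimpleFieldMetrics merged = NullCountMerge.mergeNullCount(Stream.of(inner), 2).findFirst().get();
    System.out.println(merged.valueCount + " " + merged.nullValueCount);
  }
}
```

Running `NullCountMergeDemo` should print `5 2`. The size check plays the role of the `Preconditions.checkState` in the quoted hunk. It can only fail if a non-primitive writer is ever wrapped.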
   




[GitHub] [iceberg] rdblue commented on a change in pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#discussion_r569621792



##########
File path: core/src/main/java/org/apache/iceberg/avro/ValueWriters.java
##########
@@ -135,145 +150,145 @@ private NullWriter() {
 
     @Override
     public void write(Void ignored, Encoder encoder) throws IOException {
-      encoder.writeNull();
+      throw new IllegalStateException("[BUG] NullWriter shouldn't be used for writing nulls for Avro");

Review comment:
       Is this change necessary? Do you think that it would actually cause a problem if it were used?




[GitHub] [iceberg] yyanyy commented on a change in pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
yyanyy commented on a change in pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#discussion_r572543387



##########
File path: core/src/main/java/org/apache/iceberg/avro/AvroWithPartnerByStructureVisitor.java
##########
@@ -161,4 +186,17 @@ public T map(P sMap, Schema map, T value) {
   public T primitive(P type, Schema primitive) {
     return null;
   }
+
+  // ---------------------------------- Helpers ---------------------------------------------
+
+  private Deque<String> fieldNames = Lists.newLinkedList();
+  private Deque<Schema> parentSchemas = Lists.newLinkedList();

Review comment:
      I did consider that pattern. From my notes, the main reason I didn't use it is that it wouldn't be as clean as the existing `before/afterField` pattern in other visitors, because different data structures store field id information in different places. For example, a field that is part of a struct stores its id in its own `Schema.Field`, so we can pass `field` directly to `before/afterField` inside the loop over fields. A map value, however, has its id stored in the map's schema rather than in its own schema, so `beforeMapValue` would need to be passed the parent schema. Requiring different visitors to implement `before/after` methods with all these different parameters, and duplicating the id-retrieval logic for each data type across visitor implementations, seemed messy to reason about, so I think keeping the logic here is actually cleaner.
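
Here is a rough sketch of the trade-off described above. It is written against a hypothetical mini schema model (`StructNode`, `FieldNode`, `MapNode`, `Primitive`) instead of Avro's `Schema`, and a single id stack stands in for the `fieldNames`/`parentSchemas` deques in the diff. The base visitor pushes and pops ids itself. It reads the id from the field for struct members and from the parent map for map values. As a result, concrete visitors need no `before/after` hooks that would each have to know where the id lives.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical mini schema model, not Avro's Schema. A struct field carries its own id,
// while a map value's id is stored on the parent map.
abstract class Node {
}

final class Primitive extends Node {
  final String name;

  Primitive(String name) {
    this.name = name;
  }
}

final class FieldNode {
  final int id;
  final Node type;

  FieldNode(int id, Node type) {
    this.id = id;
    this.type = type;
  }
}

final class StructNode extends Node {
  final List<FieldNode> fields;

  StructNode(FieldNode... fields) {
    this.fields = Arrays.asList(fields);
  }
}

final class MapNode extends Node {
  final int valueId;
  final Node value;

  MapNode(int valueId, Node value) {
    this.valueId = valueId;
    this.value = value;
  }
}

// The base visitor owns the id bookkeeping, so each concrete visitor only implements primitive().
abstract class IdTrackingVisitor {
  private final Deque<Integer> fieldIds = new ArrayDeque<>();

  protected Integer currentFieldId() {
    return fieldIds.peek();
  }

  final void visit(Node node) {
    if (node instanceof StructNode) {
      for (FieldNode field : ((StructNode) node).fields) {
        visitWithId(field.id, field.type);  // id read from the field itself
      }
    } else if (node instanceof MapNode) {
      MapNode map = (MapNode) node;
      visitWithId(map.valueId, map.value);  // id read from the parent map
    } else {
      primitive((Primitive) node);
    }
  }

  private void visitWithId(int id, Node child) {
    fieldIds.push(id);
    try {
      visit(child);
    } finally {
      fieldIds.pop();
    }
  }

  protected abstract void primitive(Primitive primitive);
}

class IdCollector extends IdTrackingVisitor {
  final List<String> seen = new ArrayList<>();

  @Override
  protected void primitive(Primitive primitive) {
    seen.add(currentFieldId() + ":" + primitive.name);
  }
}
```

For example, visiting `new StructNode(new FieldNode(1, new Primitive("int")), new FieldNode(2, new MapNode(3, new Primitive("string"))))` with an `IdCollector` records `1:int` and `3:string`. The map value picks up id 3 from its parent, and `IdCollector` never has to know that.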
   




[GitHub] [iceberg] rdblue commented on a change in pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#discussion_r579893972



##########
File path: core/src/main/java/org/apache/iceberg/avro/ValueWriters.java
##########
@@ -135,145 +150,145 @@ private NullWriter() {
 
     @Override
     public void write(Void ignored, Encoder encoder) throws IOException {
-      encoder.writeNull();
+      throw new IllegalStateException("[BUG] NullWriter shouldn't be used for writing nulls for Avro");
     }
-  }
 
-  private static class BooleanWriter implements ValueWriter<Boolean> {
-    private static final BooleanWriter INSTANCE = new BooleanWriter();
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      throw new IllegalStateException("[BUG] NullWriter shouldn't be used for writing nulls for Avro");
+    }
+  }
 
-    private BooleanWriter() {
+  private static class BooleanWriter extends ComparableWriter<Boolean> {
+    private BooleanWriter(int id) {
+      super(id);
     }
 
     @Override
-    public void write(Boolean bool, Encoder encoder) throws IOException {
+    protected void writeVal(Boolean bool, Encoder encoder) throws IOException {
       encoder.writeBoolean(bool);
     }
   }
 
-  private static class ByteToIntegerWriter implements ValueWriter<Byte> {
-    private static final ByteToIntegerWriter INSTANCE = new ByteToIntegerWriter();
-
-    private ByteToIntegerWriter() {
+  private static class ByteToIntegerWriter extends MetricsAwareTransformWriter<Byte, Integer> {
+    private ByteToIntegerWriter(int id) {
+      super(id, Integer::compareTo, Byte::intValue);

Review comment:
       Actually, I think it's fine this way.




[GitHub] [iceberg] rdblue commented on a change in pull request #1963: Avro metrics support: track metrics in Avro value writers

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#discussion_r579892240



##########
File path: core/src/main/java/org/apache/iceberg/avro/ValueWriters.java
##########
@@ -492,4 +561,153 @@ protected Object get(IndexedRecord struct, int pos) {
       return struct.get(pos);
     }
   }
+
+  private abstract static class FloatingPointWriter<T extends Comparable<T>>
+      extends ComparableWriter<T> {
+    private long nanValueCount;
+
+    FloatingPointWriter(int id) {
+      super(id);
+    }
+
+    @Override
+    public void write(T datum, Encoder encoder) throws IOException {
+      valueCount++;
+
+      if (datum == null) {
+        nullValueCount++;
+      } else if (NaNUtil.isNaN(datum)) {
+        nanValueCount++;
+      } else {
+        if (max == null || datum.compareTo(max) > 0) {
+          this.max = datum;
+        }
+        if (min == null || datum.compareTo(min) < 0) {
+          this.min = datum;
+        }
+      }
+
+      writeVal(datum, encoder);
+    }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      return Stream.of(new FieldMetrics<>(id, valueCount, nullValueCount, nanValueCount, min, max));
+    }
+  }
+
+  public abstract static class MetricsAwareStringWriter<T extends Comparable<T>> extends ComparableWriter<T> {
+    public MetricsAwareStringWriter(int id) {
+      super(id);
+    }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      // convert min/max to string to allow upper/lower bound truncation when gathering metrics,
+      // as in different implementations there's no guarantee that input to string writer will be char sequence
+      return metrics(Object::toString);
+    }
+  }
+
+  private abstract static class MetricsAwareByteArrayWriter extends MetricsAwareWriter<byte[]> {
+    MetricsAwareByteArrayWriter(int id) {
+      super(id, Comparators.unsignedByteArrays());
+    }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      // convert min/max to byte buffer to allow upper/lower bound truncation when gathering metrics.
+      return metrics(ByteBuffer::wrap);
+    }
+  }
+
+  public abstract static class ComparableWriter<T extends Comparable<T>> extends MetricsAwareWriter<T> {
+    public ComparableWriter(int id) {
+      super(id, Comparable::compareTo);
+    }
+  }
+
+  /**
+   * A value writer wrapper that keeps track of column statistics (metrics) during writing.
+   *
+   * @param <T> Input type
+   */
+  public abstract static class MetricsAwareWriter<T> extends MetricsAwareTransformWriter<T, T> {
+    public MetricsAwareWriter(int id, Comparator<T> comparator) {
+      super(id, comparator, Function.identity());
+    }
+
+    /**
+     * Helper class to transform the input type when collecting metrics.
+     * The transform function converts the stats information from the specific type that the underlying writer
+     * understands to a more general type that could be transformed to binary following iceberg single-value
+     * serialization spec.
+     *
+     * @param func tranformation function
+     * @return a stream of field metrics with bounds converted by the given transformation
+     */
+    protected Stream<FieldMetrics> metrics(Function<T, ?> func) {
+      return Stream.of(new FieldMetrics<>(id, valueCount, nullValueCount, 0,
+          updateBound(min, func), updateBound(max, func)));
+    }
+
+    private <T3, T4> T4 updateBound(T3 bound, Function<T3, T4> func) {
+      return bound == null ? null : func.apply(bound);
+    }
+  }
+
+  /**
+   * A value writer wrapper that keeps track of column statistics (metrics) during writing, and accepts a
+   * transformation in its constructor.
+   * The transformation will apply to the input data to produce the type that the underlying writer accepts.
+   * Stats will also be tracked with the type after transformation.
+   *
+   * @param <T1> Input type
+   * @param <T2> Type after transformation
+   */
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  public abstract static class MetricsAwareTransformWriter<T1, T2> implements ValueWriter<T1> {
+    protected final int id;
+    protected long valueCount;
+    protected long nullValueCount;
+    protected T2 max;
+    protected T2 min;
+    protected final Function<T1, T2> transformation;
+
+    private final Comparator<T2> comparator;
+
+    public MetricsAwareTransformWriter(int id, Comparator<T2> comparator, Function<T1, T2> func) {
+      this.id = id;
+      this.comparator = comparator;
+      this.transformation = func;
+    }
+
+    @Override
+    public void write(T1 datum, Encoder encoder) throws IOException {
+      valueCount++;
+      if (datum == null) {
+        nullValueCount++;
+        writeVal(null, encoder);
+
+      } else {
+        T2 transformedDatum = transformation.apply(datum);
+        if (max == null || comparator.compare(transformedDatum, max) > 0) {
+          max = transformedDatum;
+        }
+        if (min == null || comparator.compare(transformedDatum, min) < 0) {
+          min = transformedDatum;
+        }
+        writeVal(transformedDatum, encoder);
+

Review comment:
      Nit: missing newline after the `if` blocks, and an unnecessary newline before the closing curly brace.
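
For illustration, here is the same tracking logic with the nit applied, as a self-contained sketch. Each `if` block is followed by a blank line, and there is none before the closing brace. `TransformTrackingWriter` and `ByteToIntTrackingWriter` are hypothetical stand-ins for `MetricsAwareTransformWriter` and `ByteToIntegerWriter`, and a `List<Object>` replaces Avro's `Encoder`. The null branch is dropped on the assumption, following the earlier review comment on this class, that an option writer handles nulls.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified stand-in for MetricsAwareTransformWriter: a List<Object>
// replaces Avro's Encoder, and input is assumed non-null (nulls left to an option writer).
abstract class TransformTrackingWriter<T1, T2> {
  private final Comparator<T2> comparator;
  private final Function<T1, T2> transformation;
  long valueCount;
  T2 min;
  T2 max;

  TransformTrackingWriter(Comparator<T2> comparator, Function<T1, T2> transformation) {
    this.comparator = comparator;
    this.transformation = transformation;
  }

  void write(T1 datum, List<Object> out) {
    valueCount++;
    T2 transformed = transformation.apply(datum);
    if (max == null || comparator.compare(transformed, max) > 0) {
      max = transformed;
    }

    if (min == null || comparator.compare(transformed, min) < 0) {
      min = transformed;
    }

    writeVal(transformed, out);
  }

  protected abstract void writeVal(T2 value, List<Object> out);
}

// Bounds are tracked on the transformed (Integer) values, as in ByteToIntegerWriter.
class ByteToIntTrackingWriter extends TransformTrackingWriter<Byte, Integer> {
  ByteToIntTrackingWriter() {
    super(Integer::compareTo, Byte::intValue);
  }

  @Override
  protected void writeVal(Integer value, List<Object> out) {
    out.add(value);
  }
}

class TransformTrackingDemo {
  public static void main(String[] args) {
    ByteToIntTrackingWriter writer = new ByteToIntTrackingWriter();
    List<Object> out = new ArrayList<>();
    for (byte b : new byte[] {3, -1, 7}) {
      writer.write(b, out);
    }

    System.out.println(writer.min + " " + writer.max + " " + out);
  }
}
```

Running `TransformTrackingDemo` should print `-1 7 [3, -1, 7]`.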



