You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/06/18 21:01:33 UTC

[GitHub] [iceberg] rdblue commented on a change in pull request #1963: Avro metrics support: track metrics in Avro value writers

rdblue commented on a change in pull request #1963:
URL: https://github.com/apache/iceberg/pull/1963#discussion_r653862129



##########
File path: core/src/main/java/org/apache/iceberg/avro/ValueWriters.java
##########
@@ -137,143 +156,143 @@ private NullWriter() {
     public void write(Void ignored, Encoder encoder) throws IOException {
       encoder.writeNull();
     }
-  }
 
-  private static class BooleanWriter implements ValueWriter<Boolean> {
-    private static final BooleanWriter INSTANCE = new BooleanWriter();
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      return Stream.empty();
+    }
+  }
 
-    private BooleanWriter() {
+  private static class BooleanWriter extends ComparableWriter<Boolean> {
+    private BooleanWriter(int id) {
+      super(id);
     }
 
     @Override
-    public void write(Boolean bool, Encoder encoder) throws IOException {
+    protected void writeVal(Boolean bool, Encoder encoder) throws IOException {
       encoder.writeBoolean(bool);
     }
   }
 
-  private static class ByteToIntegerWriter implements ValueWriter<Byte> {
-    private static final ByteToIntegerWriter INSTANCE = new ByteToIntegerWriter();
-
-    private ByteToIntegerWriter() {
+  private static class ByteToIntegerWriter extends StoredAsIntWriter<Byte> {
+    private ByteToIntegerWriter(int id) {
+      super(id);
     }
 
     @Override
-    public void write(Byte b, Encoder encoder) throws IOException {
-      encoder.writeInt(b.intValue());
+    protected int convert(Byte from) {
+      return from.intValue();
     }
   }
 
-  private static class ShortToIntegerWriter implements ValueWriter<Short> {
-    private static final ShortToIntegerWriter INSTANCE = new ShortToIntegerWriter();
-
-    private ShortToIntegerWriter() {
+  private static class ShortToIntegerWriter extends StoredAsIntWriter<Short> {
+    private ShortToIntegerWriter(int id) {
+      super(id);
     }
 
     @Override
-    public void write(Short s, Encoder encoder) throws IOException {
-      encoder.writeInt(s.intValue());
+    protected int convert(Short from) {
+      return from.intValue();
     }
   }
 
-  private static class IntegerWriter implements ValueWriter<Integer> {
-    private static final IntegerWriter INSTANCE = new IntegerWriter();
-
-    private IntegerWriter() {
+  private static class IntegerWriter extends ComparableWriter<Integer> {
+    private IntegerWriter(int id) {
+      super(id);
     }
 
     @Override
-    public void write(Integer i, Encoder encoder) throws IOException {
+    protected void writeVal(Integer i, Encoder encoder) throws IOException {
       encoder.writeInt(i);
     }
   }
 
-  private static class LongWriter implements ValueWriter<Long> {
-    private static final LongWriter INSTANCE = new LongWriter();
-
-    private LongWriter() {
+  private static class LongWriter extends ComparableWriter<Long> {

Review comment:
       I would expect `LongWriter` to extend `StoredAsLongWriter` instead of `ComparableWriter`. Same for `IntegerWriter`. Could you update those?

##########
File path: core/src/main/java/org/apache/iceberg/avro/ValueWriter.java
##########
@@ -27,7 +27,5 @@
 public interface ValueWriter<D> {
   void write(D datum, Encoder encoder) throws IOException;
 
-  default Stream<FieldMetrics> metrics() {
-    return Stream.empty(); // TODO will populate in following PRs
-  }
+  Stream<FieldMetrics> metrics();

Review comment:
       `FieldMetrics` is parameterized, but this is a bare reference. Could you update it? I think it should be `FieldMetrics<?>` since the metrics are not necessarily for the written value type, `D`.

##########
File path: core/src/main/java/org/apache/iceberg/avro/ValueWriters.java
##########
@@ -492,4 +576,215 @@ protected Object get(IndexedRecord struct, int pos) {
       return struct.get(pos);
     }
   }
+
+  private abstract static class FloatingPointWriter<T extends Comparable<T>>
+      extends MetricsAwareWriter<T> {
+    private long nanValueCount;
+
+    FloatingPointWriter(int id) {
+      // pass null to comparator since we override write() method that uses comparator in this class.
+      super(id, null);
+    }
+
+    @Override
+    public void write(T datum, Encoder encoder) throws IOException {
+      valueCount++;
+
+      // null value should be handled by option writer, thus assume datum will not be null here.
+      if (NaNUtil.isNaN(datum)) {
+        nanValueCount++;
+      } else {
+        if (max == null || datum.compareTo(max) > 0) {
+          this.max = datum;
+        }
+
+        if (min == null || datum.compareTo(min) < 0) {
+          this.min = datum;
+        }
+      }
+
+      writeVal(datum, encoder);
+    }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      return Stream.of(new FieldMetrics<>(id, valueCount, 0, nanValueCount, min, max));
+    }
+  }
+
+  public abstract static class MetricsAwareStringWriter<T extends Comparable<T>> extends ComparableWriter<T> {
+    public MetricsAwareStringWriter(int id) {
+      super(id);
+    }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      // convert min/max to string to allow upper/lower bound truncation when gathering metrics,
+      // as in different implementations there's no guarantee that input to string writer will be char sequence
+      return metrics(Object::toString);
+    }
+  }
+
+  private abstract static class MetricsAwareByteArrayWriter extends MetricsAwareWriter<byte[]> {
+    MetricsAwareByteArrayWriter(int id) {
+      super(id, Comparators.unsignedByteArrays());
+    }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      // convert min/max to byte buffer to allow upper/lower bound truncation when gathering metrics.
+      return metrics(ByteBuffer::wrap);
+    }
+  }
+
+  public abstract static class ComparableWriter<T extends Comparable<T>> extends MetricsAwareWriter<T> {
+    public ComparableWriter(int id) {
+      super(id, Comparable::compareTo);
+    }
+  }
+
+  /**
+   * A value writer wrapper that keeps track of column statistics (metrics) during writing.
+   *
+   * @param <T> Input type
+   */
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  public abstract static class MetricsAwareWriter<T> implements ValueWriter<T> {
+    protected final int id;
+    protected long valueCount;
+    protected T max;
+    protected T min;
+
+    private final Comparator<T> comparator;
+
+    public MetricsAwareWriter(int id, Comparator<T> comparator) {
+      this.id = id;
+      this.comparator = comparator;
+    }
+
+    @Override
+    public void write(T datum, Encoder encoder) throws IOException {
+      valueCount++;
+
+      // null value should be handled by option writer, thus assume datum will not be null here.
+      if (max == null || comparator.compare(datum, max) > 0) {
+        max = datum;
+      }
+
+      if (min == null || comparator.compare(datum, min) < 0) {
+        min = datum;
+      }
+
+      writeVal(datum, encoder);
+    }
+
+    protected abstract void writeVal(T datum, Encoder encoder) throws IOException;

Review comment:
       I would probably name this `encode` rather than `writeVal` so that there is less confusion with the `write` method.

##########
File path: core/src/main/java/org/apache/iceberg/avro/ValueWriters.java
##########
@@ -492,4 +576,215 @@ protected Object get(IndexedRecord struct, int pos) {
       return struct.get(pos);
     }
   }
+
+  private abstract static class FloatingPointWriter<T extends Comparable<T>>
+      extends MetricsAwareWriter<T> {
+    private long nanValueCount;
+
+    FloatingPointWriter(int id) {
+      // pass null to comparator since we override write() method that uses comparator in this class.
+      super(id, null);
+    }
+
+    @Override
+    public void write(T datum, Encoder encoder) throws IOException {
+      valueCount++;
+
+      // null value should be handled by option writer, thus assume datum will not be null here.
+      if (NaNUtil.isNaN(datum)) {
+        nanValueCount++;
+      } else {
+        if (max == null || datum.compareTo(max) > 0) {
+          this.max = datum;
+        }
+
+        if (min == null || datum.compareTo(min) < 0) {
+          this.min = datum;
+        }
+      }
+
+      writeVal(datum, encoder);
+    }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      return Stream.of(new FieldMetrics<>(id, valueCount, 0, nanValueCount, min, max));
+    }
+  }
+
+  public abstract static class MetricsAwareStringWriter<T extends Comparable<T>> extends ComparableWriter<T> {
+    public MetricsAwareStringWriter(int id) {
+      super(id);
+    }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      // convert min/max to string to allow upper/lower bound truncation when gathering metrics,
+      // as in different implementations there's no guarantee that input to string writer will be char sequence
+      return metrics(Object::toString);
+    }
+  }
+
+  private abstract static class MetricsAwareByteArrayWriter extends MetricsAwareWriter<byte[]> {
+    MetricsAwareByteArrayWriter(int id) {
+      super(id, Comparators.unsignedByteArrays());
+    }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      // convert min/max to byte buffer to allow upper/lower bound truncation when gathering metrics.
+      return metrics(ByteBuffer::wrap);
+    }
+  }
+
+  public abstract static class ComparableWriter<T extends Comparable<T>> extends MetricsAwareWriter<T> {
+    public ComparableWriter(int id) {
+      super(id, Comparable::compareTo);
+    }
+  }
+
+  /**
+   * A value writer wrapper that keeps track of column statistics (metrics) during writing.
+   *
+   * @param <T> Input type
+   */
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  public abstract static class MetricsAwareWriter<T> implements ValueWriter<T> {
+    protected final int id;
+    protected long valueCount;
+    protected T max;
+    protected T min;
+
+    private final Comparator<T> comparator;
+
+    public MetricsAwareWriter(int id, Comparator<T> comparator) {
+      this.id = id;
+      this.comparator = comparator;
+    }
+
+    @Override
+    public void write(T datum, Encoder encoder) throws IOException {
+      valueCount++;
+
+      // null value should be handled by option writer, thus assume datum will not be null here.
+      if (max == null || comparator.compare(datum, max) > 0) {
+        max = datum;
+      }
+
+      if (min == null || comparator.compare(datum, min) < 0) {
+        min = datum;
+      }
+
+      writeVal(datum, encoder);
+    }
+
+    protected abstract void writeVal(T datum, Encoder encoder) throws IOException;
+
+    @Override
+    public Stream<FieldMetrics> metrics() {

Review comment:
       I'd like to fix all of the references that don't parameterize `FieldMetrics`. I think they should be `FieldMetrics<?>`.

##########
File path: core/src/test/java/org/apache/iceberg/TestMetrics.java
##########
@@ -274,7 +282,11 @@ public void testMetricsForNestedStructFields() throws IOException {
     assertBounds(6, BinaryType.get(),
         ByteBuffer.wrap("A".getBytes()), ByteBuffer.wrap("A".getBytes()), metrics);
     assertCounts(7, 1L, 0L, 1L, metrics);
-    assertBounds(7, DoubleType.get(), Double.NaN, Double.NaN, metrics);
+    if (fileFormat() == FileFormat.AVRO) {

Review comment:
       Is this needed if #2464 goes in first?

##########
File path: core/src/test/java/org/apache/iceberg/TestMetrics.java
##########
@@ -643,4 +747,10 @@ protected void assertCounts(int fieldId, Long valueCount, Long nullValueCount, L
         upperBounds.containsKey(fieldId) ? fromByteBuffer(type, upperBounds.get(fieldId)) : null);
   }
 
+  private void assertNonNullColumnSizes(Metrics metrics) {
+    if (fileFormat() != FileFormat.AVRO) {
+      Assert.assertTrue(metrics.columnSizes().values().stream().allMatch(Objects::nonNull));
+    }

Review comment:
       Can you add an assertion for Avro as well? I think the column sizes map should be null in that case; is that correct?

##########
File path: data/src/test/java/org/apache/iceberg/avro/TestGenericAvroMetrics.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.avro;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+public class TestGenericAvroMetrics extends TestAvroMetrics {
+
+  protected Metrics getMetrics(Schema schema, OutputFile file, Map<String, String> properties,
+                                        MetricsConfig metricsConfig, Record... records) throws IOException {

Review comment:
       Nit: indentation is off.

##########
File path: core/src/main/java/org/apache/iceberg/avro/ValueWriters.java
##########
@@ -45,68 +55,72 @@ private ValueWriters() {
     return NullWriter.INSTANCE;
   }
 
-  public static ValueWriter<Boolean> booleans() {
-    return BooleanWriter.INSTANCE;
+  public static ValueWriter<Boolean> booleans(int id) {
+    return new BooleanWriter(id);
   }
 
-  public static ValueWriter<Byte> tinyints() {
-    return ByteToIntegerWriter.INSTANCE;
+  public static ValueWriter<Byte> tinyints(int id) {
+    return new ByteToIntegerWriter(id);
   }
 
-  public static ValueWriter<Short> shorts() {
-    return ShortToIntegerWriter.INSTANCE;
+  public static ValueWriter<Short> shorts(int id) {
+    return new ShortToIntegerWriter(id);
   }
 
-  public static ValueWriter<Integer> ints() {
-    return IntegerWriter.INSTANCE;
+  public static ValueWriter<Integer> ints(int id) {
+    return new IntegerWriter(id);
   }
 
-  public static ValueWriter<Long> longs() {
-    return LongWriter.INSTANCE;
+  public static ValueWriter<Long> longs(int id) {
+    return new LongWriter(id);
   }
 
-  public static ValueWriter<Float> floats() {
-    return FloatWriter.INSTANCE;
+  public static ValueWriter<Float> floats(int id) {
+    return new FloatWriter(id);
   }
 
-  public static ValueWriter<Double> doubles() {
-    return DoubleWriter.INSTANCE;
+  public static ValueWriter<Double> doubles(int id) {
+    return new DoubleWriter(id);
   }
 
-  public static ValueWriter<Object> strings() {
-    return StringWriter.INSTANCE;
+  public static ValueWriter<CharSequence> strings(int id) {
+    return new StringWriter(id);
   }
 
-  public static ValueWriter<Utf8> utf8s() {
-    return Utf8Writer.INSTANCE;
+  public static ValueWriter<Utf8> utf8s(int id) {
+    return new Utf8Writer(id);
   }
 
-  public static ValueWriter<UUID> uuids() {
-    return UUIDWriter.INSTANCE;
+  public static ValueWriter<UUID> uuids(int id) {
+    return new UUIDWriter(id);
   }
 
-  public static ValueWriter<byte[]> fixed(int length) {
-    return new FixedWriter(length);
+  public static ValueWriter<byte[]> fixed(int id, int length) {
+    return new FixedWriter(id, length);
   }
 
-  public static ValueWriter<GenericData.Fixed> genericFixed(int length) {
-    return new GenericFixedWriter(length);
+  public static ValueWriter<GenericData.Fixed> genericFixed(int id, int length) {
+    return new GenericFixedWriter(id, length);
   }
 
-  public static ValueWriter<byte[]> bytes() {
-    return BytesWriter.INSTANCE;
+  public static ValueWriter<byte[]> bytes(int id) {
+    return new BytesWriter(id);
   }
 
-  public static ValueWriter<ByteBuffer> byteBuffers() {
-    return ByteBufferWriter.INSTANCE;
+  public static ValueWriter<ByteBuffer> byteBuffers(int id) {
+    return new ByteBufferWriter(id);
   }
 
-  public static ValueWriter<BigDecimal> decimal(int precision, int scale) {
-    return new DecimalWriter(precision, scale);
+  public static ValueWriter<BigDecimal> decimal(int id, int precision, int scale) {
+    return new DecimalWriter(id, precision, scale);
   }
 
-  public static <T> ValueWriter<T> option(int nullIndex, ValueWriter<T> writer) {
-    return new OptionWriter<>(nullIndex, writer);
+  public static <T> ValueWriter<T> option(int nullIndex, ValueWriter<T> writer, Schema.Type type) {
+    if (AvroSchemaUtil.supportsMetrics(type)) {

Review comment:
       Rather than introducing `supportsMetrics`, why not just check the value writer to see whether it is a `MetricsAwareWriter`? I know that not all of the writers extend that class, but there are two ways around that. You could introduce a `MetricsWriter` interface, implemented by every writer that supports metrics, to signal that the inner writer supports metrics. Or you could alter the hierarchy a little so that `StoredAsIntWriter` and `StoredAsLongWriter` actually do extend `MetricsAwareWriter`. Then you wouldn't need changes to `AvroSchemaUtil` or so many changes to option handling.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueWriters.java
##########
@@ -74,52 +78,60 @@ private FlinkValueWriters() {
     return new RowWriter(writers, types);
   }
 
-  private static class StringWriter implements ValueWriter<StringData> {
-    private static final StringWriter INSTANCE = new StringWriter();
-
-    private StringWriter() {
+  private static class StringWriter extends ValueWriters.MetricsAwareStringWriter<StringData> {
+    private StringWriter(int id) {
+      super(id);
     }
 
     @Override
-    public void write(StringData s, Encoder encoder) throws IOException {
+    protected void writeVal(StringData s, Encoder encoder) throws IOException {
       // toBytes is cheaper than Avro calling toString, which incurs encoding costs
       encoder.writeString(new Utf8(s.toBytes()));
     }
   }
 
-  private static class DecimalWriter implements ValueWriter<DecimalData> {
+  private static class DecimalWriter extends ValueWriters.ComparableWriter<DecimalData> {
     private final int precision;
     private final int scale;
     private final ThreadLocal<byte[]> bytes;
 
-    private DecimalWriter(int precision, int scale) {
+    private DecimalWriter(int id, int precision, int scale) {
+      super(id);
       this.precision = precision;
       this.scale = scale;
       this.bytes = ThreadLocal.withInitial(() -> new byte[TypeUtil.decimalRequiredBytes(precision)]);
     }
 
     @Override
-    public void write(DecimalData d, Encoder encoder) throws IOException {
+    protected void writeVal(DecimalData d, Encoder encoder) throws IOException {
       encoder.writeFixed(DecimalUtil.toReusedFixLengthBytes(precision, scale, d.toBigDecimal(), bytes.get()));
     }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      return metrics(DecimalData::toBigDecimal);

Review comment:
       Does this return a Java `BigDecimal`? I know we've had problems with the same-named `toBigDecimal` method in Spark because it actually produces a Scala `BigDecimal`.

##########
File path: spark/src/main/java/org/apache/iceberg/spark/data/SparkValueWriters.java
##########
@@ -72,61 +71,41 @@ private SparkValueWriters() {
     return new StructWriter(writers, types);
   }
 
-  private static class StringWriter implements ValueWriter<UTF8String> {
-    private static final StringWriter INSTANCE = new StringWriter();
-
-    private StringWriter() {
+  private static class StringWriter extends ValueWriters.MetricsAwareStringWriter<UTF8String> {
+    private StringWriter(int id) {
+      super(id);
     }
 
     @Override
-    public void write(UTF8String s, Encoder encoder) throws IOException {
+    protected void writeVal(UTF8String s, Encoder encoder) throws IOException {
       // use getBytes because it may return the backing byte array if available.
       // otherwise, it copies to a new byte array, which is still cheaper than Avro
       // calling toString, which incurs encoding costs
       encoder.writeString(new Utf8(s.getBytes()));
     }
   }
 
-  private static class UUIDWriter implements ValueWriter<UTF8String> {
-    private static final ThreadLocal<ByteBuffer> BUFFER = ThreadLocal.withInitial(() -> {
-      ByteBuffer buffer = ByteBuffer.allocate(16);
-      buffer.order(ByteOrder.BIG_ENDIAN);
-      return buffer;
-    });
-
-    private static final UUIDWriter INSTANCE = new UUIDWriter();
-
-    private UUIDWriter() {
-    }
-
-    @Override
-    @SuppressWarnings("ByteBufferBackingArray")
-    public void write(UTF8String s, Encoder encoder) throws IOException {
-      // TODO: direct conversion from string to byte buffer
-      UUID uuid = UUID.fromString(s.toString());
-      ByteBuffer buffer = BUFFER.get();
-      buffer.rewind();
-      buffer.putLong(uuid.getMostSignificantBits());
-      buffer.putLong(uuid.getLeastSignificantBits());
-      encoder.writeFixed(buffer.array());
-    }
-  }
-
-  private static class DecimalWriter implements ValueWriter<Decimal> {
+  private static class DecimalWriter extends ValueWriters.MetricsAwareWriter<Decimal> {
     private final int precision;
     private final int scale;
     private final ThreadLocal<byte[]> bytes;
 
-    private DecimalWriter(int precision, int scale) {
+    private DecimalWriter(int id, int precision, int scale) {
+      super(id, Comparators.forType(Types.DecimalType.of(precision, scale)));
       this.precision = precision;
       this.scale = scale;
       this.bytes = ThreadLocal.withInitial(() -> new byte[TypeUtil.decimalRequiredBytes(precision)]);
     }
 
     @Override
-    public void write(Decimal d, Encoder encoder) throws IOException {
+    protected void writeVal(Decimal d, Encoder encoder) throws IOException {
       encoder.writeFixed(DecimalUtil.toReusedFixLengthBytes(precision, scale, d.toJavaBigDecimal(), bytes.get()));
     }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      return metrics(Decimal::toJavaBigDecimal);

Review comment:
       Good to see you got the right one.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org