Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/09/25 00:32:00 UTC

[iceberg] branch master updated: Core: Use correct metrics config for delete files (#2942)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 0fda1c8  Core: Use correct metrics config for delete files (#2942)
0fda1c8 is described below

commit 0fda1c8cd4eca65566b60a2944aa6445ad1158b2
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Fri Sep 24 17:31:50 2021 -0700

    Core: Use correct metrics config for delete files (#2942)
---
 .../java/org/apache/iceberg/MetricsConfig.java     | 24 +++++++++
 .../main/java/org/apache/iceberg/avro/Avro.java    |  6 +++
 .../apache/iceberg/data/BaseFileWriterFactory.java |  8 +--
 .../org/apache/iceberg/io/TestWriterMetrics.java   | 59 ++++++++++++++++++++++
 .../iceberg/flink/sink/TestFlinkWriterMetrics.java |  1 +
 .../java/org/apache/iceberg/parquet/Parquet.java   |  1 -
 .../spark/source/TestSparkWriterMetrics.java       |  1 +
 7 files changed, 95 insertions(+), 5 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/MetricsConfig.java b/core/src/main/java/org/apache/iceberg/MetricsConfig.java
index 4d0aef8..bf8122f 100644
--- a/core/src/main/java/org/apache/iceberg/MetricsConfig.java
+++ b/core/src/main/java/org/apache/iceberg/MetricsConfig.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.iceberg.MetricsModes.MetricsMode;
 import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.util.SortOrderUtil;
 import org.slf4j.Logger;
@@ -37,6 +38,7 @@ import static org.apache.iceberg.TableProperties.METRICS_MODE_COLUMN_CONF_PREFIX
 public class MetricsConfig implements Serializable {
 
   private static final Logger LOG = LoggerFactory.getLogger(MetricsConfig.class);
+  private static final Joiner DOT = Joiner.on('.');
 
   private Map<String, MetricsMode> columnModes = Maps.newHashMap();
   private MetricsMode defaultMode;
@@ -96,6 +98,28 @@ public class MetricsConfig implements Serializable {
     return from(table.properties(), table.sortOrder());
   }
 
+  /**
+   * Creates a metrics config for a position delete file.
+   *
+   * @param table an Iceberg table
+   */
+  public static MetricsConfig forPositionDelete(Table table) {
+    MetricsConfig config = new MetricsConfig();
+
+    config.columnModes.put(MetadataColumns.DELETE_FILE_PATH.name(), MetricsModes.Full.get());
+    config.columnModes.put(MetadataColumns.DELETE_FILE_POS.name(), MetricsModes.Full.get());
+
+    MetricsConfig tableConfig = forTable(table);
+
+    config.defaultMode = tableConfig.defaultMode;
+    tableConfig.columnModes.forEach((columnAlias, mode) -> {
+      String positionDeleteColumnAlias = DOT.join(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME, columnAlias);
+      config.columnModes.put(positionDeleteColumnAlias, mode);
+    });
+
+    return config;
+  }
+
   private static MetricsConfig from(Map<String, String> props, SortOrder order) {
     MetricsConfig spec = new MetricsConfig();
     String defaultModeAsString = props.getOrDefault(DEFAULT_WRITE_METRICS_MODE, DEFAULT_WRITE_METRICS_MODE_DEFAULT);
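
For context: the new MetricsConfig.forPositionDelete factory pins full metrics on the reserved file_path and pos columns and re-keys the table's per-column modes under the nested "row" struct that carries copies of deleted rows. A minimal sketch of the resulting config, assuming a hypothetical table with one per-column override (the property keys are the standard Iceberg table properties; the table setup is made up for illustration):

    // Sketch only, not part of this commit. Assume table properties:
    //   write.metadata.metrics.default = counts
    //   write.metadata.metrics.column.id = full
    MetricsConfig config = MetricsConfig.forPositionDelete(table);
    // Reserved position-delete columns are always tracked in full:
    //   "file_path" -> full, "pos" -> full
    // Table per-column overrides are re-keyed under the "row" struct:
    //   "row.id" -> full
    // All other columns inherit the table default mode ("counts" here).
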
diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java
index 9255597..a1d210c 100644
--- a/core/src/main/java/org/apache/iceberg/avro/Avro.java
+++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java
@@ -354,6 +354,7 @@ public class Avro {
       rowSchema(table.schema());
       withSpec(table.spec());
       setAll(table.properties());
+      metricsConfig(MetricsConfig.forTable(table));
       return this;
     }
 
@@ -386,6 +387,11 @@ public class Avro {
       return this;
     }
 
+    public DeleteWriteBuilder metricsConfig(MetricsConfig newMetricsConfig) {
+      appenderBuilder.metricsConfig(newMetricsConfig);
+      return this;
+    }
+
     public DeleteWriteBuilder createWriterFunc(Function<Schema, DatumWriter<?>> writerFunction) {
       this.createWriterFunc = writerFunction;
       return this;
diff --git a/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java b/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java
index bc3a0b3..62e09e6 100644
--- a/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java
+++ b/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java
@@ -153,10 +153,9 @@ public abstract class BaseFileWriterFactory<T> implements FileWriterFactory<T> {
     try {
       switch (deleteFileFormat) {
         case AVRO:
-          // TODO: support metrics configs in Avro equality delete writer
-
           Avro.DeleteWriteBuilder avroBuilder = Avro.writeDeletes(outputFile)
               .setAll(properties)
+              .metricsConfig(metricsConfig)
               .rowSchema(equalityDeleteRowSchema)
               .equalityFieldIds(equalityFieldIds)
               .withSpec(spec)
@@ -199,14 +198,14 @@ public abstract class BaseFileWriterFactory<T> implements FileWriterFactory<T> {
     OutputFile outputFile = file.encryptingOutputFile();
     EncryptionKeyMetadata keyMetadata = file.keyMetadata();
     Map<String, String> properties = table.properties();
-
-    // TODO: build and pass a correct metrics config for position deletes
+    MetricsConfig metricsConfig = MetricsConfig.forPositionDelete(table);
 
     try {
       switch (deleteFileFormat) {
         case AVRO:
           Avro.DeleteWriteBuilder avroBuilder = Avro.writeDeletes(outputFile)
               .setAll(properties)
+              .metricsConfig(metricsConfig)
               .rowSchema(positionDeleteRowSchema)
               .withSpec(spec)
               .withPartition(partition)
@@ -220,6 +219,7 @@ public abstract class BaseFileWriterFactory<T> implements FileWriterFactory<T> {
         case PARQUET:
           Parquet.DeleteWriteBuilder parquetBuilder = Parquet.writeDeletes(outputFile)
               .setAll(properties)
+              .metricsConfig(metricsConfig)
               .rowSchema(positionDeleteRowSchema)
               .withSpec(spec)
               .withPartition(partition)
diff --git a/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java b/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java
index 758c803..5909731 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java
@@ -20,20 +20,28 @@
 package org.apache.iceberg.io;
 
 import java.io.File;
+import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
 import java.util.Map;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TestTables;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Types;
 import org.junit.After;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -146,4 +154,55 @@ public abstract class TestWriterMetrics<T> {
     Assert.assertFalse(upperBounds.containsKey(4));
     Assert.assertEquals(3L, (long) Conversions.fromByteBuffer(Types.LongType.get(), upperBounds.get(5)));
   }
+
+  @Test
+  public void testPositionDeleteMetrics() throws IOException {
+    Assume.assumeTrue(fileFormat == FileFormat.PARQUET);
+
+    FileWriterFactory<T> writerFactory = newWriterFactory(SCHEMA);
+    EncryptedOutputFile outputFile = fileFactory.newOutputFile();
+    PositionDeleteWriter<T> deleteWriter = writerFactory.newPositionDeleteWriter(outputFile, table.spec(), null);
+
+    try {
+      T deletedRow = toRow(3, "3", true, 3L);
+      PositionDelete<T> positionDelete = PositionDelete.create();
+      positionDelete.set("File A", 1, deletedRow);
+      deleteWriter.write(positionDelete);
+    } finally {
+      deleteWriter.close();
+    }
+
+    DeleteFile deleteFile = deleteWriter.toDeleteFile();
+
+    int pathFieldId = MetadataColumns.DELETE_FILE_PATH.fieldId();
+    int posFieldId = MetadataColumns.DELETE_FILE_POS.fieldId();
+
+    // should have metrics for _file and _pos as well as two sorted fields (id, structField.longValue)
+
+    Map<Integer, ByteBuffer> lowerBounds = deleteFile.lowerBounds();
+
+    Assert.assertEquals(
+        CharBuffer.wrap("File A"),
+        Conversions.fromByteBuffer(Types.StringType.get(), lowerBounds.get(pathFieldId)));
+    Assert.assertEquals(1L, (long) Conversions.fromByteBuffer(Types.LongType.get(), lowerBounds.get(posFieldId)));
+
+    Assert.assertEquals(3, (int) Conversions.fromByteBuffer(Types.IntegerType.get(), lowerBounds.get(1)));
+    Assert.assertFalse(lowerBounds.containsKey(2));
+    Assert.assertFalse(lowerBounds.containsKey(3));
+    Assert.assertFalse(lowerBounds.containsKey(4));
+    Assert.assertEquals(3L, (long) Conversions.fromByteBuffer(Types.LongType.get(), lowerBounds.get(5)));
+
+    Map<Integer, ByteBuffer> upperBounds = deleteFile.upperBounds();
+
+    Assert.assertEquals(
+        CharBuffer.wrap("File A"),
+        Conversions.fromByteBuffer(Types.StringType.get(), upperBounds.get(pathFieldId)));
+    Assert.assertEquals(1L, (long) Conversions.fromByteBuffer(Types.LongType.get(), upperBounds.get(posFieldId)));
+
+    Assert.assertEquals(3, (int) Conversions.fromByteBuffer(Types.IntegerType.get(), upperBounds.get(1)));
+    Assert.assertFalse(upperBounds.containsKey(2));
+    Assert.assertFalse(upperBounds.containsKey(3));
+    Assert.assertFalse(upperBounds.containsKey(4));
+    Assert.assertEquals(3L, (long) Conversions.fromByteBuffer(Types.LongType.get(), upperBounds.get(5)));
+  }
 }
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java
index f2890b1..0e94476 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java
@@ -39,6 +39,7 @@ public class TestFlinkWriterMetrics extends TestWriterMetrics<RowData> {
         .dataSchema(table.schema())
         .dataFileFormat(fileFormat)
         .deleteFileFormat(fileFormat)
+        .positionDeleteRowSchema(table.schema())
         .build();
   }
 
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index 1d53d83..8d42fd5 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -518,7 +518,6 @@ public class Parquet {
     }
 
     public DeleteWriteBuilder metricsConfig(MetricsConfig newMetricsConfig) {
-      // TODO: keep full metrics for position delete file columns
       appenderBuilder.metricsConfig(newMetricsConfig);
       return this;
     }
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriterMetrics.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriterMetrics.java
index b17ed7d..026111c 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriterMetrics.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriterMetrics.java
@@ -39,6 +39,7 @@ public class TestSparkWriterMetrics extends TestWriterMetrics<InternalRow> {
         .dataSchema(table.schema())
         .dataFileFormat(fileFormat)
         .deleteFileFormat(fileFormat)
+        .positionDeleteRowSchema(table.schema())
         .build();
   }