You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/09/25 00:32:00 UTC
[iceberg] branch master updated: Core: Use correct metrics config for delete files (#2942)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 0fda1c8 Core: Use correct metrics config for delete files (#2942)
0fda1c8 is described below
commit 0fda1c8cd4eca65566b60a2944aa6445ad1158b2
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Fri Sep 24 17:31:50 2021 -0700
Core: Use correct metrics config for delete files (#2942)
---
.../java/org/apache/iceberg/MetricsConfig.java | 24 +++++++++
.../main/java/org/apache/iceberg/avro/Avro.java | 6 +++
.../apache/iceberg/data/BaseFileWriterFactory.java | 8 +--
.../org/apache/iceberg/io/TestWriterMetrics.java | 59 ++++++++++++++++++++++
.../iceberg/flink/sink/TestFlinkWriterMetrics.java | 1 +
.../java/org/apache/iceberg/parquet/Parquet.java | 1 -
.../spark/source/TestSparkWriterMetrics.java | 1 +
7 files changed, 95 insertions(+), 5 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/MetricsConfig.java b/core/src/main/java/org/apache/iceberg/MetricsConfig.java
index 4d0aef8..bf8122f 100644
--- a/core/src/main/java/org/apache/iceberg/MetricsConfig.java
+++ b/core/src/main/java/org/apache/iceberg/MetricsConfig.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.iceberg.MetricsModes.MetricsMode;
import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.SortOrderUtil;
import org.slf4j.Logger;
@@ -37,6 +38,7 @@ import static org.apache.iceberg.TableProperties.METRICS_MODE_COLUMN_CONF_PREFIX
public class MetricsConfig implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(MetricsConfig.class);
+ private static final Joiner DOT = Joiner.on('.');
private Map<String, MetricsMode> columnModes = Maps.newHashMap();
private MetricsMode defaultMode;
@@ -96,6 +98,28 @@ public class MetricsConfig implements Serializable {
return from(table.properties(), table.sortOrder());
}
+ /**
+ * Creates a metrics config for a position delete file.
+ *
+ * @param table an Iceberg table
+ */
+ public static MetricsConfig forPositionDelete(Table table) {
+ MetricsConfig config = new MetricsConfig();
+
+ config.columnModes.put(MetadataColumns.DELETE_FILE_PATH.name(), MetricsModes.Full.get());
+ config.columnModes.put(MetadataColumns.DELETE_FILE_POS.name(), MetricsModes.Full.get());
+
+ MetricsConfig tableConfig = forTable(table);
+
+ config.defaultMode = tableConfig.defaultMode;
+ tableConfig.columnModes.forEach((columnAlias, mode) -> {
+ String positionDeleteColumnAlias = DOT.join(MetadataColumns.DELETE_FILE_ROW_FIELD_NAME, columnAlias);
+ config.columnModes.put(positionDeleteColumnAlias, mode);
+ });
+
+ return config;
+ }
+
private static MetricsConfig from(Map<String, String> props, SortOrder order) {
MetricsConfig spec = new MetricsConfig();
String defaultModeAsString = props.getOrDefault(DEFAULT_WRITE_METRICS_MODE, DEFAULT_WRITE_METRICS_MODE_DEFAULT);
diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java
index 9255597..a1d210c 100644
--- a/core/src/main/java/org/apache/iceberg/avro/Avro.java
+++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java
@@ -354,6 +354,7 @@ public class Avro {
rowSchema(table.schema());
withSpec(table.spec());
setAll(table.properties());
+ metricsConfig(MetricsConfig.forTable(table));
return this;
}
@@ -386,6 +387,11 @@ public class Avro {
return this;
}
+ public DeleteWriteBuilder metricsConfig(MetricsConfig newMetricsConfig) {
+ appenderBuilder.metricsConfig(newMetricsConfig);
+ return this;
+ }
+
public DeleteWriteBuilder createWriterFunc(Function<Schema, DatumWriter<?>> writerFunction) {
this.createWriterFunc = writerFunction;
return this;
diff --git a/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java b/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java
index bc3a0b3..62e09e6 100644
--- a/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java
+++ b/data/src/main/java/org/apache/iceberg/data/BaseFileWriterFactory.java
@@ -153,10 +153,9 @@ public abstract class BaseFileWriterFactory<T> implements FileWriterFactory<T> {
try {
switch (deleteFileFormat) {
case AVRO:
- // TODO: support metrics configs in Avro equality delete writer
-
Avro.DeleteWriteBuilder avroBuilder = Avro.writeDeletes(outputFile)
.setAll(properties)
+ .metricsConfig(metricsConfig)
.rowSchema(equalityDeleteRowSchema)
.equalityFieldIds(equalityFieldIds)
.withSpec(spec)
@@ -199,14 +198,14 @@ public abstract class BaseFileWriterFactory<T> implements FileWriterFactory<T> {
OutputFile outputFile = file.encryptingOutputFile();
EncryptionKeyMetadata keyMetadata = file.keyMetadata();
Map<String, String> properties = table.properties();
-
- // TODO: build and pass a correct metrics config for position deletes
+ MetricsConfig metricsConfig = MetricsConfig.forPositionDelete(table);
try {
switch (deleteFileFormat) {
case AVRO:
Avro.DeleteWriteBuilder avroBuilder = Avro.writeDeletes(outputFile)
.setAll(properties)
+ .metricsConfig(metricsConfig)
.rowSchema(positionDeleteRowSchema)
.withSpec(spec)
.withPartition(partition)
@@ -220,6 +219,7 @@ public abstract class BaseFileWriterFactory<T> implements FileWriterFactory<T> {
case PARQUET:
Parquet.DeleteWriteBuilder parquetBuilder = Parquet.writeDeletes(outputFile)
.setAll(properties)
+ .metricsConfig(metricsConfig)
.rowSchema(positionDeleteRowSchema)
.withSpec(spec)
.withPartition(partition)
diff --git a/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java b/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java
index 758c803..5909731 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestWriterMetrics.java
@@ -20,20 +20,28 @@
package org.apache.iceberg.io;
import java.io.File;
+import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
import java.util.Map;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TestTables;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;
import org.junit.After;
import org.junit.Assert;
+import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -146,4 +154,55 @@ public abstract class TestWriterMetrics<T> {
Assert.assertFalse(upperBounds.containsKey(4));
Assert.assertEquals(3L, (long) Conversions.fromByteBuffer(Types.LongType.get(), upperBounds.get(5)));
}
+
+ @Test
+ public void testPositionDeleteMetrics() throws IOException {
+ Assume.assumeTrue(fileFormat == FileFormat.PARQUET);
+
+ FileWriterFactory<T> writerFactory = newWriterFactory(SCHEMA);
+ EncryptedOutputFile outputFile = fileFactory.newOutputFile();
+ PositionDeleteWriter<T> deleteWriter = writerFactory.newPositionDeleteWriter(outputFile, table.spec(), null);
+
+ try {
+ T deletedRow = toRow(3, "3", true, 3L);
+ PositionDelete<T> positionDelete = PositionDelete.create();
+ positionDelete.set("File A", 1, deletedRow);
+ deleteWriter.write(positionDelete);
+ } finally {
+ deleteWriter.close();
+ }
+
+ DeleteFile deleteFile = deleteWriter.toDeleteFile();
+
+ int pathFieldId = MetadataColumns.DELETE_FILE_PATH.fieldId();
+ int posFieldId = MetadataColumns.DELETE_FILE_POS.fieldId();
+
+ // should have metrics for _file and _pos as well as two sorted fields (id, structField.longValue)
+
+ Map<Integer, ByteBuffer> lowerBounds = deleteFile.lowerBounds();
+
+ Assert.assertEquals(
+ CharBuffer.wrap("File A"),
+ Conversions.fromByteBuffer(Types.StringType.get(), lowerBounds.get(pathFieldId)));
+ Assert.assertEquals(1L, (long) Conversions.fromByteBuffer(Types.LongType.get(), lowerBounds.get(posFieldId)));
+
+ Assert.assertEquals(3, (int) Conversions.fromByteBuffer(Types.IntegerType.get(), lowerBounds.get(1)));
+ Assert.assertFalse(lowerBounds.containsKey(2));
+ Assert.assertFalse(lowerBounds.containsKey(3));
+ Assert.assertFalse(lowerBounds.containsKey(4));
+ Assert.assertEquals(3L, (long) Conversions.fromByteBuffer(Types.LongType.get(), lowerBounds.get(5)));
+
+ Map<Integer, ByteBuffer> upperBounds = deleteFile.upperBounds();
+
+ Assert.assertEquals(
+ CharBuffer.wrap("File A"),
+ Conversions.fromByteBuffer(Types.StringType.get(), upperBounds.get(pathFieldId)));
+ Assert.assertEquals(1L, (long) Conversions.fromByteBuffer(Types.LongType.get(), upperBounds.get(posFieldId)));
+
+ Assert.assertEquals(3, (int) Conversions.fromByteBuffer(Types.IntegerType.get(), upperBounds.get(1)));
+ Assert.assertFalse(upperBounds.containsKey(2));
+ Assert.assertFalse(upperBounds.containsKey(3));
+ Assert.assertFalse(upperBounds.containsKey(4));
+ Assert.assertEquals(3L, (long) Conversions.fromByteBuffer(Types.LongType.get(), upperBounds.get(5)));
+ }
}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java
index f2890b1..0e94476 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkWriterMetrics.java
@@ -39,6 +39,7 @@ public class TestFlinkWriterMetrics extends TestWriterMetrics<RowData> {
.dataSchema(table.schema())
.dataFileFormat(fileFormat)
.deleteFileFormat(fileFormat)
+ .positionDeleteRowSchema(table.schema())
.build();
}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index 1d53d83..8d42fd5 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -518,7 +518,6 @@ public class Parquet {
}
public DeleteWriteBuilder metricsConfig(MetricsConfig newMetricsConfig) {
- // TODO: keep full metrics for position delete file columns
appenderBuilder.metricsConfig(newMetricsConfig);
return this;
}
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriterMetrics.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriterMetrics.java
index b17ed7d..026111c 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriterMetrics.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkWriterMetrics.java
@@ -39,6 +39,7 @@ public class TestSparkWriterMetrics extends TestWriterMetrics<InternalRow> {
.dataSchema(table.schema())
.dataFileFormat(fileFormat)
.deleteFileFormat(fileFormat)
+ .positionDeleteRowSchema(table.schema())
.build();
}