You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/11/14 18:43:57 UTC
[iceberg] branch master updated: Flink: Add SupportsRowPosition to Avro reader to fix position deletes (#3540)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new beb5d6a Flink: Add SupportsRowPosition to Avro reader to fix position deletes (#3540)
beb5d6a is described below
commit beb5d6ab3991293185b4b27067869705318d5b26
Author: openinx <op...@gmail.com>
AuthorDate: Mon Nov 15 02:43:46 2021 +0800
Flink: Add SupportsRowPosition to Avro reader to fix position deletes (#3540)
---
.../java/org/apache/iceberg/data/FileHelpers.java | 71 ++++++++++++----------
flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java | 11 +++-
flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java | 11 +++-
3 files changed, 58 insertions(+), 35 deletions(-)
diff --git a/data/src/test/java/org/apache/iceberg/data/FileHelpers.java b/data/src/test/java/org/apache/iceberg/data/FileHelpers.java
index 5b58c8d..dfcb75b 100644
--- a/data/src/test/java/org/apache/iceberg/data/FileHelpers.java
+++ b/data/src/test/java/org/apache/iceberg/data/FileHelpers.java
@@ -22,45 +22,47 @@ package org.apache.iceberg.data;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
+import java.util.Locale;
+import java.util.Map;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
-import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.Pair;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+
public class FileHelpers {
private FileHelpers() {
}
public static Pair<DeleteFile, CharSequenceSet> writeDeleteFile(Table table, OutputFile out,
- List<Pair<CharSequence, Long>> deletes)
+ List<Pair<CharSequence, Long>> deletes)
throws IOException {
return writeDeleteFile(table, out, null, deletes);
}
public static Pair<DeleteFile, CharSequenceSet> writeDeleteFile(Table table, OutputFile out, StructLike partition,
- List<Pair<CharSequence, Long>> deletes)
+ List<Pair<CharSequence, Long>> deletes)
throws IOException {
- PositionDeleteWriter<?> writer = Parquet.writeDeletes(out)
- .withSpec(table.spec())
- .setAll(table.properties())
- .metricsConfig(MetricsConfig.forTable(table))
- .withPartition(partition)
- .overwrite()
- .buildPositionWriter();
+ FileFormat format = defaultFormat(table.properties());
+ FileAppenderFactory<Record> factory = new GenericAppenderFactory(table.schema(), table.spec());
+ PositionDeleteWriter<?> writer = factory.newPosDeleteWriter(encrypt(out), format, partition);
try (Closeable toClose = writer) {
for (Pair<CharSequence, Long> delete : deletes) {
writer.delete(delete.first(), delete.second());
@@ -76,16 +78,14 @@ public class FileHelpers {
}
public static DeleteFile writeDeleteFile(Table table, OutputFile out, StructLike partition,
- List<Record> deletes, Schema deleteRowSchema) throws IOException {
- EqualityDeleteWriter<Record> writer = Parquet.writeDeletes(out)
- .forTable(table)
- .withPartition(partition)
- .rowSchema(deleteRowSchema)
- .createWriterFunc(GenericParquetWriter::buildWriter)
- .overwrite()
- .equalityFieldIds(deleteRowSchema.columns().stream().mapToInt(Types.NestedField::fieldId).toArray())
- .buildEqualityWriter();
+ List<Record> deletes, Schema deleteRowSchema)
+ throws IOException {
+ FileFormat format = defaultFormat(table.properties());
+ int[] equalityFieldIds = deleteRowSchema.columns().stream().mapToInt(Types.NestedField::fieldId).toArray();
+ FileAppenderFactory<Record> factory = new GenericAppenderFactory(table.schema(), table.spec(),
+ equalityFieldIds, deleteRowSchema, null);
+ EqualityDeleteWriter<Record> writer = factory.newEqDeleteWriter(encrypt(out), format, partition);
try (Closeable toClose = writer) {
writer.deleteAll(deletes);
}
@@ -94,18 +94,16 @@ public class FileHelpers {
}
public static DataFile writeDataFile(Table table, OutputFile out, List<Record> rows) throws IOException {
- FileAppender<Record> writer = Parquet.write(out)
- .createWriterFunc(GenericParquetWriter::buildWriter)
- .schema(table.schema())
- .overwrite()
- .build();
+ FileFormat format = defaultFormat(table.properties());
+ GenericAppenderFactory factory = new GenericAppenderFactory(table.schema());
+ FileAppender<Record> writer = factory.newAppender(out, format);
try (Closeable toClose = writer) {
writer.addAll(rows);
}
return DataFiles.builder(table.spec())
- .withFormat(FileFormat.PARQUET)
+ .withFormat(format)
.withPath(out.location())
.withFileSizeInBytes(writer.length())
.withSplitOffsets(writer.splitOffsets())
@@ -115,18 +113,16 @@ public class FileHelpers {
public static DataFile writeDataFile(Table table, OutputFile out, StructLike partition, List<Record> rows)
throws IOException {
- FileAppender<Record> writer = Parquet.write(out)
- .createWriterFunc(GenericParquetWriter::buildWriter)
- .schema(table.schema())
- .overwrite()
- .build();
+ FileFormat format = defaultFormat(table.properties());
+ GenericAppenderFactory factory = new GenericAppenderFactory(table.schema(), table.spec());
+ FileAppender<Record> writer = factory.newAppender(out, format);
try (Closeable toClose = writer) {
writer.addAll(rows);
}
return DataFiles.builder(table.spec())
- .withFormat(FileFormat.PARQUET)
+ .withFormat(format)
.withPath(out.location())
.withPartition(partition)
.withFileSizeInBytes(writer.length())
@@ -134,4 +130,13 @@ public class FileHelpers {
.withMetrics(writer.metrics())
.build();
}
+
+ private static EncryptedOutputFile encrypt(OutputFile out) {
+ return EncryptedFiles.encryptedOutput(out, EncryptionKeyMetadata.EMPTY);
+ }
+
+ private static FileFormat defaultFormat(Map<String, String> properties) {
+ String formatString = properties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
+ return FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
+ }
}
diff --git a/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java b/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java
index aa6a029..991ef63 100644
--- a/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java
+++ b/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java
@@ -22,6 +22,7 @@ package org.apache.iceberg.flink.data;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.function.Supplier;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
@@ -29,6 +30,7 @@ import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.avro.AvroSchemaWithTypeVisitor;
+import org.apache.iceberg.avro.SupportsRowPosition;
import org.apache.iceberg.avro.ValueReader;
import org.apache.iceberg.avro.ValueReaders;
import org.apache.iceberg.data.avro.DecoderResolver;
@@ -36,7 +38,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
-public class FlinkAvroReader implements DatumReader<RowData> {
+public class FlinkAvroReader implements DatumReader<RowData>, SupportsRowPosition {
private final Schema readSchema;
private final ValueReader<RowData> reader;
@@ -63,6 +65,13 @@ public class FlinkAvroReader implements DatumReader<RowData> {
return DecoderResolver.resolveAndRead(decoder, readSchema, fileSchema, reader, reuse);
}
+ @Override
+ public void setRowPositionSupplier(Supplier<Long> posSupplier) {
+ if (reader instanceof SupportsRowPosition) {
+ ((SupportsRowPosition) reader).setRowPositionSupplier(posSupplier);
+ }
+ }
+
private static class ReadBuilder extends AvroSchemaWithTypeVisitor<ValueReader<?>> {
private final Map<Integer, ?> idToConstant;
diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java
index aa6a029..991ef63 100644
--- a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java
+++ b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java
@@ -22,6 +22,7 @@ package org.apache.iceberg.flink.data;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.function.Supplier;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
@@ -29,6 +30,7 @@ import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.avro.AvroSchemaWithTypeVisitor;
+import org.apache.iceberg.avro.SupportsRowPosition;
import org.apache.iceberg.avro.ValueReader;
import org.apache.iceberg.avro.ValueReaders;
import org.apache.iceberg.data.avro.DecoderResolver;
@@ -36,7 +38,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
-public class FlinkAvroReader implements DatumReader<RowData> {
+public class FlinkAvroReader implements DatumReader<RowData>, SupportsRowPosition {
private final Schema readSchema;
private final ValueReader<RowData> reader;
@@ -63,6 +65,13 @@ public class FlinkAvroReader implements DatumReader<RowData> {
return DecoderResolver.resolveAndRead(decoder, readSchema, fileSchema, reader, reuse);
}
+ @Override
+ public void setRowPositionSupplier(Supplier<Long> posSupplier) {
+ if (reader instanceof SupportsRowPosition) {
+ ((SupportsRowPosition) reader).setRowPositionSupplier(posSupplier);
+ }
+ }
+
private static class ReadBuilder extends AvroSchemaWithTypeVisitor<ValueReader<?>> {
private final Map<Integer, ?> idToConstant;