Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/11/14 18:43:57 UTC

[iceberg] branch master updated: Flink: Add SupportsRowPosition to Avro reader to fix position deletes (#3540)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new beb5d6a  Flink: Add SupportsRowPosition to Avro reader to fix position deletes (#3540)
beb5d6a is described below

commit beb5d6ab3991293185b4b27067869705318d5b26
Author: openinx <op...@gmail.com>
AuthorDate: Mon Nov 15 02:43:46 2021 +0800

    Flink: Add SupportsRowPosition to Avro reader to fix position deletes (#3540)
---
 .../java/org/apache/iceberg/data/FileHelpers.java  | 71 ++++++++++++----------
 .../apache/iceberg/flink/data/FlinkAvroReader.java | 11 +++-
 .../apache/iceberg/flink/data/FlinkAvroReader.java | 11 +++-
 3 files changed, 58 insertions(+), 35 deletions(-)
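
For context: position delete files identify deleted rows by data file path and row ordinal, so the reader of a data file must know which row position it is currently decoding. FlinkAvroReader did not implement SupportsRowPosition, so the Avro read path never installed a position supplier and position deletes could not be applied correctly against Avro data files written for Flink. The interface is a single setter; this sketch of the contract is inferred from its use in the diff below, not new code in this commit:

    package org.apache.iceberg.avro;

    import java.util.function.Supplier;

    // Implemented by readers that need the position of the row currently
    // being read, e.g. to populate the _pos metadata column that position
    // deletes are matched against.
    public interface SupportsRowPosition {
      void setRowPositionSupplier(Supplier<Long> posSupplier);
    }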

diff --git a/data/src/test/java/org/apache/iceberg/data/FileHelpers.java b/data/src/test/java/org/apache/iceberg/data/FileHelpers.java
index 5b58c8d..dfcb75b 100644
--- a/data/src/test/java/org/apache/iceberg/data/FileHelpers.java
+++ b/data/src/test/java/org/apache/iceberg/data/FileHelpers.java
@@ -22,45 +22,47 @@ package org.apache.iceberg.data;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
+import java.util.Locale;
+import java.util.Map;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.MetricsConfig;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.data.parquet.GenericParquetWriter;
 import org.apache.iceberg.deletes.EqualityDeleteWriter;
 import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
 import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileAppenderFactory;
 import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.CharSequenceSet;
 import org.apache.iceberg.util.Pair;
 
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+
 public class FileHelpers {
   private FileHelpers() {
   }
 
   public static Pair<DeleteFile, CharSequenceSet> writeDeleteFile(Table table, OutputFile out,
-                                                                    List<Pair<CharSequence, Long>> deletes)
+                                                                  List<Pair<CharSequence, Long>> deletes)
       throws IOException {
     return writeDeleteFile(table, out, null, deletes);
   }
 
   public static Pair<DeleteFile, CharSequenceSet> writeDeleteFile(Table table, OutputFile out, StructLike partition,
-                                                                    List<Pair<CharSequence, Long>> deletes)
+                                                                  List<Pair<CharSequence, Long>> deletes)
       throws IOException {
-    PositionDeleteWriter<?> writer = Parquet.writeDeletes(out)
-        .withSpec(table.spec())
-        .setAll(table.properties())
-        .metricsConfig(MetricsConfig.forTable(table))
-        .withPartition(partition)
-        .overwrite()
-        .buildPositionWriter();
+    FileFormat format = defaultFormat(table.properties());
+    FileAppenderFactory<Record> factory = new GenericAppenderFactory(table.schema(), table.spec());
 
+    PositionDeleteWriter<?> writer = factory.newPosDeleteWriter(encrypt(out), format, partition);
     try (Closeable toClose = writer) {
       for (Pair<CharSequence, Long> delete : deletes) {
         writer.delete(delete.first(), delete.second());
@@ -76,16 +78,14 @@ public class FileHelpers {
   }
 
   public static DeleteFile writeDeleteFile(Table table, OutputFile out, StructLike partition,
-                                           List<Record> deletes, Schema deleteRowSchema) throws IOException {
-    EqualityDeleteWriter<Record> writer = Parquet.writeDeletes(out)
-        .forTable(table)
-        .withPartition(partition)
-        .rowSchema(deleteRowSchema)
-        .createWriterFunc(GenericParquetWriter::buildWriter)
-        .overwrite()
-        .equalityFieldIds(deleteRowSchema.columns().stream().mapToInt(Types.NestedField::fieldId).toArray())
-        .buildEqualityWriter();
+                                           List<Record> deletes, Schema deleteRowSchema)
+      throws IOException {
+    FileFormat format = defaultFormat(table.properties());
+    int[] equalityFieldIds = deleteRowSchema.columns().stream().mapToInt(Types.NestedField::fieldId).toArray();
+    FileAppenderFactory<Record> factory = new GenericAppenderFactory(table.schema(), table.spec(),
+        equalityFieldIds, deleteRowSchema, null);
 
+    EqualityDeleteWriter<Record> writer = factory.newEqDeleteWriter(encrypt(out), format, partition);
     try (Closeable toClose = writer) {
       writer.deleteAll(deletes);
     }
@@ -94,18 +94,16 @@ public class FileHelpers {
   }
 
   public static DataFile writeDataFile(Table table, OutputFile out, List<Record> rows) throws IOException {
-    FileAppender<Record> writer = Parquet.write(out)
-        .createWriterFunc(GenericParquetWriter::buildWriter)
-        .schema(table.schema())
-        .overwrite()
-        .build();
+    FileFormat format = defaultFormat(table.properties());
+    GenericAppenderFactory factory = new GenericAppenderFactory(table.schema());
 
+    FileAppender<Record> writer = factory.newAppender(out, format);
     try (Closeable toClose = writer) {
       writer.addAll(rows);
     }
 
     return DataFiles.builder(table.spec())
-        .withFormat(FileFormat.PARQUET)
+        .withFormat(format)
         .withPath(out.location())
         .withFileSizeInBytes(writer.length())
         .withSplitOffsets(writer.splitOffsets())
@@ -115,18 +113,16 @@ public class FileHelpers {
 
   public static DataFile writeDataFile(Table table, OutputFile out, StructLike partition, List<Record> rows)
       throws IOException {
-    FileAppender<Record> writer = Parquet.write(out)
-        .createWriterFunc(GenericParquetWriter::buildWriter)
-        .schema(table.schema())
-        .overwrite()
-        .build();
+    FileFormat format = defaultFormat(table.properties());
+    GenericAppenderFactory factory = new GenericAppenderFactory(table.schema(), table.spec());
 
+    FileAppender<Record> writer = factory.newAppender(out, format);
     try (Closeable toClose = writer) {
       writer.addAll(rows);
     }
 
     return DataFiles.builder(table.spec())
-        .withFormat(FileFormat.PARQUET)
+        .withFormat(format)
         .withPath(out.location())
         .withPartition(partition)
         .withFileSizeInBytes(writer.length())
@@ -134,4 +130,13 @@ public class FileHelpers {
         .withMetrics(writer.metrics())
         .build();
   }
+
+  private static EncryptedOutputFile encrypt(OutputFile out) {
+    return EncryptedFiles.encryptedOutput(out, EncryptionKeyMetadata.EMPTY);
+  }
+
+  private static FileFormat defaultFormat(Map<String, String> properties) {
+    String formatString = properties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
+    return FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
+  }
 }
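
With this change, FileHelpers no longer hard-codes Parquet: data and delete writers are obtained from GenericAppenderFactory, and the format comes from the table's write.format.default property, so the same test helpers can now produce Avro (or ORC) files. A hedged usage sketch, assuming hypothetical test fixtures (table, rows, dataOut, deleteOut):

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.DeleteFile;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableProperties;
    import org.apache.iceberg.data.FileHelpers;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.io.OutputFile;
    import org.apache.iceberg.util.CharSequenceSet;
    import org.apache.iceberg.util.Pair;

    static void deleteFirstRowByPosition(Table table, List<Record> rows,
                                         OutputFile dataOut, OutputFile deleteOut) throws IOException {
      // Make the helpers write Avro instead of the previously hard-coded Parquet.
      table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, "avro").commit();

      // Write and commit a data file in the table's default format.
      DataFile dataFile = FileHelpers.writeDataFile(table, dataOut, rows);
      table.newAppend().appendFile(dataFile).commit();

      // Position deletes reference (file path, row ordinal); this deletes row 0.
      Pair<DeleteFile, CharSequenceSet> posDeletes = FileHelpers.writeDeleteFile(
          table, deleteOut, Arrays.asList(Pair.of(dataFile.path(), 0L)));
      table.newRowDelta().addDeletes(posDeletes.first()).commit();
    }

Reading the table back after the row delta should skip the deleted position; exercising this path with Avro is presumably what the FlinkAvroReader changes below fix.
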
diff --git a/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java b/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java
index aa6a029..991ef63 100644
--- a/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java
+++ b/flink/v1.12/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java
@@ -22,6 +22,7 @@ package org.apache.iceberg.flink.data;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Supplier;
 import org.apache.avro.LogicalType;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
@@ -29,6 +30,7 @@ import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
 import org.apache.flink.table.data.RowData;
 import org.apache.iceberg.avro.AvroSchemaWithTypeVisitor;
+import org.apache.iceberg.avro.SupportsRowPosition;
 import org.apache.iceberg.avro.ValueReader;
 import org.apache.iceberg.avro.ValueReaders;
 import org.apache.iceberg.data.avro.DecoderResolver;
@@ -36,7 +38,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 
-public class FlinkAvroReader implements DatumReader<RowData> {
+public class FlinkAvroReader implements DatumReader<RowData>, SupportsRowPosition {
 
   private final Schema readSchema;
   private final ValueReader<RowData> reader;
@@ -63,6 +65,13 @@ public class FlinkAvroReader implements DatumReader<RowData> {
     return DecoderResolver.resolveAndRead(decoder, readSchema, fileSchema, reader, reuse);
   }
 
+  @Override
+  public void setRowPositionSupplier(Supplier<Long> posSupplier) {
+    if (reader instanceof SupportsRowPosition) {
+      ((SupportsRowPosition) reader).setRowPositionSupplier(posSupplier);
+    }
+  }
+
   private static class ReadBuilder extends AvroSchemaWithTypeVisitor<ValueReader<?>> {
     private final Map<Integer, ?> idToConstant;
 
diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java
index aa6a029..991ef63 100644
--- a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java
+++ b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroReader.java
@@ -22,6 +22,7 @@ package org.apache.iceberg.flink.data;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Supplier;
 import org.apache.avro.LogicalType;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
@@ -29,6 +30,7 @@ import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
 import org.apache.flink.table.data.RowData;
 import org.apache.iceberg.avro.AvroSchemaWithTypeVisitor;
+import org.apache.iceberg.avro.SupportsRowPosition;
 import org.apache.iceberg.avro.ValueReader;
 import org.apache.iceberg.avro.ValueReaders;
 import org.apache.iceberg.data.avro.DecoderResolver;
@@ -36,7 +38,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 
-public class FlinkAvroReader implements DatumReader<RowData> {
+public class FlinkAvroReader implements DatumReader<RowData>, SupportsRowPosition {
 
   private final Schema readSchema;
   private final ValueReader<RowData> reader;
@@ -63,6 +65,13 @@ public class FlinkAvroReader implements DatumReader<RowData> {
     return DecoderResolver.resolveAndRead(decoder, readSchema, fileSchema, reader, reuse);
   }
 
+  @Override
+  public void setRowPositionSupplier(Supplier<Long> posSupplier) {
+    if (reader instanceof SupportsRowPosition) {
+      ((SupportsRowPosition) reader).setRowPositionSupplier(posSupplier);
+    }
+  }
+
   private static class ReadBuilder extends AvroSchemaWithTypeVisitor<ValueReader<?>> {
     private final Map<Integer, ?> idToConstant;
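
On the Flink read path, FlinkAvroReader is plugged into Iceberg's Avro read builder, which performs the SupportsRowPosition hand-off automatically; a hedged construction sketch (inputFile and the schema variables are hypothetical placeholders):

    import org.apache.flink.table.data.RowData;
    import org.apache.iceberg.avro.Avro;
    import org.apache.iceberg.io.CloseableIterable;

    CloseableIterable<RowData> rows = Avro.read(inputFile)
        .project(projectedSchema)
        // With this commit, the builder detects that FlinkAvroReader
        // implements SupportsRowPosition and supplies row positions,
        // so position deletes line up with the rows being decoded.
        .createReaderFunc(readSchema -> new FlinkAvroReader(projectedSchema, readSchema))
        .build();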