Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/08/10 21:54:55 UTC

[GitHub] [iceberg] rdblue opened a new pull request #1316: Add delete file writers for Avro

rdblue opened a new pull request #1316:
URL: https://github.com/apache/iceberg/pull/1316


   This adds delete file writers that wrap file format appenders and create `DeleteFile` instances, and Avro builders to produce position and equality delete files in Avro.
   
   Equality delete writers write rows, like other writers created using `Avro.write`. Position delete writers write a path, position, and row triple for each deleted row. Both writers support `createWriterFunc`, just like `Avro.write`, to create a value writer that deconstructs the row, which enables engines to plug in their own object models. The tests use Iceberg generics.
   
   Because the position delete files also contain row data, this changes the position delete schema to use reserved field IDs that will not conflict with table field IDs. These IDs will be added to the spec as reserved IDs, along with the _file and _pos metadata column IDs.
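   
   As a rough illustration of the API, this is how the tests in this PR build an equality delete writer using Iceberg generics (a minimal sketch; `SCHEMA`, `records`, and `deleteFile` are assumed to be defined as in `TestAvroDeleteWriters` below):
   
       OutputFile out = Files.localOutput(deleteFile);
       EqualityDeleteWriter<Record> deleteWriter = Avro.writeDeletes(out)
           .createWriterFunc(DataWriter::create)       // plug in the engine's object model
           .overwrite()
           .rowSchema(SCHEMA)                          // schema of the rows written to the delete file
           .withSpec(PartitionSpec.unpartitioned())
           .equalityFieldIds(1)                        // rows are matched by equality on field ID 1
           .buildEqualityWriter();
   
       try (EqualityDeleteWriter<Record> writer = deleteWriter) {
         writer.deleteAll(records);                    // write the rows that identify deleted data
       }
   
       DeleteFile metadata = deleteWriter.toDeleteFile();   // metadata to commit to the table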


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] chenjunjiedada commented on a change in pull request #1316: Add delete file writers for Avro

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on a change in pull request #1316:
URL: https://github.com/apache/iceberg/pull/1316#discussion_r468292877



##########
File path: core/src/main/java/org/apache/iceberg/MetadataColumns.java
##########
@@ -31,11 +31,20 @@
   private MetadataColumns() {
   }
 
+  // IDs Integer.MAX_VALUE - (1-100) are used for metadata columns
   public static final NestedField FILE_PATH = NestedField.required(
       Integer.MAX_VALUE - 1, "_file", Types.StringType.get(), "Path of the file in which a row is stored");
   public static final NestedField ROW_POSITION = NestedField.required(
       Integer.MAX_VALUE - 2, "_pos", Types.LongType.get(), "Ordinal position of a row in the source data file");
 
+  // IDs Integer.MAX_VALUE - (101-200) are used for reserved columns
+  public static final NestedField DELETE_FILE_PATH = NestedField.required(

Review comment:
       Why do we need reserved columns in addition to the metadata columns? The column docs are the same for both; won't this confuse users who want to use them?






[GitHub] [iceberg] rdblue closed pull request #1316: Add delete file writers for Avro

Posted by GitBox <gi...@apache.org>.
rdblue closed pull request #1316:
URL: https://github.com/apache/iceberg/pull/1316


   




[GitHub] [iceberg] openinx commented on a change in pull request #1316: Add delete file writers for Avro

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1316:
URL: https://github.com/apache/iceberg/pull/1316#discussion_r469066028



##########
File path: core/src/main/java/org/apache/iceberg/avro/Avro.java
##########
@@ -275,10 +276,11 @@ public DeleteWriteBuilder equalityFieldIds(int... fieldIds) {
     }
 
     public <T> EqualityDeleteWriter<T> buildEqualityWriter() throws IOException {
+      Preconditions.checkState(rowSchema != null, "Cannot create equality delete file without a schema`");

Review comment:
       nit: there's a redundant backquote at the end of this line?






[GitHub] [iceberg] rdblue commented on pull request #1316: Add delete file writers for Avro

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1316:
URL: https://github.com/apache/iceberg/pull/1316#issuecomment-673667764


   I merged these changes as part of #1327, which added Parquet writers as well. Thanks for reviewing, @JingsongLi, @openinx, and @chenjunjiedada!




[GitHub] [iceberg] rdblue commented on a change in pull request #1316: Add delete file writers for Avro

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1316:
URL: https://github.com/apache/iceberg/pull/1316#discussion_r468907371



##########
File path: core/src/test/java/org/apache/iceberg/avro/TestAvroDeleteWriters.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.avro;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestAvroDeleteWriters {
+  private static final Schema SCHEMA = new Schema(
+      NestedField.required(1, "id", Types.LongType.get()),
+      NestedField.optional(2, "data", Types.StringType.get()));
+
+  private List<Record> records;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @Before
+  public void createDeleteRecords() {
+    GenericRecord record = GenericRecord.create(SCHEMA);
+
+    ImmutableList.Builder<Record> builder = ImmutableList.builder();
+    builder.add(record.copy(ImmutableMap.of("id", 1L, "data", "a")));
+    builder.add(record.copy(ImmutableMap.of("id", 2L, "data", "b")));
+    builder.add(record.copy(ImmutableMap.of("id", 3L, "data", "c")));
+    builder.add(record.copy(ImmutableMap.of("id", 4L, "data", "d")));
+    builder.add(record.copy(ImmutableMap.of("id", 5L, "data", "e")));
+
+    this.records = builder.build();
+  }
+
+  @Test
+  public void testEqualityDeleteWriter() throws IOException {
+    File deleteFile = temp.newFile();
+
+    OutputFile out = Files.localOutput(deleteFile);
+    EqualityDeleteWriter<Record> deleteWriter = Avro.writeDeletes(out)
+        .createWriterFunc(DataWriter::create)
+        .overwrite()
+        .rowSchema(SCHEMA)
+        .withSpec(PartitionSpec.unpartitioned())
+        .equalityFieldIds(1)
+        .buildEqualityWriter();
+
+    try (EqualityDeleteWriter<Record> writer = deleteWriter) {
+      writer.deleteAll(records);
+    }
+
+    DeleteFile metadata = deleteWriter.toDeleteFile();
+    Assert.assertEquals("Format should be Avro", FileFormat.AVRO, metadata.format());
+    Assert.assertEquals("Should be equality deletes", FileContent.EQUALITY_DELETES, metadata.content());
+    Assert.assertEquals("Record count should be correct", records.size(), metadata.recordCount());
+    Assert.assertEquals("Partition should be empty", 0, metadata.partition().size());
+    Assert.assertNull("Key metadata should be null", metadata.keyMetadata());
+
+    List<Record> deletedRecords;
+    try (AvroIterable<Record> reader = Avro.read(out.toInputFile())
+        .project(SCHEMA)
+        .createReaderFunc(DataReader::create)
+        .build()) {
+      deletedRecords = Lists.newArrayList(reader);
+    }
+
+    Assert.assertEquals("Deleted records should match expected", records, deletedRecords);
+  }
+
+  @Test
+  public void testPositionDeleteWriter() throws IOException {
+    File deleteFile = temp.newFile();
+
+    Schema deleteSchema = new Schema(
+        MetadataColumns.DELETE_FILE_PATH,
+        MetadataColumns.DELETE_FILE_POS,
+        NestedField.optional(MetadataColumns.DELETE_FILE_ROW_FIELD_ID, "row", SCHEMA.asStruct()));
+
+    String deletePath = "s3://bucket/path/file.parquet";
+    GenericRecord posDelete = GenericRecord.create(deleteSchema);
+    List<Record> expectedDeleteRecords = Lists.newArrayList();
+
+    OutputFile out = Files.localOutput(deleteFile);
+    PositionDeleteWriter<Record> deleteWriter = Avro.writeDeletes(out)
+        .createWriterFunc(DataWriter::create)
+        .overwrite()
+        .rowSchema(SCHEMA)
+        .withSpec(PartitionSpec.unpartitioned())
+        .buildPositionWriter();
+
+    try (PositionDeleteWriter<Record> writer = deleteWriter) {
+      for (int i = 0; i < records.size(); i += 1) {
+        int pos = i * 3 + 2;
+        writer.delete(deletePath, pos, records.get(i));

Review comment:
       I added a test for this and I also updated the builder. Now, if you don't supply a row schema, the writer function for the row's object model is no longer required. And delete files will be written without the `row` column.
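
       A minimal sketch of the no-row case described here (the two-argument `delete(path, pos)` call is an assumption based on this comment, not code quoted in this thread):

           // No rowSchema and no createWriterFunc: the delete file should be
           // written without the optional `row` column.
           PositionDeleteWriter<Record> deleteWriter = Avro.writeDeletes(out)
               .overwrite()
               .withSpec(PartitionSpec.unpartitioned())
               .buildPositionWriter();

           try (PositionDeleteWriter<Record> writer = deleteWriter) {
             writer.delete("s3://bucket/path/file.parquet", 2L);  // path and position only
           }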






[GitHub] [iceberg] rdblue commented on a change in pull request #1316: Add delete file writers for Avro

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1316:
URL: https://github.com/apache/iceberg/pull/1316#discussion_r468765920



##########
File path: core/src/main/java/org/apache/iceberg/MetadataColumns.java
##########
@@ -31,11 +31,20 @@
   private MetadataColumns() {
   }
 
+  // IDs Integer.MAX_VALUE - (1-100) are used for metadata columns
   public static final NestedField FILE_PATH = NestedField.required(
       Integer.MAX_VALUE - 1, "_file", Types.StringType.get(), "Path of the file in which a row is stored");
   public static final NestedField ROW_POSITION = NestedField.required(
       Integer.MAX_VALUE - 2, "_pos", Types.LongType.get(), "Ordinal position of a row in the source data file");
 
+  // IDs Integer.MAX_VALUE - (101-200) are used for reserved columns
+  public static final NestedField DELETE_FILE_PATH = NestedField.required(
+      Integer.MAX_VALUE - 101, "file_path", Types.StringType.get(), "Path of a file in which a deleted row is stored");
+  public static final NestedField DELETE_FILE_POS = NestedField.required(
+      Integer.MAX_VALUE - 102, "pos", Types.LongType.get(), "Ordinal position of a deleted row in the data file");
+  public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103;

Review comment:
       Yes, we will need to update the spec. Currently, it uses IDs 1 and 2 for path and position, but those will conflict with user-defined field IDs, so we need to update them to the values here. We also need to add the reserved `row` field ID.
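
       For concreteness, both ranges count down from `Integer.MAX_VALUE` (2147483647), so the reserved delete-file IDs stay clear of both the metadata column IDs and user-assigned table field IDs:

           // metadata columns: Integer.MAX_VALUE - (1-100)
           int filePathId       = Integer.MAX_VALUE - 1;    // _file     -> 2147483646
           int rowPositionId    = Integer.MAX_VALUE - 2;    // _pos      -> 2147483645
           // reserved delete-file columns: Integer.MAX_VALUE - (101-200)
           int deleteFilePathId = Integer.MAX_VALUE - 101;  // file_path -> 2147483546
           int deleteFilePosId  = Integer.MAX_VALUE - 102;  // pos       -> 2147483545
           int deleteFileRowId  = Integer.MAX_VALUE - 103;  // row       -> 2147483544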






[GitHub] [iceberg] JingsongLi commented on a change in pull request #1316: Add delete file writers for Avro

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #1316:
URL: https://github.com/apache/iceberg/pull/1316#discussion_r468437233



##########
File path: core/src/main/java/org/apache/iceberg/MetadataColumns.java
##########
@@ -31,11 +31,20 @@
   private MetadataColumns() {
   }
 
+  // IDs Integer.MAX_VALUE - (1-100) are used for metadata columns
   public static final NestedField FILE_PATH = NestedField.required(
       Integer.MAX_VALUE - 1, "_file", Types.StringType.get(), "Path of the file in which a row is stored");
   public static final NestedField ROW_POSITION = NestedField.required(
       Integer.MAX_VALUE - 2, "_pos", Types.LongType.get(), "Ordinal position of a row in the source data file");
 
+  // IDs Integer.MAX_VALUE - (101-200) are used for reserved columns
+  public static final NestedField DELETE_FILE_PATH = NestedField.required(

Review comment:
       I think they are different:
   - reserved columns are only for position delete files.
   - metadata columns are for all files; in fact, position delete files also have these metadata columns.
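
       A small sketch of the distinction, using the constants quoted above (`Schema` here is `org.apache.iceberg.Schema`):

           // Stored schema of a position delete file: the reserved columns are
           // real, persisted columns.
           Schema posDeleteSchema = new Schema(
               MetadataColumns.DELETE_FILE_PATH,   // "file_path"
               MetadataColumns.DELETE_FILE_POS);   // "pos"

           // Metadata columns are projected at read time over any file,
           // including position delete files themselves.
           Schema projection = new Schema(
               MetadataColumns.FILE_PATH,          // "_file"
               MetadataColumns.ROW_POSITION);      // "_pos"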






[GitHub] [iceberg] openinx commented on a change in pull request #1316: Add delete file writers for Avro

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1316:
URL: https://github.com/apache/iceberg/pull/1316#discussion_r468398258



##########
File path: core/src/main/java/org/apache/iceberg/avro/Avro.java
##########
@@ -156,11 +170,175 @@ private CodecFactory codec() {
       Preconditions.checkNotNull(schema, "Schema is required");
       Preconditions.checkNotNull(name, "Table name is required and cannot be null");
 
+      Function<Schema, DatumWriter<?>> writerFunc;
+      if (createWriterFunc != null) {
+        writerFunc = createWriterFunc;
+      } else {
+        writerFunc = GenericAvroWriter::new;
+      }
+
       // add the Iceberg schema to keyValueMetadata
       meta("iceberg.schema", SchemaParser.toJson(schema));
 
       return new AvroFileAppender<>(
-          AvroSchemaUtil.convert(schema, name), file, createWriterFunc, codec(), metadata, overwrite);
+          AvroSchemaUtil.convert(schema, name), file, writerFunc, codec(), metadata, overwrite);
+    }
+  }
+
+  public static DeleteWriteBuilder writeDeletes(OutputFile file) {
+    return new DeleteWriteBuilder(file);
+  }
+
+  public static class DeleteWriteBuilder {
+    private final WriteBuilder appenderBuilder;
+    private final String location;
+    private Function<Schema, DatumWriter<?>> createWriterFunc = null;
+    private org.apache.iceberg.Schema rowSchema;
+    private PartitionSpec spec;
+    private StructLike partition;
+    private EncryptionKeyMetadata keyMetadata = null;
+    private int[] equalityFieldIds = null;
+
+    private DeleteWriteBuilder(OutputFile file) {
+      this.appenderBuilder = write(file);
+      this.location = file.location();
+    }
+
+    public DeleteWriteBuilder forTable(Table table) {
+      rowSchema(table.schema());
+      setAll(table.properties());
+      return this;
+    }
+
+    public DeleteWriteBuilder set(String property, String value) {
+      appenderBuilder.set(property, value);
+      return this;
+    }
+
+    public DeleteWriteBuilder setAll(Map<String, String> properties) {
+      appenderBuilder.setAll(properties);
+      return this;
+    }
+
+    public DeleteWriteBuilder meta(String property, String value) {
+      appenderBuilder.meta(property, value);
+      return this;
+    }
+
+    public DeleteWriteBuilder meta(Map<String, String> properties) {
+      appenderBuilder.meta(properties);
+      return this;
+    }
+
+    public DeleteWriteBuilder overwrite() {
+      return overwrite(true);
+    }
+
+    public DeleteWriteBuilder overwrite(boolean enabled) {
+      appenderBuilder.overwrite(enabled);
+      return this;
+    }
+
+    public DeleteWriteBuilder createWriterFunc(Function<Schema, DatumWriter<?>> writerFunction) {
+      this.createWriterFunc = writerFunction;
+      return this;
+    }
+
+    public DeleteWriteBuilder rowSchema(org.apache.iceberg.Schema newRowSchema) {
+      this.rowSchema = newRowSchema;
+      return this;
+    }
+
+    public DeleteWriteBuilder withSpec(PartitionSpec newSpec) {
+      this.spec = newSpec;
+      return this;
+    }
+
+    public DeleteWriteBuilder withPartition(StructLike key) {
+      this.partition = key;
+      return this;
+    }
+
+    public DeleteWriteBuilder withKeyMetadata(EncryptionKeyMetadata metadata) {
+      this.keyMetadata = metadata;
+      return this;
+    }
+
+    public DeleteWriteBuilder equalityFieldIds(List<Integer> fieldIds) {
+      this.equalityFieldIds = fieldIds.stream().mapToInt(id -> id).toArray();
+      return this;
+    }
+
+    public DeleteWriteBuilder equalityFieldIds(int... fieldIds) {
+      this.equalityFieldIds = fieldIds;
+      return this;
+    }
+
+    public <T> EqualityDeleteWriter<T> buildEqualityWriter() throws IOException {
+      Preconditions.checkState(equalityFieldIds != null, "Cannot create equality delete file without delete field ids");
+
+      set("delete-type", "equality");
+      set("delete-field-ids", IntStream.of(equalityFieldIds)
+          .mapToObj(Objects::toString)
+          .collect(Collectors.joining(", ")));
+
+      // the appender uses the row schema without extra columns
+      appenderBuilder.schema(rowSchema);
+      if (createWriterFunc != null) {
+        appenderBuilder.createWriterFunc(createWriterFunc);
+      }
+
+      return new EqualityDeleteWriter<>(
+          appenderBuilder.build(), FileFormat.AVRO, location, spec, partition, keyMetadata, equalityFieldIds);
+    }
+
+    public <T> PositionDeleteWriter<T> buildPositionWriter() throws IOException {
+      Preconditions.checkState(equalityFieldIds == null, "Cannot create position delete file using delete field ids");
+
+      set("delete-type", "position");
+
+      // the appender uses the row schema wrapped with position fields
+      appenderBuilder.schema(new org.apache.iceberg.Schema(
+          MetadataColumns.DELETE_FILE_PATH,
+          MetadataColumns.DELETE_FILE_POS,
+          NestedField.optional(
+              MetadataColumns.DELETE_FILE_ROW_FIELD_ID, "row", rowSchema.asStruct(),
+              MetadataColumns.DELETE_FILE_ROW_DOC)));

Review comment:
       It seems the whole schema construction could be a separate static method (taking `rowSchema` as an argument); I saw that the unit test also duplicates this schema construction.
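
       A hypothetical helper along the lines of this suggestion (the name and placement are illustrative, not from the PR), reusing the construction quoted above:

           static org.apache.iceberg.Schema posDeleteSchema(org.apache.iceberg.Schema rowSchema) {
             return new org.apache.iceberg.Schema(
                 MetadataColumns.DELETE_FILE_PATH,
                 MetadataColumns.DELETE_FILE_POS,
                 NestedField.optional(
                     MetadataColumns.DELETE_FILE_ROW_FIELD_ID, "row", rowSchema.asStruct(),
                     MetadataColumns.DELETE_FILE_ROW_DOC));
           }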






[GitHub] [iceberg] rdblue commented on a change in pull request #1316: Add delete file writers for Avro

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1316:
URL: https://github.com/apache/iceberg/pull/1316#discussion_r468774858



##########
File path: core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.deletes;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileMetadata;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+
+public class PositionDeleteWriter<T> implements Closeable {
+  private final FileAppender<StructLike> appender;
+  private final FileFormat format;
+  private final String location;
+  private final PartitionSpec spec;
+  private final StructLike partition;
+  private final ByteBuffer keyMetadata;
+  private final PositionDelete<T> delete;
+  private DeleteFile deleteFile = null;
+
+  public PositionDeleteWriter(FileAppender<StructLike> appender, FileFormat format, String location,
+                              PartitionSpec spec, StructLike partition, EncryptionKeyMetadata keyMetadata,
+                              Types.StructType rowType) {

Review comment:
       I'll remove it.






[GitHub] [iceberg] rdblue commented on a change in pull request #1316: Add delete file writers for Avro

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1316:
URL: https://github.com/apache/iceberg/pull/1316#discussion_r468770410



##########
File path: core/src/main/java/org/apache/iceberg/avro/Avro.java
##########
@@ -156,11 +170,175 @@ private CodecFactory codec() {
       Preconditions.checkNotNull(schema, "Schema is required");
       Preconditions.checkNotNull(name, "Table name is required and cannot be null");
 
+      Function<Schema, DatumWriter<?>> writerFunc;
+      if (createWriterFunc != null) {
+        writerFunc = createWriterFunc;
+      } else {
+        writerFunc = GenericAvroWriter::new;
+      }
+
       // add the Iceberg schema to keyValueMetadata
       meta("iceberg.schema", SchemaParser.toJson(schema));
 
       return new AvroFileAppender<>(
-          AvroSchemaUtil.convert(schema, name), file, createWriterFunc, codec(), metadata, overwrite);
+          AvroSchemaUtil.convert(schema, name), file, writerFunc, codec(), metadata, overwrite);
+    }
+  }
+
+  public static DeleteWriteBuilder writeDeletes(OutputFile file) {
+    return new DeleteWriteBuilder(file);
+  }
+
+  public static class DeleteWriteBuilder {
+    private final WriteBuilder appenderBuilder;
+    private final String location;
+    private Function<Schema, DatumWriter<?>> createWriterFunc = null;
+    private org.apache.iceberg.Schema rowSchema;
+    private PartitionSpec spec;
+    private StructLike partition;
+    private EncryptionKeyMetadata keyMetadata = null;
+    private int[] equalityFieldIds = null;
+
+    private DeleteWriteBuilder(OutputFile file) {
+      this.appenderBuilder = write(file);
+      this.location = file.location();
+    }
+
+    public DeleteWriteBuilder forTable(Table table) {
+      rowSchema(table.schema());
+      setAll(table.properties());

Review comment:
       Yes, I think so.






[GitHub] [iceberg] rdblue commented on a change in pull request #1316: Add delete file writers for Avro

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1316:
URL: https://github.com/apache/iceberg/pull/1316#discussion_r468260027



##########
File path: core/src/main/java/org/apache/iceberg/FileMetadata.java
##########
@@ -111,7 +111,7 @@ public Builder ofPositionDeletes() {
       return this;
     }
 
-    public Builder ofEqualityDeletes() {
+    public Builder ofEqualityDeletes(int... equalityFieldIds) {

Review comment:
       The PR for this is #1318.






[GitHub] [iceberg] rdblue commented on a change in pull request #1316: Add delete file writers for Avro

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1316:
URL: https://github.com/apache/iceberg/pull/1316#discussion_r468776261



##########
File path: core/src/test/java/org/apache/iceberg/avro/TestAvroDeleteWriters.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.avro;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestAvroDeleteWriters {
+  private static final Schema SCHEMA = new Schema(
+      NestedField.required(1, "id", Types.LongType.get()),
+      NestedField.optional(2, "data", Types.StringType.get()));
+
+  private List<Record> records;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @Before
+  public void createDeleteRecords() {
+    GenericRecord record = GenericRecord.create(SCHEMA);
+
+    ImmutableList.Builder<Record> builder = ImmutableList.builder();
+    builder.add(record.copy(ImmutableMap.of("id", 1L, "data", "a")));
+    builder.add(record.copy(ImmutableMap.of("id", 2L, "data", "b")));
+    builder.add(record.copy(ImmutableMap.of("id", 3L, "data", "c")));
+    builder.add(record.copy(ImmutableMap.of("id", 4L, "data", "d")));
+    builder.add(record.copy(ImmutableMap.of("id", 5L, "data", "e")));
+
+    this.records = builder.build();
+  }
+
+  @Test
+  public void testEqualityDeleteWriter() throws IOException {
+    File deleteFile = temp.newFile();
+
+    OutputFile out = Files.localOutput(deleteFile);
+    EqualityDeleteWriter<Record> deleteWriter = Avro.writeDeletes(out)
+        .createWriterFunc(DataWriter::create)
+        .overwrite()
+        .rowSchema(SCHEMA)
+        .withSpec(PartitionSpec.unpartitioned())
+        .equalityFieldIds(1)
+        .buildEqualityWriter();
+
+    try (EqualityDeleteWriter<Record> writer = deleteWriter) {
+      writer.deleteAll(records);
+    }
+
+    DeleteFile metadata = deleteWriter.toDeleteFile();
+    Assert.assertEquals("Format should be Avro", FileFormat.AVRO, metadata.format());
+    Assert.assertEquals("Should be equality deletes", FileContent.EQUALITY_DELETES, metadata.content());
+    Assert.assertEquals("Record count should be correct", records.size(), metadata.recordCount());
+    Assert.assertEquals("Partition should be empty", 0, metadata.partition().size());
+    Assert.assertNull("Key metadata should be null", metadata.keyMetadata());
+
+    List<Record> deletedRecords;
+    try (AvroIterable<Record> reader = Avro.read(out.toInputFile())
+        .project(SCHEMA)
+        .createReaderFunc(DataReader::create)
+        .build()) {
+      deletedRecords = Lists.newArrayList(reader);
+    }
+
+    Assert.assertEquals("Deleted records should match expected", records, deletedRecords);
+  }
+
+  @Test
+  public void testPositionDeleteWriter() throws IOException {
+    File deleteFile = temp.newFile();
+
+    Schema deleteSchema = new Schema(
+        MetadataColumns.DELETE_FILE_PATH,
+        MetadataColumns.DELETE_FILE_POS,
+        NestedField.optional(MetadataColumns.DELETE_FILE_ROW_FIELD_ID, "row", SCHEMA.asStruct()));
+
+    String deletePath = "s3://bucket/path/file.parquet";
+    GenericRecord posDelete = GenericRecord.create(deleteSchema);
+    List<Record> expectedDeleteRecords = Lists.newArrayList();
+
+    OutputFile out = Files.localOutput(deleteFile);
+    PositionDeleteWriter<Record> deleteWriter = Avro.writeDeletes(out)
+        .createWriterFunc(DataWriter::create)
+        .overwrite()
+        .rowSchema(SCHEMA)
+        .withSpec(PartitionSpec.unpartitioned())
+        .buildPositionWriter();
+
+    try (PositionDeleteWriter<Record> writer = deleteWriter) {
+      for (int i = 0; i < records.size(); i += 1) {
+        int pos = i * 3 + 2;
+        writer.delete(deletePath, pos, records.get(i));

Review comment:
       I agree, we should have a case where rows aren't passed.






[GitHub] [iceberg] rdblue commented on a change in pull request #1316: Add delete file writers for Avro

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1316:
URL: https://github.com/apache/iceberg/pull/1316#discussion_r468773938



##########
File path: core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.deletes;
+
+import org.apache.iceberg.StructLike;
+
+@SuppressWarnings("checkstyle:OverloadMethodsDeclarationOrder")
+public class PositionDelete<R> implements StructLike {

Review comment:
       This could be used instead of that on the read path. I'm not sure that we need a custom class on the read path, though. We'll know more when you update that PR, I think.






[GitHub] [iceberg] rdblue commented on a change in pull request #1316: Add delete file writers for Avro

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1316:
URL: https://github.com/apache/iceberg/pull/1316#discussion_r468208737



##########
File path: core/src/main/java/org/apache/iceberg/FileMetadata.java
##########
@@ -111,7 +111,7 @@ public Builder ofPositionDeletes() {
       return this;
     }
 
-    public Builder ofEqualityDeletes() {
+    public Builder ofEqualityDeletes(int... equalityFieldIds) {

Review comment:
       These IDs will be added to `DeleteFile` metadata in a separate PR. This is just a placeholder for now.






[GitHub] [iceberg] openinx commented on a change in pull request #1316: Add delete file writers for Avro

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1316:
URL: https://github.com/apache/iceberg/pull/1316#discussion_r468355158



##########
File path: core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.deletes;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileMetadata;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+
+public class PositionDeleteWriter<T> implements Closeable {
+  private final FileAppender<StructLike> appender;
+  private final FileFormat format;
+  private final String location;
+  private final PartitionSpec spec;
+  private final StructLike partition;
+  private final ByteBuffer keyMetadata;
+  private final PositionDelete<T> delete;
+  private DeleteFile deleteFile = null;
+
+  public PositionDeleteWriter(FileAppender<StructLike> appender, FileFormat format, String location,
+                              PartitionSpec spec, StructLike partition, EncryptionKeyMetadata keyMetadata,
+                              Types.StructType rowType) {

Review comment:
       nit: the `rowType` parameter is never used?

##########
File path: core/src/main/java/org/apache/iceberg/avro/Avro.java
##########
@@ -156,11 +170,175 @@ private CodecFactory codec() {
       Preconditions.checkNotNull(schema, "Schema is required");
       Preconditions.checkNotNull(name, "Table name is required and cannot be null");
 
+      Function<Schema, DatumWriter<?>> writerFunc;
+      if (createWriterFunc != null) {
+        writerFunc = createWriterFunc;
+      } else {
+        writerFunc = GenericAvroWriter::new;
+      }
+
       // add the Iceberg schema to keyValueMetadata
       meta("iceberg.schema", SchemaParser.toJson(schema));
 
       return new AvroFileAppender<>(
-          AvroSchemaUtil.convert(schema, name), file, createWriterFunc, codec(), metadata, overwrite);
+          AvroSchemaUtil.convert(schema, name), file, writerFunc, codec(), metadata, overwrite);
+    }
+  }
+
+  public static DeleteWriteBuilder writeDeletes(OutputFile file) {
+    return new DeleteWriteBuilder(file);
+  }
+
+  public static class DeleteWriteBuilder {
+    private final WriteBuilder appenderBuilder;
+    private final String location;
+    private Function<Schema, DatumWriter<?>> createWriterFunc = null;
+    private org.apache.iceberg.Schema rowSchema;
+    private PartitionSpec spec;
+    private StructLike partition;
+    private EncryptionKeyMetadata keyMetadata = null;
+    private int[] equalityFieldIds = null;
+
+    private DeleteWriteBuilder(OutputFile file) {
+      this.appenderBuilder = write(file);
+      this.location = file.location();
+    }
+
+    public DeleteWriteBuilder forTable(Table table) {
+      rowSchema(table.schema());
+      setAll(table.properties());

Review comment:
       Should we also add `withSpec(table.spec());` here?
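
       For example (a sketch of the suggested change, not merged code):

           public DeleteWriteBuilder forTable(Table table) {
             rowSchema(table.schema());
             withSpec(table.spec());      // also pick up the table's partition spec
             setAll(table.properties());
             return this;
           }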

##########
File path: core/src/test/java/org/apache/iceberg/avro/TestAvroDeleteWriters.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.avro;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestAvroDeleteWriters {
+  private static final Schema SCHEMA = new Schema(
+      NestedField.required(1, "id", Types.LongType.get()),
+      NestedField.optional(2, "data", Types.StringType.get()));
+
+  private List<Record> records;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @Before
+  public void createDeleteRecords() {
+    GenericRecord record = GenericRecord.create(SCHEMA);
+
+    ImmutableList.Builder<Record> builder = ImmutableList.builder();
+    builder.add(record.copy(ImmutableMap.of("id", 1L, "data", "a")));
+    builder.add(record.copy(ImmutableMap.of("id", 2L, "data", "b")));
+    builder.add(record.copy(ImmutableMap.of("id", 3L, "data", "c")));
+    builder.add(record.copy(ImmutableMap.of("id", 4L, "data", "d")));
+    builder.add(record.copy(ImmutableMap.of("id", 5L, "data", "e")));
+
+    this.records = builder.build();
+  }
+
+  @Test
+  public void testEqualityDeleteWriter() throws IOException {
+    File deleteFile = temp.newFile();
+
+    OutputFile out = Files.localOutput(deleteFile);
+    EqualityDeleteWriter<Record> deleteWriter = Avro.writeDeletes(out)
+        .createWriterFunc(DataWriter::create)
+        .overwrite()
+        .rowSchema(SCHEMA)
+        .withSpec(PartitionSpec.unpartitioned())

Review comment:
       Q: should we make these unit tests parameterized so that we can test both the `partitioned` and `unpartitioned` cases?
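
       A minimal JUnit 4 sketch of that parameterization (illustrative only; `SCHEMA` is the field already defined in this test class):

           @RunWith(Parameterized.class)
           public class TestAvroDeleteWriters {
             @Parameterized.Parameters(name = "partitioned = {0}")
             public static Object[] parameters() {
               return new Object[] {false, true};
             }

             private final boolean partitioned;

             public TestAvroDeleteWriters(boolean partitioned) {
               this.partitioned = partitioned;
             }

             private PartitionSpec spec() {
               return partitioned
                   ? PartitionSpec.builderFor(SCHEMA).identity("data").build()
                   : PartitionSpec.unpartitioned();
             }
           }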

##########
File path: core/src/main/java/org/apache/iceberg/avro/Avro.java
##########
@@ -156,11 +170,175 @@ private CodecFactory codec() {
       Preconditions.checkNotNull(schema, "Schema is required");
       Preconditions.checkNotNull(name, "Table name is required and cannot be null");
 
+      Function<Schema, DatumWriter<?>> writerFunc;
+      if (createWriterFunc != null) {
+        writerFunc = createWriterFunc;
+      } else {
+        writerFunc = GenericAvroWriter::new;
+      }
+
       // add the Iceberg schema to keyValueMetadata
       meta("iceberg.schema", SchemaParser.toJson(schema));
 
       return new AvroFileAppender<>(
-          AvroSchemaUtil.convert(schema, name), file, createWriterFunc, codec(), metadata, overwrite);
+          AvroSchemaUtil.convert(schema, name), file, writerFunc, codec(), metadata, overwrite);
+    }
+  }
+
+  public static DeleteWriteBuilder writeDeletes(OutputFile file) {
+    return new DeleteWriteBuilder(file);
+  }
+
+  public static class DeleteWriteBuilder {
+    private final WriteBuilder appenderBuilder;
+    private final String location;
+    private Function<Schema, DatumWriter<?>> createWriterFunc = null;
+    private org.apache.iceberg.Schema rowSchema;
+    private PartitionSpec spec;
+    private StructLike partition;
+    private EncryptionKeyMetadata keyMetadata = null;
+    private int[] equalityFieldIds = null;
+
+    private DeleteWriteBuilder(OutputFile file) {
+      this.appenderBuilder = write(file);
+      this.location = file.location();
+    }
+
+    public DeleteWriteBuilder forTable(Table table) {
+      rowSchema(table.schema());
+      setAll(table.properties());
+      return this;
+    }
+
+    public DeleteWriteBuilder set(String property, String value) {
+      appenderBuilder.set(property, value);
+      return this;
+    }
+
+    public DeleteWriteBuilder setAll(Map<String, String> properties) {
+      appenderBuilder.setAll(properties);
+      return this;
+    }
+
+    public DeleteWriteBuilder meta(String property, String value) {
+      appenderBuilder.meta(property, value);
+      return this;
+    }
+
+    public DeleteWriteBuilder meta(Map<String, String> properties) {
+      appenderBuilder.meta(properties);
+      return this;
+    }
+
+    public DeleteWriteBuilder overwrite() {
+      return overwrite(true);
+    }
+
+    public DeleteWriteBuilder overwrite(boolean enabled) {
+      appenderBuilder.overwrite(enabled);
+      return this;
+    }
+
+    public DeleteWriteBuilder createWriterFunc(Function<Schema, DatumWriter<?>> writerFunction) {
+      this.createWriterFunc = writerFunction;
+      return this;
+    }
+
+    public DeleteWriteBuilder rowSchema(org.apache.iceberg.Schema newRowSchema) {
+      this.rowSchema = newRowSchema;
+      return this;
+    }
+
+    public DeleteWriteBuilder withSpec(PartitionSpec newSpec) {
+      this.spec = newSpec;
+      return this;
+    }
+
+    public DeleteWriteBuilder withPartition(StructLike key) {
+      this.partition = key;
+      return this;
+    }
+
+    public DeleteWriteBuilder withKeyMetadata(EncryptionKeyMetadata metadata) {
+      this.keyMetadata = metadata;
+      return this;
+    }
+
+    public DeleteWriteBuilder equalityFieldIds(List<Integer> fieldIds) {
+      this.equalityFieldIds = fieldIds.stream().mapToInt(id -> id).toArray();
+      return this;
+    }
+
+    public DeleteWriteBuilder equalityFieldIds(int... fieldIds) {
+      this.equalityFieldIds = fieldIds;
+      return this;
+    }
+
+    public <T> EqualityDeleteWriter<T> buildEqualityWriter() throws IOException {
+      Preconditions.checkState(equalityFieldIds != null, "Cannot create equality delete file without delete field ids");
+
+      set("delete-type", "equality");
+      set("delete-field-ids", IntStream.of(equalityFieldIds)
+          .mapToObj(Objects::toString)
+          .collect(Collectors.joining(", ")));
+
+      // the appender uses the row schema without extra columns
+      appenderBuilder.schema(rowSchema);
+      if (createWriterFunc != null) {
+        appenderBuilder.createWriterFunc(createWriterFunc);
+      }
+
+      return new EqualityDeleteWriter<>(
+          appenderBuilder.build(), FileFormat.AVRO, location, spec, partition, keyMetadata, equalityFieldIds);
+    }
+
+    public <T> PositionDeleteWriter<T> buildPositionWriter() throws IOException {
+      Preconditions.checkState(equalityFieldIds == null, "Cannot create position delete file using delete field ids");
+
+      set("delete-type", "position");
+
+      // the appender uses the row schema wrapped with position fields
+      appenderBuilder.schema(new org.apache.iceberg.Schema(
+          MetadataColumns.DELETE_FILE_PATH,
+          MetadataColumns.DELETE_FILE_POS,
+          NestedField.optional(
+              MetadataColumns.DELETE_FILE_ROW_FIELD_ID, "row", rowSchema.asStruct(),
+              MetadataColumns.DELETE_FILE_ROW_DOC)));
+
+      if (createWriterFunc != null) {
+        appenderBuilder.createWriterFunc(
+            avroSchema -> new PositionDatumWriter<>(createWriterFunc.apply(avroSchema)));
+      }
+
+      return new PositionDeleteWriter<>(
+          appenderBuilder.build(), FileFormat.AVRO, location, spec, partition, keyMetadata, rowSchema.asStruct());
+    }
+  }
+
+  /**
+   * A {@link DatumWriter} implementation that wraps another to produce position delete rows.
+   *
+   * @param <D> the type of datum written as a deleted row
+   */
+  private static class PositionDatumWriter<D> implements DatumWriter<PositionDelete<D>> {
+    private static final ValueWriter<Object> PATH_WRITER = ValueWriters.strings();
+    private static final ValueWriter<Long> POS_WRITER = ValueWriters.longs();
+    private final DatumWriter<D> rowWriter;
+
+    private PositionDatumWriter(DatumWriter<D> rowWriter) {
+      this.rowWriter = rowWriter;
+    }
+
+    @Override
+    public void setSchema(Schema schema) {
+      rowWriter.setSchema(schema.getField("row").schema());

Review comment:
       The `row` field is optional, so it's possible that `schema.getField("row")` returns null. I think we need to check for null here.
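
       For example, a guarded version might look like this (a sketch of the suggested fix, not the merged code):

           @Override
           public void setSchema(Schema schema) {
             Schema.Field rowField = schema.getField("row");
             if (rowField != null) {
               rowWriter.setSchema(rowField.schema());
             }
           }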

##########
File path: core/src/main/java/org/apache/iceberg/avro/Avro.java
##########
@@ -82,11 +96,11 @@ public static WriteBuilder write(OutputFile file) {
 
   public static class WriteBuilder {
     private final OutputFile file;
+    private final Map<String, String> config = Maps.newHashMap();

Review comment:
       Q: I saw that the key-value pairs from `config` are not actually used, except for the key `write.avro.compression-codec`. Is this correct?






[GitHub] [iceberg] rdblue commented on a change in pull request #1316: Add delete file writers for Avro

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1316:
URL: https://github.com/apache/iceberg/pull/1316#discussion_r468772506



##########
File path: core/src/main/java/org/apache/iceberg/avro/Avro.java
##########
@@ -156,11 +170,175 @@ private CodecFactory codec() {
       Preconditions.checkNotNull(schema, "Schema is required");
       Preconditions.checkNotNull(name, "Table name is required and cannot be null");
 
+      Function<Schema, DatumWriter<?>> writerFunc;
+      if (createWriterFunc != null) {
+        writerFunc = createWriterFunc;
+      } else {
+        writerFunc = GenericAvroWriter::new;
+      }
+
       // add the Iceberg schema to keyValueMetadata
       meta("iceberg.schema", SchemaParser.toJson(schema));
 
       return new AvroFileAppender<>(
-          AvroSchemaUtil.convert(schema, name), file, createWriterFunc, codec(), metadata, overwrite);
+          AvroSchemaUtil.convert(schema, name), file, writerFunc, codec(), metadata, overwrite);
+    }
+  }
+
+  public static DeleteWriteBuilder writeDeletes(OutputFile file) {
+    return new DeleteWriteBuilder(file);
+  }
+
+  public static class DeleteWriteBuilder {
+    private final WriteBuilder appenderBuilder;
+    private final String location;
+    private Function<Schema, DatumWriter<?>> createWriterFunc = null;
+    private org.apache.iceberg.Schema rowSchema;
+    private PartitionSpec spec;
+    private StructLike partition;
+    private EncryptionKeyMetadata keyMetadata = null;
+    private int[] equalityFieldIds = null;
+
+    private DeleteWriteBuilder(OutputFile file) {
+      this.appenderBuilder = write(file);
+      this.location = file.location();
+    }
+
+    public DeleteWriteBuilder forTable(Table table) {
+      rowSchema(table.schema());
+      setAll(table.properties());
+      return this;
+    }
+
+    public DeleteWriteBuilder set(String property, String value) {
+      appenderBuilder.set(property, value);
+      return this;
+    }
+
+    public DeleteWriteBuilder setAll(Map<String, String> properties) {
+      appenderBuilder.setAll(properties);
+      return this;
+    }
+
+    public DeleteWriteBuilder meta(String property, String value) {
+      appenderBuilder.meta(property, value);
+      return this;
+    }
+
+    public DeleteWriteBuilder meta(Map<String, String> properties) {
+      appenderBuilder.meta(properties);
+      return this;
+    }
+
+    public DeleteWriteBuilder overwrite() {
+      return overwrite(true);
+    }
+
+    public DeleteWriteBuilder overwrite(boolean enabled) {
+      appenderBuilder.overwrite(enabled);
+      return this;
+    }
+
+    public DeleteWriteBuilder createWriterFunc(Function<Schema, DatumWriter<?>> writerFunction) {
+      this.createWriterFunc = writerFunction;
+      return this;
+    }
+
+    public DeleteWriteBuilder rowSchema(org.apache.iceberg.Schema newRowSchema) {
+      this.rowSchema = newRowSchema;
+      return this;
+    }
+
+    public DeleteWriteBuilder withSpec(PartitionSpec newSpec) {
+      this.spec = newSpec;
+      return this;
+    }
+
+    public DeleteWriteBuilder withPartition(StructLike key) {
+      this.partition = key;
+      return this;
+    }
+
+    public DeleteWriteBuilder withKeyMetadata(EncryptionKeyMetadata metadata) {
+      this.keyMetadata = metadata;
+      return this;
+    }
+
+    public DeleteWriteBuilder equalityFieldIds(List<Integer> fieldIds) {
+      this.equalityFieldIds = fieldIds.stream().mapToInt(id -> id).toArray();
+      return this;
+    }
+
+    public DeleteWriteBuilder equalityFieldIds(int... fieldIds) {
+      this.equalityFieldIds = fieldIds;
+      return this;
+    }
+
+    public <T> EqualityDeleteWriter<T> buildEqualityWriter() throws IOException {
+      Preconditions.checkState(equalityFieldIds != null, "Cannot create equality delete file without delete field ids");
+
+      set("delete-type", "equality");
+      set("delete-field-ids", IntStream.of(equalityFieldIds)
+          .mapToObj(Objects::toString)
+          .collect(Collectors.joining(", ")));
+
+      // the appender uses the row schema without extra columns
+      appenderBuilder.schema(rowSchema);
+      if (createWriterFunc != null) {
+        appenderBuilder.createWriterFunc(createWriterFunc);
+      }
+
+      return new EqualityDeleteWriter<>(
+          appenderBuilder.build(), FileFormat.AVRO, location, spec, partition, keyMetadata, equalityFieldIds);
+    }
+
+    public <T> PositionDeleteWriter<T> buildPositionWriter() throws IOException {
+      Preconditions.checkState(equalityFieldIds == null, "Cannot create position delete file using delete field ids");
+
+      set("delete-type", "position");
+
+      // the appender uses the row schema wrapped with position fields
+      appenderBuilder.schema(new org.apache.iceberg.Schema(
+          MetadataColumns.DELETE_FILE_PATH,
+          MetadataColumns.DELETE_FILE_POS,
+          NestedField.optional(
+              MetadataColumns.DELETE_FILE_ROW_FIELD_ID, "row", rowSchema.asStruct(),
+              MetadataColumns.DELETE_FILE_ROW_DOC)));

Review comment:
       That's correct, but anything that is shared between tests and production is not being tested for correctness. We create the expected schema by hand in tests because we want to validate that it is a 3-column schema with path, position, and row.
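       For illustration, a minimal sketch of that hand-built expectation; `actualDeleteSchema` is a hypothetical handle on the schema the writer produces, while the rest mirrors the test code in this PR:

```java
// Build "expected" by hand so a bug in the production schema-wrapping code
// cannot silently satisfy a test that reuses that same code.
Schema expected = new Schema(
    MetadataColumns.DELETE_FILE_PATH,
    MetadataColumns.DELETE_FILE_POS,
    NestedField.optional(MetadataColumns.DELETE_FILE_ROW_FIELD_ID, "row", SCHEMA.asStruct()));
Assert.assertEquals("Should be a 3-column path/pos/row schema",
    expected.asStruct(), actualDeleteSchema.asStruct());  // actualDeleteSchema: hypothetical
```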




[GitHub] [iceberg] chenjunjiedada commented on a change in pull request #1316: Add delete file writers for Avro

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on a change in pull request #1316:
URL: https://github.com/apache/iceberg/pull/1316#discussion_r468444097



##########
File path: core/src/main/java/org/apache/iceberg/MetadataColumns.java
##########
@@ -31,11 +31,20 @@
   private MetadataColumns() {
   }
 
+  // IDs Integer.MAX_VALUE - (1-100) are used for metadata columns
   public static final NestedField FILE_PATH = NestedField.required(
       Integer.MAX_VALUE - 1, "_file", Types.StringType.get(), "Path of the file in which a row is stored");
   public static final NestedField ROW_POSITION = NestedField.required(
       Integer.MAX_VALUE - 2, "_pos", Types.LongType.get(), "Ordinal position of a row in the source data file");
 
+  // IDs Integer.MAX_VALUE - (101-200) are used for reserved columns
+  public static final NestedField DELETE_FILE_PATH = NestedField.required(

Review comment:
       @JingsongLi, I got it, thanks for the explanation.




[GitHub] [iceberg] openinx commented on a change in pull request #1316: Add delete file writers for Avro

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1316:
URL: https://github.com/apache/iceberg/pull/1316#discussion_r468405297



##########
File path: core/src/test/java/org/apache/iceberg/avro/TestAvroDeleteWriters.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.avro;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestAvroDeleteWriters {
+  private static final Schema SCHEMA = new Schema(
+      NestedField.required(1, "id", Types.LongType.get()),
+      NestedField.optional(2, "data", Types.StringType.get()));
+
+  private List<Record> records;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @Before
+  public void createDeleteRecords() {
+    GenericRecord record = GenericRecord.create(SCHEMA);
+
+    ImmutableList.Builder<Record> builder = ImmutableList.builder();
+    builder.add(record.copy(ImmutableMap.of("id", 1L, "data", "a")));
+    builder.add(record.copy(ImmutableMap.of("id", 2L, "data", "b")));
+    builder.add(record.copy(ImmutableMap.of("id", 3L, "data", "c")));
+    builder.add(record.copy(ImmutableMap.of("id", 4L, "data", "d")));
+    builder.add(record.copy(ImmutableMap.of("id", 5L, "data", "e")));
+
+    this.records = builder.build();
+  }
+
+  @Test
+  public void testEqualityDeleteWriter() throws IOException {
+    File deleteFile = temp.newFile();
+
+    OutputFile out = Files.localOutput(deleteFile);
+    EqualityDeleteWriter<Record> deleteWriter = Avro.writeDeletes(out)
+        .createWriterFunc(DataWriter::create)
+        .overwrite()
+        .rowSchema(SCHEMA)
+        .withSpec(PartitionSpec.unpartitioned())
+        .equalityFieldIds(1)
+        .buildEqualityWriter();
+
+    try (EqualityDeleteWriter<Record> writer = deleteWriter) {
+      writer.deleteAll(records);
+    }
+
+    DeleteFile metadata = deleteWriter.toDeleteFile();
+    Assert.assertEquals("Format should be Avro", FileFormat.AVRO, metadata.format());
+    Assert.assertEquals("Should be equality deletes", FileContent.EQUALITY_DELETES, metadata.content());
+    Assert.assertEquals("Record count should be correct", records.size(), metadata.recordCount());
+    Assert.assertEquals("Partition should be empty", 0, metadata.partition().size());
+    Assert.assertNull("Key metadata should be null", metadata.keyMetadata());
+
+    List<Record> deletedRecords;
+    try (AvroIterable<Record> reader = Avro.read(out.toInputFile())
+        .project(SCHEMA)
+        .createReaderFunc(DataReader::create)
+        .build()) {
+      deletedRecords = Lists.newArrayList(reader);
+    }
+
+    Assert.assertEquals("Deleted records should match expected", records, deletedRecords);
+  }
+
+  @Test
+  public void testPositionDeleteWriter() throws IOException {
+    File deleteFile = temp.newFile();
+
+    Schema deleteSchema = new Schema(
+        MetadataColumns.DELETE_FILE_PATH,
+        MetadataColumns.DELETE_FILE_POS,
+        NestedField.optional(MetadataColumns.DELETE_FILE_ROW_FIELD_ID, "row", SCHEMA.asStruct()));
+
+    String deletePath = "s3://bucket/path/file.parquet";
+    GenericRecord posDelete = GenericRecord.create(deleteSchema);
+    List<Record> expectedDeleteRecords = Lists.newArrayList();
+
+    OutputFile out = Files.localOutput(deleteFile);
+    PositionDeleteWriter<Record> deleteWriter = Avro.writeDeletes(out)
+        .createWriterFunc(DataWriter::create)
+        .overwrite()
+        .rowSchema(SCHEMA)
+        .withSpec(PartitionSpec.unpartitioned())
+        .buildPositionWriter();
+
+    try (PositionDeleteWriter<Record> writer = deleteWriter) {
+      for (int i = 0; i < records.size(); i += 1) {
+        int pos = i * 3 + 2;
+        writer.delete(deletePath, pos, records.get(i));

Review comment:
       How about adding a unit test for the case that writes only file/pos, without `records.get(i)`, as a delete marker? That's the positional row-delete case, where we may not want to reconstruct the streaming records. Something like the sketch below.
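
       A sketch only, since it assumes a null row is acceptable on the grounds that the `row` field in the position delete schema is optional:

```java
// Write delete markers only: file path and position, no copied row data.
try (PositionDeleteWriter<Record> writer = deleteWriter) {
  for (int i = 0; i < records.size(); i += 1) {
    writer.delete(deletePath, i * 3 + 2, null);  // null row, assuming it is allowed
  }
}
```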




[GitHub] [iceberg] rdblue commented on a change in pull request #1316: Add delete file writers for Avro

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1316:
URL: https://github.com/apache/iceberg/pull/1316#discussion_r468776053



##########
File path: core/src/test/java/org/apache/iceberg/avro/TestAvroDeleteWriters.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.avro;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestAvroDeleteWriters {
+  private static final Schema SCHEMA = new Schema(
+      NestedField.required(1, "id", Types.LongType.get()),
+      NestedField.optional(2, "data", Types.StringType.get()));
+
+  private List<Record> records;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @Before
+  public void createDeleteRecords() {
+    GenericRecord record = GenericRecord.create(SCHEMA);
+
+    ImmutableList.Builder<Record> builder = ImmutableList.builder();
+    builder.add(record.copy(ImmutableMap.of("id", 1L, "data", "a")));
+    builder.add(record.copy(ImmutableMap.of("id", 2L, "data", "b")));
+    builder.add(record.copy(ImmutableMap.of("id", 3L, "data", "c")));
+    builder.add(record.copy(ImmutableMap.of("id", 4L, "data", "d")));
+    builder.add(record.copy(ImmutableMap.of("id", 5L, "data", "e")));
+
+    this.records = builder.build();
+  }
+
+  @Test
+  public void testEqualityDeleteWriter() throws IOException {
+    File deleteFile = temp.newFile();
+
+    OutputFile out = Files.localOutput(deleteFile);
+    EqualityDeleteWriter<Record> deleteWriter = Avro.writeDeletes(out)
+        .createWriterFunc(DataWriter::create)
+        .overwrite()
+        .rowSchema(SCHEMA)
+        .withSpec(PartitionSpec.unpartitioned())

Review comment:
       The partition is just passed through to the `DeleteFile`, so I'm not too concerned about testing that here. Other tests will depend on that being passed through correctly. The reason I used an unpartitioned spec in these tests is that this validates that when the partition is never passed, the `DeleteFile` still gets created successfully.




[GitHub] [iceberg] rdblue commented on a change in pull request #1316: Add delete file writers for Avro

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1316:
URL: https://github.com/apache/iceberg/pull/1316#discussion_r468771458



##########
File path: core/src/main/java/org/apache/iceberg/avro/Avro.java
##########
@@ -82,11 +96,11 @@ public static WriteBuilder write(OutputFile file) {
 
   public static class WriteBuilder {
     private final OutputFile file;
+    private final Map<String, String> config = Maps.newHashMap();

Review comment:
       That's correct right now. We have been gradually expanding the settings that are supported in Parquet and I expect that to happen here, too. For example, when we add Zstd support for Avro, we will probably also want to support the compression level setting. Parquet also copies config into the Hadoop `Configuration`, which we could do for Avro if there is a use case.
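
       For reference, a sketch of how that one key flows through the builder today; the codec name "gzip" is an assumed valid value:

```java
// The only config key the Avro WriteBuilder currently consumes; codec()
// reads it back when constructing the appender.
FileAppender<Record> appender = Avro.write(out)
    .schema(SCHEMA)
    .createWriterFunc(DataWriter::create)
    .set("write.avro.compression-codec", "gzip")
    .overwrite()
    .build();
```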




[GitHub] [iceberg] rdblue commented on a change in pull request #1316: Add delete file writers for Avro

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1316:
URL: https://github.com/apache/iceberg/pull/1316#discussion_r468871574



##########
File path: core/src/main/java/org/apache/iceberg/MetadataColumns.java
##########
@@ -31,11 +31,20 @@
   private MetadataColumns() {
   }
 
+  // IDs Integer.MAX_VALUE - (1-100) are used for metadata columns
   public static final NestedField FILE_PATH = NestedField.required(
       Integer.MAX_VALUE - 1, "_file", Types.StringType.get(), "Path of the file in which a row is stored");
   public static final NestedField ROW_POSITION = NestedField.required(
       Integer.MAX_VALUE - 2, "_pos", Types.LongType.get(), "Ordinal position of a row in the source data file");
 
+  // IDs Integer.MAX_VALUE - (101-200) are used for reserved columns
+  public static final NestedField DELETE_FILE_PATH = NestedField.required(

Review comment:
       This is correct. We could read `_file` and `_pos` from a delete file that has the `file_path` and `pos` columns.
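
       As a sketch, reading the reserved columns back directly, mirroring the projection in `TestAvroDeleteWriters`; mapping those values onto `_file`/`_pos` would be a read-time concern layered on top of this:

```java
// Project only the reserved path/pos columns from a position delete file.
Schema deleteSchema = new Schema(
    MetadataColumns.DELETE_FILE_PATH,
    MetadataColumns.DELETE_FILE_POS);
try (AvroIterable<Record> reader = Avro.read(out.toInputFile())
    .project(deleteSchema)
    .createReaderFunc(DataReader::create)
    .build()) {
  for (Record deleted : reader) {
    String path = deleted.get(0, String.class);  // file_path
    Long pos = deleted.get(1, Long.class);       // pos
  }
}
```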




[GitHub] [iceberg] openinx commented on a change in pull request #1316: Add delete file writers for Avro

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1316:
URL: https://github.com/apache/iceberg/pull/1316#discussion_r468397133



##########
File path: core/src/main/java/org/apache/iceberg/avro/Avro.java
##########
@@ -156,11 +170,175 @@ private CodecFactory codec() {
       Preconditions.checkNotNull(schema, "Schema is required");
       Preconditions.checkNotNull(name, "Table name is required and cannot be null");
 
+      Function<Schema, DatumWriter<?>> writerFunc;
+      if (createWriterFunc != null) {
+        writerFunc = createWriterFunc;
+      } else {
+        writerFunc = GenericAvroWriter::new;
+      }
+
       // add the Iceberg schema to keyValueMetadata
       meta("iceberg.schema", SchemaParser.toJson(schema));
 
       return new AvroFileAppender<>(
-          AvroSchemaUtil.convert(schema, name), file, createWriterFunc, codec(), metadata, overwrite);
+          AvroSchemaUtil.convert(schema, name), file, writerFunc, codec(), metadata, overwrite);
+    }
+  }
+
+  public static DeleteWriteBuilder writeDeletes(OutputFile file) {
+    return new DeleteWriteBuilder(file);
+  }
+
+  public static class DeleteWriteBuilder {
+    private final WriteBuilder appenderBuilder;
+    private final String location;
+    private Function<Schema, DatumWriter<?>> createWriterFunc = null;
+    private org.apache.iceberg.Schema rowSchema;
+    private PartitionSpec spec;
+    private StructLike partition;
+    private EncryptionKeyMetadata keyMetadata = null;
+    private int[] equalityFieldIds = null;
+
+    private DeleteWriteBuilder(OutputFile file) {
+      this.appenderBuilder = write(file);
+      this.location = file.location();
+    }
+
+    public DeleteWriteBuilder forTable(Table table) {
+      rowSchema(table.schema());
+      setAll(table.properties());
+      return this;
+    }
+
+    public DeleteWriteBuilder set(String property, String value) {
+      appenderBuilder.set(property, value);
+      return this;
+    }
+
+    public DeleteWriteBuilder setAll(Map<String, String> properties) {
+      appenderBuilder.setAll(properties);
+      return this;
+    }
+
+    public DeleteWriteBuilder meta(String property, String value) {
+      appenderBuilder.meta(property, value);
+      return this;
+    }
+
+    public DeleteWriteBuilder meta(Map<String, String> properties) {
+      appenderBuilder.meta(properties);
+      return this;
+    }
+
+    public DeleteWriteBuilder overwrite() {
+      return overwrite(true);
+    }
+
+    public DeleteWriteBuilder overwrite(boolean enabled) {
+      appenderBuilder.overwrite(enabled);
+      return this;
+    }
+
+    public DeleteWriteBuilder createWriterFunc(Function<Schema, DatumWriter<?>> writerFunction) {
+      this.createWriterFunc = writerFunction;
+      return this;
+    }
+
+    public DeleteWriteBuilder rowSchema(org.apache.iceberg.Schema newRowSchema) {
+      this.rowSchema = newRowSchema;
+      return this;
+    }
+
+    public DeleteWriteBuilder withSpec(PartitionSpec newSpec) {
+      this.spec = newSpec;
+      return this;
+    }
+
+    public DeleteWriteBuilder withPartition(StructLike key) {
+      this.partition = key;
+      return this;
+    }
+
+    public DeleteWriteBuilder withKeyMetadata(EncryptionKeyMetadata metadata) {
+      this.keyMetadata = metadata;
+      return this;
+    }
+
+    public DeleteWriteBuilder equalityFieldIds(List<Integer> fieldIds) {
+      this.equalityFieldIds = fieldIds.stream().mapToInt(id -> id).toArray();
+      return this;
+    }
+
+    public DeleteWriteBuilder equalityFieldIds(int... fieldIds) {
+      this.equalityFieldIds = fieldIds;
+      return this;
+    }
+
+    public <T> EqualityDeleteWriter<T> buildEqualityWriter() throws IOException {
+      Preconditions.checkState(equalityFieldIds != null, "Cannot create equality delete file without delete field ids");
+
+      set("delete-type", "equality");
+      set("delete-field-ids", IntStream.of(equalityFieldIds)
+          .mapToObj(Objects::toString)
+          .collect(Collectors.joining(", ")));
+
+      // the appender uses the row schema without extra columns
+      appenderBuilder.schema(rowSchema);
+      if (createWriterFunc != null) {
+        appenderBuilder.createWriterFunc(createWriterFunc);
+      }
+
+      return new EqualityDeleteWriter<>(
+          appenderBuilder.build(), FileFormat.AVRO, location, spec, partition, keyMetadata, equalityFieldIds);
+    }
+
+    public <T> PositionDeleteWriter<T> buildPositionWriter() throws IOException {
+      Preconditions.checkState(equalityFieldIds == null, "Cannot create position delete file using delete field ids");
+
+      set("delete-type", "position");
+
+      // the appender uses the row schema wrapped with position fields
+      appenderBuilder.schema(new org.apache.iceberg.Schema(
+          MetadataColumns.DELETE_FILE_PATH,
+          MetadataColumns.DELETE_FILE_POS,
+          NestedField.optional(
+              MetadataColumns.DELETE_FILE_ROW_FIELD_ID, "row", rowSchema.asStruct(),
+              MetadataColumns.DELETE_FILE_ROW_DOC)));
+
+      if (createWriterFunc != null) {
+        appenderBuilder.createWriterFunc(
+            avroSchema -> new PositionDatumWriter<>(createWriterFunc.apply(avroSchema)));
+      }
+
+      return new PositionDeleteWriter<>(
+          appenderBuilder.build(), FileFormat.AVRO, location, spec, partition, keyMetadata, rowSchema.asStruct());
+    }
+  }
+
+  /**
+   * A {@link DatumWriter} implementation that wraps another to produce position delete rows.
+   *
+   * @param <D> the type of datum written as a deleted row
+   */
+  private static class PositionDatumWriter<D> implements DatumWriter<PositionDelete<D>> {
+    private static final ValueWriter<Object> PATH_WRITER = ValueWriters.strings();
+    private static final ValueWriter<Long> POS_WRITER = ValueWriters.longs();
+    private final DatumWriter<D> rowWriter;
+
+    private PositionDatumWriter(DatumWriter<D> rowWriter) {
+      this.rowWriter = rowWriter;
+    }
+
+    @Override
+    public void setSchema(Schema schema) {
+      rowWriter.setSchema(schema.getField("row").schema());

Review comment:
       I misunderstood here: although the row field is optional, `schema.getField("row")` won't be null; it returns the optional row field. Please ignore this comment.
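
       To spell out the plain Avro behavior being relied on (the schema JSON below is illustrative only):

```java
// getField on a record whose field is an optional (union-with-null) type
// returns the Field itself; field.schema() is the [null, record] union.
org.apache.avro.Schema schema = new org.apache.avro.Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"pos_delete\",\"fields\":[" +
    "{\"name\":\"row\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"r\"," +
    "\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}],\"default\":null}]}");
org.apache.avro.Schema.Field rowField = schema.getField("row");  // not null
System.out.println(rowField.schema());  // prints the ["null", record] union
```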




[GitHub] [iceberg] chenjunjiedada commented on a change in pull request #1316: Add delete file writers for Avro

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on a change in pull request #1316:
URL: https://github.com/apache/iceberg/pull/1316#discussion_r468296622



##########
File path: core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.deletes;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileMetadata;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+
+public class PositionDeleteWriter<T> implements Closeable {
+  private final FileAppender<StructLike> appender;
+  private final FileFormat format;
+  private final String location;
+  private final PartitionSpec spec;
+  private final StructLike partition;
+  private final ByteBuffer keyMetadata;
+  private final PositionDelete<T> delete;
+  private DeleteFile deleteFile = null;
+
+  public PositionDeleteWriter(FileAppender<StructLike> appender, FileFormat format, String location,

Review comment:
       Good idea to have `FileAppender` as a parameter so that engines could build their own appenders! I was thinking about how to build a writer like this.
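
       For instance, an engine-side construction might look like this sketch; the parameter order mirrors the builder's own call in `Avro.java`, while `buildEngineSpecificAppender()` and the `InternalRow` type parameter are stand-ins:

```java
// An engine supplies its own FileAppender<StructLike> and constructs the
// writer directly, bypassing the format-specific builder.
FileAppender<StructLike> engineAppender = buildEngineSpecificAppender();  // hypothetical
PositionDeleteWriter<InternalRow> writer = new PositionDeleteWriter<>(
    engineAppender, FileFormat.PARQUET, out.location(),
    spec, partition, keyMetadata, rowSchema.asStruct());
```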

##########
File path: core/src/main/java/org/apache/iceberg/MetadataColumns.java
##########
@@ -31,11 +31,20 @@
   private MetadataColumns() {
   }
 
+  // IDs Integer.MAX_VALUE - (1-100) are used for metadata columns
   public static final NestedField FILE_PATH = NestedField.required(
       Integer.MAX_VALUE - 1, "_file", Types.StringType.get(), "Path of the file in which a row is stored");
   public static final NestedField ROW_POSITION = NestedField.required(
       Integer.MAX_VALUE - 2, "_pos", Types.LongType.get(), "Ordinal position of a row in the source data file");
 
+  // IDs Integer.MAX_VALUE - (101-200) are used for reserved columns
+  public static final NestedField DELETE_FILE_PATH = NestedField.required(

Review comment:
       Why do we need reserved columns in addition to the metadata columns? The column docs are the same for both; will this confuse users who want to use them?

##########
File path: core/src/main/java/org/apache/iceberg/MetadataColumns.java
##########
@@ -31,11 +31,20 @@
   private MetadataColumns() {
   }
 
+  // IDs Integer.MAX_VALUE - (1-100) are used for metadata columns
   public static final NestedField FILE_PATH = NestedField.required(
       Integer.MAX_VALUE - 1, "_file", Types.StringType.get(), "Path of the file in which a row is stored");
   public static final NestedField ROW_POSITION = NestedField.required(
       Integer.MAX_VALUE - 2, "_pos", Types.LongType.get(), "Ordinal position of a row in the source data file");
 
+  // IDs Integer.MAX_VALUE - (101-200) are used for reserved columns
+  public static final NestedField DELETE_FILE_PATH = NestedField.required(
+      Integer.MAX_VALUE - 101, "file_path", Types.StringType.get(), "Path of a file in which a deleted row is stored");
+  public static final NestedField DELETE_FILE_POS = NestedField.required(
+      Integer.MAX_VALUE - 102, "pos", Types.LongType.get(), "Ordinal position of a deleted row in the data file");
+  public static final int DELETE_FILE_ROW_FIELD_ID = Integer.MAX_VALUE - 103;

Review comment:
       Do we need to update the spec if we want to store extra row values in position delete files?

##########
File path: core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.deletes;
+
+import org.apache.iceberg.StructLike;
+
+@SuppressWarnings("checkstyle:OverloadMethodsDeclarationOrder")
+public class PositionDelete<R> implements StructLike {

Review comment:
       This should replace `PositionBasedDeleteRecord` in https://github.com/apache/iceberg/pull/971, right?



