Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/08/11 07:31:21 UTC

[GitHub] [iceberg] openinx commented on a change in pull request #1316: Add delete file writers for Avro

openinx commented on a change in pull request #1316:
URL: https://github.com/apache/iceberg/pull/1316#discussion_r468355158



##########
File path: core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.deletes;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileMetadata;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+
+public class PositionDeleteWriter<T> implements Closeable {
+  private final FileAppender<StructLike> appender;
+  private final FileFormat format;
+  private final String location;
+  private final PartitionSpec spec;
+  private final StructLike partition;
+  private final ByteBuffer keyMetadata;
+  private final PositionDelete<T> delete;
+  private DeleteFile deleteFile = null;
+
+  public PositionDeleteWriter(FileAppender<StructLike> appender, FileFormat format, String location,
+                              PartitionSpec spec, StructLike partition, EncryptionKeyMetadata keyMetadata,
+                              Types.StructType rowType) {

Review comment:
       nit: The `rowType` parameter is never used?

##########
File path: core/src/main/java/org/apache/iceberg/avro/Avro.java
##########
@@ -156,11 +170,175 @@ private CodecFactory codec() {
       Preconditions.checkNotNull(schema, "Schema is required");
       Preconditions.checkNotNull(name, "Table name is required and cannot be null");
 
+      Function<Schema, DatumWriter<?>> writerFunc;
+      if (createWriterFunc != null) {
+        writerFunc = createWriterFunc;
+      } else {
+        writerFunc = GenericAvroWriter::new;
+      }
+
       // add the Iceberg schema to keyValueMetadata
       meta("iceberg.schema", SchemaParser.toJson(schema));
 
       return new AvroFileAppender<>(
-          AvroSchemaUtil.convert(schema, name), file, createWriterFunc, codec(), metadata, overwrite);
+          AvroSchemaUtil.convert(schema, name), file, writerFunc, codec(), metadata, overwrite);
+    }
+  }
+
+  public static DeleteWriteBuilder writeDeletes(OutputFile file) {
+    return new DeleteWriteBuilder(file);
+  }
+
+  public static class DeleteWriteBuilder {
+    private final WriteBuilder appenderBuilder;
+    private final String location;
+    private Function<Schema, DatumWriter<?>> createWriterFunc = null;
+    private org.apache.iceberg.Schema rowSchema;
+    private PartitionSpec spec;
+    private StructLike partition;
+    private EncryptionKeyMetadata keyMetadata = null;
+    private int[] equalityFieldIds = null;
+
+    private DeleteWriteBuilder(OutputFile file) {
+      this.appenderBuilder = write(file);
+      this.location = file.location();
+    }
+
+    public DeleteWriteBuilder forTable(Table table) {
+      rowSchema(table.schema());
+      setAll(table.properties());

Review comment:
       Should we also add `withSpec(table.spec());` here?
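
       For illustration, a rough sketch of the suggested change (assuming `Table#spec()` returns the table's default partition spec; this is not the code in the PR):

```java
// Hedged sketch of the reviewer's suggestion: propagate the table's partition
// spec so callers don't need a separate withSpec(...) call after forTable(...).
public DeleteWriteBuilder forTable(Table table) {
  rowSchema(table.schema());
  withSpec(table.spec());        // suggested addition
  setAll(table.properties());
  return this;
}
```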

##########
File path: core/src/test/java/org/apache/iceberg/avro/TestAvroDeleteWriters.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.avro;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestAvroDeleteWriters {
+  private static final Schema SCHEMA = new Schema(
+      NestedField.required(1, "id", Types.LongType.get()),
+      NestedField.optional(2, "data", Types.StringType.get()));
+
+  private List<Record> records;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @Before
+  public void createDeleteRecords() {
+    GenericRecord record = GenericRecord.create(SCHEMA);
+
+    ImmutableList.Builder<Record> builder = ImmutableList.builder();
+    builder.add(record.copy(ImmutableMap.of("id", 1L, "data", "a")));
+    builder.add(record.copy(ImmutableMap.of("id", 2L, "data", "b")));
+    builder.add(record.copy(ImmutableMap.of("id", 3L, "data", "c")));
+    builder.add(record.copy(ImmutableMap.of("id", 4L, "data", "d")));
+    builder.add(record.copy(ImmutableMap.of("id", 5L, "data", "e")));
+
+    this.records = builder.build();
+  }
+
+  @Test
+  public void testEqualityDeleteWriter() throws IOException {
+    File deleteFile = temp.newFile();
+
+    OutputFile out = Files.localOutput(deleteFile);
+    EqualityDeleteWriter<Record> deleteWriter = Avro.writeDeletes(out)
+        .createWriterFunc(DataWriter::create)
+        .overwrite()
+        .rowSchema(SCHEMA)
+        .withSpec(PartitionSpec.unpartitioned())

Review comment:
       Q: should we make this unit test parameterized so that we can cover both `partitioned` and `unpartitioned` cases?
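
       For example, one way to parameterize this with JUnit 4 (a hedged sketch; only the additions are shown, and the `bucket("id", 16)` spec is an illustrative choice, not from the PR):

```java
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

// Hedged sketch: run the existing tests once with an unpartitioned spec and once
// with a partitioned one; the rest of the test class stays unchanged.
@RunWith(Parameterized.class)
public class TestAvroDeleteWriters {
  @Parameterized.Parameters(name = "partitioned = {0}")
  public static Object[] parameters() {
    return new Object[] { false, true };
  }

  @Parameterized.Parameter
  public boolean partitioned;

  // Illustrative helper: the partitioned case buckets on "id"; the spec used by
  // each test would come from this method instead of PartitionSpec.unpartitioned().
  private PartitionSpec spec() {
    return partitioned
        ? PartitionSpec.builderFor(SCHEMA).bucket("id", 16).build()
        : PartitionSpec.unpartitioned();
  }
}
```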

##########
File path: core/src/main/java/org/apache/iceberg/avro/Avro.java
##########
@@ -156,11 +170,175 @@ private CodecFactory codec() {
       Preconditions.checkNotNull(schema, "Schema is required");
       Preconditions.checkNotNull(name, "Table name is required and cannot be null");
 
+      Function<Schema, DatumWriter<?>> writerFunc;
+      if (createWriterFunc != null) {
+        writerFunc = createWriterFunc;
+      } else {
+        writerFunc = GenericAvroWriter::new;
+      }
+
       // add the Iceberg schema to keyValueMetadata
       meta("iceberg.schema", SchemaParser.toJson(schema));
 
       return new AvroFileAppender<>(
-          AvroSchemaUtil.convert(schema, name), file, createWriterFunc, codec(), metadata, overwrite);
+          AvroSchemaUtil.convert(schema, name), file, writerFunc, codec(), metadata, overwrite);
+    }
+  }
+
+  public static DeleteWriteBuilder writeDeletes(OutputFile file) {
+    return new DeleteWriteBuilder(file);
+  }
+
+  public static class DeleteWriteBuilder {
+    private final WriteBuilder appenderBuilder;
+    private final String location;
+    private Function<Schema, DatumWriter<?>> createWriterFunc = null;
+    private org.apache.iceberg.Schema rowSchema;
+    private PartitionSpec spec;
+    private StructLike partition;
+    private EncryptionKeyMetadata keyMetadata = null;
+    private int[] equalityFieldIds = null;
+
+    private DeleteWriteBuilder(OutputFile file) {
+      this.appenderBuilder = write(file);
+      this.location = file.location();
+    }
+
+    public DeleteWriteBuilder forTable(Table table) {
+      rowSchema(table.schema());
+      setAll(table.properties());
+      return this;
+    }
+
+    public DeleteWriteBuilder set(String property, String value) {
+      appenderBuilder.set(property, value);
+      return this;
+    }
+
+    public DeleteWriteBuilder setAll(Map<String, String> properties) {
+      appenderBuilder.setAll(properties);
+      return this;
+    }
+
+    public DeleteWriteBuilder meta(String property, String value) {
+      appenderBuilder.meta(property, value);
+      return this;
+    }
+
+    public DeleteWriteBuilder meta(Map<String, String> properties) {
+      appenderBuilder.meta(properties);
+      return this;
+    }
+
+    public DeleteWriteBuilder overwrite() {
+      return overwrite(true);
+    }
+
+    public DeleteWriteBuilder overwrite(boolean enabled) {
+      appenderBuilder.overwrite(enabled);
+      return this;
+    }
+
+    public DeleteWriteBuilder createWriterFunc(Function<Schema, DatumWriter<?>> writerFunction) {
+      this.createWriterFunc = writerFunction;
+      return this;
+    }
+
+    public DeleteWriteBuilder rowSchema(org.apache.iceberg.Schema newRowSchema) {
+      this.rowSchema = newRowSchema;
+      return this;
+    }
+
+    public DeleteWriteBuilder withSpec(PartitionSpec newSpec) {
+      this.spec = newSpec;
+      return this;
+    }
+
+    public DeleteWriteBuilder withPartition(StructLike key) {
+      this.partition = key;
+      return this;
+    }
+
+    public DeleteWriteBuilder withKeyMetadata(EncryptionKeyMetadata metadata) {
+      this.keyMetadata = metadata;
+      return this;
+    }
+
+    public DeleteWriteBuilder equalityFieldIds(List<Integer> fieldIds) {
+      this.equalityFieldIds = fieldIds.stream().mapToInt(id -> id).toArray();
+      return this;
+    }
+
+    public DeleteWriteBuilder equalityFieldIds(int... fieldIds) {
+      this.equalityFieldIds = fieldIds;
+      return this;
+    }
+
+    public <T> EqualityDeleteWriter<T> buildEqualityWriter() throws IOException {
+      Preconditions.checkState(equalityFieldIds != null, "Cannot create equality delete file without delete field ids");
+
+      set("delete-type", "equality");
+      set("delete-field-ids", IntStream.of(equalityFieldIds)
+          .mapToObj(Objects::toString)
+          .collect(Collectors.joining(", ")));
+
+      // the appender uses the row schema without extra columns
+      appenderBuilder.schema(rowSchema);
+      if (createWriterFunc != null) {
+        appenderBuilder.createWriterFunc(createWriterFunc);
+      }
+
+      return new EqualityDeleteWriter<>(
+          appenderBuilder.build(), FileFormat.AVRO, location, spec, partition, keyMetadata, equalityFieldIds);
+    }
+
+    public <T> PositionDeleteWriter<T> buildPositionWriter() throws IOException {
+      Preconditions.checkState(equalityFieldIds == null, "Cannot create position delete file using delete field ids");
+
+      set("delete-type", "position");
+
+      // the appender uses the row schema wrapped with position fields
+      appenderBuilder.schema(new org.apache.iceberg.Schema(
+          MetadataColumns.DELETE_FILE_PATH,
+          MetadataColumns.DELETE_FILE_POS,
+          NestedField.optional(
+              MetadataColumns.DELETE_FILE_ROW_FIELD_ID, "row", rowSchema.asStruct(),
+              MetadataColumns.DELETE_FILE_ROW_DOC)));
+
+      if (createWriterFunc != null) {
+        appenderBuilder.createWriterFunc(
+            avroSchema -> new PositionDatumWriter<>(createWriterFunc.apply(avroSchema)));
+      }
+
+      return new PositionDeleteWriter<>(
+          appenderBuilder.build(), FileFormat.AVRO, location, spec, partition, keyMetadata, rowSchema.asStruct());
+    }
+  }
+
+  /**
+   * A {@link DatumWriter} implementation that wraps another to produce position delete rows.
+   *
+   * @param <D> the type of datum written as a deleted row
+   */
+  private static class PositionDatumWriter<D> implements DatumWriter<PositionDelete<D>> {
+    private static final ValueWriter<Object> PATH_WRITER = ValueWriters.strings();
+    private static final ValueWriter<Long> POS_WRITER = ValueWriters.longs();
+    private final DatumWriter<D> rowWriter;
+
+    private PositionDatumWriter(DatumWriter<D> rowWriter) {
+      this.rowWriter = rowWriter;
+    }
+
+    @Override
+    public void setSchema(Schema schema) {
+      rowWriter.setSchema(schema.getField("row").schema());

Review comment:
       The `row` field is optional, so it's possible that `schema.getField("row")` returns null. I think we need to add a null check here.
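
       A minimal sketch of such a check (one possible shape, not necessarily the final fix):

```java
// Hedged sketch: only delegate to the wrapped row writer when the optional
// "row" field is actually present in the Avro schema.
@Override
public void setSchema(Schema schema) {
  Schema.Field rowField = schema.getField("row");
  if (rowField != null) {
    rowWriter.setSchema(rowField.schema());
  }
}
```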

##########
File path: core/src/main/java/org/apache/iceberg/avro/Avro.java
##########
@@ -82,11 +96,11 @@ public static WriteBuilder write(OutputFile file) {
 
   public static class WriteBuilder {
     private final OutputFile file;
+    private final Map<String, String> config = Maps.newHashMap();

Review comment:
       Q: I see that the key-value pairs from `config` are not actually used, except for the key `write.avro.compression-codec`. Is that correct?
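
       For reference, a plausible shape of how the `config` map would feed `codec()` (assumed from this observation; the `TableProperties` constants and codec mapping are assumptions, not code shown in the PR):

```java
// Hedged sketch, not the PR's code: resolve the Avro compression codec from the
// builder's config map, falling back to the table-property default.
private CodecFactory codec() {
  String codec = config.getOrDefault(
      TableProperties.AVRO_COMPRESSION, TableProperties.AVRO_COMPRESSION_DEFAULT);
  switch (codec.toLowerCase(Locale.ENGLISH)) {
    case "uncompressed":
      return CodecFactory.nullCodec();
    case "snappy":
      return CodecFactory.snappyCodec();
    case "gzip":
      return CodecFactory.deflateCodec(9);
    default:
      throw new IllegalArgumentException("Unsupported compression codec: " + codec);
  }
}
```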



