Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/12/08 10:08:36 UTC

[GitHub] [iceberg] openinx opened a new pull request #1888: Core: Add BaseDeltaWriter

openinx opened a new pull request #1888:
URL: https://github.com/apache/iceberg/pull/1888


   This is a separate PR to add the BaseDeltaWriter to BaseTaskWriter (https://github.com/apache/iceberg/pull/1818/files#diff-fc9a9fd84d24c607fd85e053b08a559f56dd2dd2a46f1341c528e7a0269f873cR92). The delta writer accepts both `insert` records and equality `delete`s.
   
   For the CDC and upsert cases, compute engines such as Flink and Spark can use this delta writer to write streaming records (INSERT or DELETE) into an Apache Iceberg table, as sketched below.
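   
   A minimal usage sketch, assuming a task-level subclass like the `GenericTaskDeltaWriter` used in the tests in this PR; the `ChangeRow` type and `changes` stream are hypothetical stand-ins for an engine's CDC input:
   
   ```java
   // Hypothetical driver loop: ChangeRow and changes are invented here; the
   // write/delete/complete calls come from this PR's delta writer.
   GenericTaskDeltaWriter writer = createTaskWriter(equalityFieldIds, eqDeleteSchema);
   for (ChangeRow change : changes) {
     if (change.isDelete()) {
       writer.delete(change.row());    // encodes an equality delete
     } else {
       writer.write(change.row());     // appends an insert to the data file
     }
   }
   // complete() hands back the data files and delete files to commit as a RowDelta
   WriteResult result = writer.complete();
   ```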




[GitHub] [iceberg] rdblue commented on a change in pull request #1888: Core: Add BaseDeltaWriter

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1888:
URL: https://github.com/apache/iceberg/pull/1888#discussion_r539753986



##########
File path: core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
##########
@@ -75,6 +79,113 @@ public WriteResult complete() throws IOException {
         .build();
   }
 
+  /**
+   * Base delta writer to write both insert records and equality-deletes.
+   */
+  protected abstract class BaseDeltaWriter implements Closeable {
+    private RollingFileWriter dataWriter;
+    private RollingEqDeleteWriter eqDeleteWriter;
+    private SortedPosDeleteWriter<T> posDeleteWriter;
+    private StructLikeMap<PathOffset> insertedRowMap;
+
+    public BaseDeltaWriter(PartitionKey partition, Schema eqDeleteSchema) {
+      Preconditions.checkNotNull(eqDeleteSchema, "Equality-delete schema cannot be null.");
+
+      this.dataWriter = new RollingFileWriter(partition);
+
+      this.eqDeleteWriter = new RollingEqDeleteWriter(partition);
+      this.insertedRowMap = StructLikeMap.create(eqDeleteSchema.asStruct());
+
+      this.posDeleteWriter = new SortedPosDeleteWriter<>(appenderFactory, fileFactory, format, partition);
+    }
+
+    /**
+     * Wrap the generic data so that it can be read as a {@link StructLike}.
+     */
+    protected abstract StructLike asStructLike(T data);
+
+    protected abstract StructLike asCopiedKey(T row);
+
+    public void write(T row) throws IOException {
+      PathOffset pathOffset = PathOffset.of(dataWriter.currentPath(), dataWriter.currentRows());
+
+      StructLike copiedKey = asCopiedKey(row);
+      // Adding a pos-delete to replace the old filePos.
+      PathOffset previous = insertedRowMap.put(copiedKey, pathOffset);
+      if (previous != null) {
+        // TODO attach the previous row if has a positional-delete row schema in appender factory.
+        posDeleteWriter.delete(previous.path, previous.rowOffset, null);
+      }
+
+      dataWriter.write(row);
+    }
+
+    /**
+     * Delete the rows with the given key.
+     *
+     * @param key the projected values, which can be written to the eq-delete file directly.
+     */
+    public void delete(T key) throws IOException {

Review comment:
       Yes, I think those two cases are correct: the caller may want to pass the key to delete, or an entire row to delete.






[GitHub] [iceberg] openinx commented on a change in pull request #1888: Core: Add BaseDeltaWriter

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1888:
URL: https://github.com/apache/iceberg/pull/1888#discussion_r539052783



##########
File path: data/src/test/java/org/apache/iceberg/io/TestTaskDeltaWriter.java
##########
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.function.Function;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.ArrayUtil;
+import org.apache.iceberg.util.StructLikeSet;
+import org.apache.iceberg.util.StructProjection;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestTaskDeltaWriter extends TableTestBase {
+  private static final int FORMAT_V2 = 2;
+  private static final long TARGET_FILE_SIZE = 128 * 1024 * 1024L;
+
+  private final FileFormat format;
+  private final GenericRecord gRecord = GenericRecord.create(SCHEMA);
+  private final GenericRecord posRecord = GenericRecord.create(DeleteSchemaUtil.pathPosSchema());
+
+  private OutputFileFactory fileFactory = null;
+  private int idFieldId;
+  private int dataFieldId;
+
+  @Parameterized.Parameters(name = "FileFormat = {0}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+        {"avro"},
+        {"parquet"}
+    };
+  }
+
+  public TestTaskDeltaWriter(String fileFormat) {
+    super(FORMAT_V2);
+    this.format = FileFormat.valueOf(fileFormat.toUpperCase(Locale.ENGLISH));
+  }
+
+  @Before
+  public void setupTable() throws IOException {
+    this.tableDir = temp.newFolder();
+    Assert.assertTrue(tableDir.delete()); // created by table create
+
+    this.metadataDir = new File(tableDir, "metadata");
+
+    this.table = create(SCHEMA, PartitionSpec.unpartitioned());
+    this.fileFactory = new OutputFileFactory(table.spec(), format, table.locationProvider(), table.io(),
+        table.encryption(), 1, 1);
+
+    this.idFieldId = table.schema().findField("id").fieldId();
+    this.dataFieldId = table.schema().findField("data").fieldId();
+
+    table.updateProperties()
+        .defaultFormat(format)
+        .commit();
+  }
+
+  private Record createRecord(Integer id, String data) {
+    return gRecord.copy("id", id, "data", data);
+  }
+
+  @Test
+  public void testPureInsert() throws IOException {
+    List<Integer> eqDeleteFieldIds = Lists.newArrayList(idFieldId, dataFieldId);
+    Schema eqDeleteSchema = table.schema();
+
+    GenericTaskDeltaWriter deltaWriter = createTaskWriter(eqDeleteFieldIds, eqDeleteSchema);
+    List<Record> expected = Lists.newArrayList();
+    for (int i = 0; i < 20; i++) {
+      Record record = createRecord(i, String.format("val-%d", i));
+      expected.add(record);
+
+      deltaWriter.write(record);
+    }
+
+    WriteResult result = deltaWriter.complete();
+    Assert.assertEquals("Should only have a data file.", 1, result.dataFiles().length);
+    Assert.assertEquals("Should have no delete file", 0, result.deleteFiles().length);
+    commitTransaction(result);
+    Assert.assertEquals("Should have expected records", expectedRowSet(expected), actualRowSet("*"));
+
+    deltaWriter = createTaskWriter(eqDeleteFieldIds, eqDeleteSchema);
+    for (int i = 20; i < 30; i++) {
+      Record record = createRecord(i, String.format("val-%d", i));
+      expected.add(record);
+
+      deltaWriter.write(record);
+    }
+    result = deltaWriter.complete();
+    Assert.assertEquals("Should only have a data file.", 1, result.dataFiles().length);
+    Assert.assertEquals("Should have no delete file", 0, result.deleteFiles().length);
+    commitTransaction(deltaWriter.complete());

Review comment:
       Yeah, nice finding!






[GitHub] [iceberg] rdblue commented on a change in pull request #1888: Core: Add BaseDeltaWriter

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1888:
URL: https://github.com/apache/iceberg/pull/1888#discussion_r538808121



##########
File path: data/src/test/java/org/apache/iceberg/io/TestTaskDeltaWriter.java
##########
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.function.Function;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.ArrayUtil;
+import org.apache.iceberg.util.StructLikeSet;
+import org.apache.iceberg.util.StructProjection;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestTaskDeltaWriter extends TableTestBase {
+  private static final int FORMAT_V2 = 2;
+  private static final long TARGET_FILE_SIZE = 128 * 1024 * 1024L;
+
+  private final FileFormat format;
+  private final GenericRecord gRecord = GenericRecord.create(SCHEMA);
+  private final GenericRecord posRecord = GenericRecord.create(DeleteSchemaUtil.pathPosSchema());
+
+  private OutputFileFactory fileFactory = null;
+  private int idFieldId;
+  private int dataFieldId;
+
+  @Parameterized.Parameters(name = "FileFormat = {0}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+        {"avro"},
+        {"parquet"}
+    };
+  }
+
+  public TestTaskDeltaWriter(String fileFormat) {
+    super(FORMAT_V2);
+    this.format = FileFormat.valueOf(fileFormat.toUpperCase(Locale.ENGLISH));
+  }
+
+  @Before
+  public void setupTable() throws IOException {
+    this.tableDir = temp.newFolder();
+    Assert.assertTrue(tableDir.delete()); // created by table create
+
+    this.metadataDir = new File(tableDir, "metadata");
+
+    this.table = create(SCHEMA, PartitionSpec.unpartitioned());
+    this.fileFactory = new OutputFileFactory(table.spec(), format, table.locationProvider(), table.io(),
+        table.encryption(), 1, 1);
+
+    this.idFieldId = table.schema().findField("id").fieldId();
+    this.dataFieldId = table.schema().findField("data").fieldId();
+
+    table.updateProperties()
+        .defaultFormat(format)
+        .commit();
+  }
+
+  private Record createRecord(Integer id, String data) {
+    return gRecord.copy("id", id, "data", data);
+  }
+
+  @Test
+  public void testPureInsert() throws IOException {
+    List<Integer> eqDeleteFieldIds = Lists.newArrayList(idFieldId, dataFieldId);
+    Schema eqDeleteSchema = table.schema();
+
+    GenericTaskDeltaWriter deltaWriter = createTaskWriter(eqDeleteFieldIds, eqDeleteSchema);
+    List<Record> expected = Lists.newArrayList();
+    for (int i = 0; i < 20; i++) {
+      Record record = createRecord(i, String.format("val-%d", i));
+      expected.add(record);
+
+      deltaWriter.write(record);
+    }
+
+    WriteResult result = deltaWriter.complete();
+    Assert.assertEquals("Should only have a data file.", 1, result.dataFiles().length);
+    Assert.assertEquals("Should have no delete file", 0, result.deleteFiles().length);
+    commitTransaction(result);
+    Assert.assertEquals("Should have expected records", expectedRowSet(expected), actualRowSet("*"));
+
+    deltaWriter = createTaskWriter(eqDeleteFieldIds, eqDeleteSchema);
+    for (int i = 20; i < 30; i++) {
+      Record record = createRecord(i, String.format("val-%d", i));
+      expected.add(record);
+
+      deltaWriter.write(record);
+    }
+    result = deltaWriter.complete();
+    Assert.assertEquals("Should only have a data file.", 1, result.dataFiles().length);
+    Assert.assertEquals("Should have no delete file", 0, result.deleteFiles().length);
+    commitTransaction(deltaWriter.complete());
+    Assert.assertEquals("Should have expected records", expectedRowSet(expected), actualRowSet("*"));
+  }
+
+  @Test
+  public void testInsertDuplicatedKey() throws IOException {
+    List<Integer> equalityFieldIds = Lists.newArrayList(idFieldId);
+    Schema eqDeleteSchema = table.schema().select("id");
+
+    GenericTaskDeltaWriter deltaWriter = createTaskWriter(equalityFieldIds, eqDeleteSchema);
+    deltaWriter.write(createRecord(1, "aaa"));
+    deltaWriter.write(createRecord(2, "bbb"));
+    deltaWriter.write(createRecord(3, "ccc"));
+    deltaWriter.write(createRecord(4, "ddd"));
+    deltaWriter.write(createRecord(4, "eee"));
+    deltaWriter.write(createRecord(3, "fff"));
+    deltaWriter.write(createRecord(2, "ggg"));
+    deltaWriter.write(createRecord(1, "hhh"));
+
+    WriteResult result = deltaWriter.complete();
+    commitTransaction(result);
+
+    Assert.assertEquals("Should have a data file.", 1, result.dataFiles().length);
+    Assert.assertEquals("Should have a pos-delete file", 1, result.deleteFiles().length);
+    DeleteFile posDeleteFile = result.deleteFiles()[0];
+    Assert.assertEquals("Should be a pos-delete file", FileContent.POSITION_DELETES, posDeleteFile.content());
+    Assert.assertEquals("Should have expected records", expectedRowSet(ImmutableList.of(
+        createRecord(4, "eee"),
+        createRecord(3, "fff"),
+        createRecord(2, "ggg"),
+        createRecord(1, "hhh")
+    )), actualRowSet("*"));
+
+    // Check records in the data file.
+    DataFile dataFile = result.dataFiles()[0];
+    Assert.assertEquals(ImmutableList.of(
+        createRecord(1, "aaa"),
+        createRecord(2, "bbb"),
+        createRecord(3, "ccc"),
+        createRecord(4, "ddd"),
+        createRecord(4, "eee"),
+        createRecord(3, "fff"),
+        createRecord(2, "ggg"),
+        createRecord(1, "hhh")
+    ), readRecordsAsList(table.schema(), dataFile.path()));
+
+    // Check records in the pos-delete file.
+    Schema posDeleteSchema = DeleteSchemaUtil.pathPosSchema();
+    Assert.assertEquals(ImmutableList.of(
+        posRecord.copy("file_path", dataFile.path(), "pos", 0L),
+        posRecord.copy("file_path", dataFile.path(), "pos", 1L),
+        posRecord.copy("file_path", dataFile.path(), "pos", 2L),
+        posRecord.copy("file_path", dataFile.path(), "pos", 3L)
+    ), readRecordsAsList(posDeleteSchema, posDeleteFile.path()));
+  }
+
+  @Test
+  public void testUpsertSameRow() throws IOException {
+    List<Integer> eqDeleteFieldIds = Lists.newArrayList(idFieldId, dataFieldId);
+    Schema eqDeleteSchema = table.schema();
+
+    GenericTaskDeltaWriter deltaWriter = createTaskWriter(eqDeleteFieldIds, eqDeleteSchema);
+
+    Record record = createRecord(1, "aaa");
+    deltaWriter.write(record);
+    deltaWriter.delete(record);
+    deltaWriter.write(record);
+    deltaWriter.delete(record);
+    deltaWriter.write(record);
+
+    WriteResult result = deltaWriter.complete();
+    Assert.assertEquals("Should have a data file.", 1, result.dataFiles().length);
+    Assert.assertEquals("Should have a pos-delete file and an eq-delete file", 2, result.deleteFiles().length);
+    commitTransaction(result);
+    Assert.assertEquals("Should have an expected record", expectedRowSet(ImmutableList.of(record)), actualRowSet("*"));
+
+    // Check records in the data file.
+    DataFile dataFile = result.dataFiles()[0];
+    Assert.assertEquals(ImmutableList.of(record, record, record), readRecordsAsList(table.schema(), dataFile.path()));
+
+    // Check records in the eq-delete file.
+    DeleteFile eqDeleteFile = result.deleteFiles()[0];
+    Assert.assertEquals(ImmutableList.of(record, record), readRecordsAsList(eqDeleteSchema, eqDeleteFile.path()));

Review comment:
       Is it necessary to encode the record twice? What about detecting already deleted keys and omitting the second delete? That might be over-complicating this for a very small optimization though.
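   
    For reference, a minimal sketch of that optimization, assuming the writer tracks keys that already have an equality delete (the `deletedKeys` set and `copyOf` helper are invented here; `deletedKeys` would be a `StructLikeSet` created from the delete schema, like `insertedRowMap`):
   
    ```java
    // Hypothetical: skip re-encoding an equality delete for a key this writer
    // has already deleted in the current checkpoint.
    public void delete(T key) throws IOException {
      StructLike structKey = asStructLike(key);
      // rows written by this task would still be removed via the pos-delete path
      if (deletedKeys.add(copyOf(structKey))) {   // copyOf: a defensive deep copy
        eqDeleteWriter.write(key);                // first delete for this key
      }
      // later deletes of the same key add nothing to the eq-delete file
    }
    ```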






[GitHub] [iceberg] rdblue commented on a change in pull request #1888: Core: Add BaseDeltaWriter

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1888:
URL: https://github.com/apache/iceberg/pull/1888#discussion_r540596966



##########
File path: core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
##########
@@ -75,6 +81,135 @@ public WriteResult complete() throws IOException {
         .build();
   }
 
+  /**
+   * Base equality delta writer to write both insert records and equality-deletes.
+   */
+  protected abstract class BaseEqualityDeltaWriter implements Closeable {
+    private final StructProjection structProjection;
+    private RollingFileWriter dataWriter;
+    private RollingEqDeleteWriter eqDeleteWriter;
+    private SortedPosDeleteWriter<T> posDeleteWriter;
+    private Map<StructLike, PathOffset> insertedRowMap;
+
+    public BaseEqualityDeltaWriter(PartitionKey partition, Schema schema, Schema deleteSchema) {
+      Preconditions.checkNotNull(schema, "Iceberg table schema cannot be null.");
+      Preconditions.checkNotNull(deleteSchema, "Equality-delete schema cannot be null.");
+      this.structProjection = StructProjection.create(schema, deleteSchema);
+
+      this.dataWriter = new RollingFileWriter(partition);
+
+      this.eqDeleteWriter = new RollingEqDeleteWriter(partition);
+      this.insertedRowMap = StructLikeMap.create(deleteSchema.asStruct());
+
+      this.posDeleteWriter = new SortedPosDeleteWriter<>(appenderFactory, fileFactory, format, partition);
+    }
+
+    /**
+     * Wrap the generic data so that it can be read as a {@link StructLike}.
+     */
+    protected abstract StructLike asStructLike(T data);
+
+    public void write(T row) throws IOException {
+      PathOffset pathOffset = PathOffset.of(dataWriter.currentPath(), dataWriter.currentRows());
+
+      StructLike copiedKey = structProjection.copy().wrap(asStructLike(row));
+      // Adding a pos-delete to replace the old path-offset.
+      PathOffset previous = insertedRowMap.put(copiedKey, pathOffset);

Review comment:
       Why did you decide to delegate to the implementation instead of creating a copy here? I think it would be fairly easy to copy with a StructLike class:
   
   ```java
     public static class StructCopy implements StructLike {
       public static StructLike copy(StructLike struct) {
         return new StructCopy(struct);
       }
   
       private final Object[] values;
   
       private StructCopy(StructLike toCopy) {
         this.values = new Object[toCopy.size()];
         for (int i = 0; i < values.length; i += 1) {
           Object value = toCopy.get(i, Object.class);
           if (value instanceof StructLike) {
             values[i] = copy((StructLike) value);
           } else {
             values[i] = value;
           }
         }
       }
   
       @Override
       public int size() {
         return values.length;
       }
   
       @Override
       public <T> T get(int pos, Class<T> javaClass) {
         return javaClass.cast(values[pos]);
       }
   
       @Override
       public <T> void set(int pos, T value) {
         throw new UnsupportedOperationException("Struct copy cannot be modified");
       }
     }
   ```
   
   That doesn't handle lists or maps, but the projection doesn't handle lists or maps either, so it would be okay to use that in the StructLikeMap for all of the cases we currently support.
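   
    Under that approach, the write path could snapshot the projected key in one place, something like this (a hedged variant of the `write()` shown in the diff above):
   
    ```java
    // Hypothetical: project the row to the delete schema, then deep-copy it so
    // the StructLikeMap key doesn't alias a reused wrapper or the live row.
    StructLike copiedKey = StructCopy.copy(structProjection.wrap(asStructLike(row)));
    PathOffset previous = insertedRowMap.put(copiedKey, pathOffset);
    ```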






[GitHub] [iceberg] openinx commented on a change in pull request #1888: Core: Add BaseDeltaWriter

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1888:
URL: https://github.com/apache/iceberg/pull/1888#discussion_r539059145



##########
File path: data/src/test/java/org/apache/iceberg/io/TestTaskDeltaWriter.java
##########
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.function.Function;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.ArrayUtil;
+import org.apache.iceberg.util.StructLikeSet;
+import org.apache.iceberg.util.StructProjection;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestTaskDeltaWriter extends TableTestBase {
+  private static final int FORMAT_V2 = 2;
+  private static final long TARGET_FILE_SIZE = 128 * 1024 * 1024L;
+
+  private final FileFormat format;
+  private final GenericRecord gRecord = GenericRecord.create(SCHEMA);
+  private final GenericRecord posRecord = GenericRecord.create(DeleteSchemaUtil.pathPosSchema());
+
+  private OutputFileFactory fileFactory = null;
+  private int idFieldId;
+  private int dataFieldId;
+
+  @Parameterized.Parameters(name = "FileFormat = {0}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+        {"avro"},
+        {"parquet"}
+    };
+  }
+
+  public TestTaskDeltaWriter(String fileFormat) {
+    super(FORMAT_V2);
+    this.format = FileFormat.valueOf(fileFormat.toUpperCase(Locale.ENGLISH));
+  }
+
+  @Before
+  public void setupTable() throws IOException {
+    this.tableDir = temp.newFolder();
+    Assert.assertTrue(tableDir.delete()); // created by table create
+
+    this.metadataDir = new File(tableDir, "metadata");
+
+    this.table = create(SCHEMA, PartitionSpec.unpartitioned());
+    this.fileFactory = new OutputFileFactory(table.spec(), format, table.locationProvider(), table.io(),
+        table.encryption(), 1, 1);
+
+    this.idFieldId = table.schema().findField("id").fieldId();
+    this.dataFieldId = table.schema().findField("data").fieldId();
+
+    table.updateProperties()
+        .defaultFormat(format)
+        .commit();
+  }
+
+  private Record createRecord(Integer id, String data) {
+    return gRecord.copy("id", id, "data", data);
+  }
+
+  @Test
+  public void testPureInsert() throws IOException {
+    List<Integer> eqDeleteFieldIds = Lists.newArrayList(idFieldId, dataFieldId);
+    Schema eqDeleteSchema = table.schema();
+
+    GenericTaskDeltaWriter deltaWriter = createTaskWriter(eqDeleteFieldIds, eqDeleteSchema);
+    List<Record> expected = Lists.newArrayList();
+    for (int i = 0; i < 20; i++) {
+      Record record = createRecord(i, String.format("val-%d", i));
+      expected.add(record);
+
+      deltaWriter.write(record);
+    }
+
+    WriteResult result = deltaWriter.complete();
+    Assert.assertEquals("Should only have a data file.", 1, result.dataFiles().length);
+    Assert.assertEquals("Should have no delete file", 0, result.deleteFiles().length);
+    commitTransaction(result);
+    Assert.assertEquals("Should have expected records", expectedRowSet(expected), actualRowSet("*"));
+
+    deltaWriter = createTaskWriter(eqDeleteFieldIds, eqDeleteSchema);
+    for (int i = 20; i < 30; i++) {
+      Record record = createRecord(i, String.format("val-%d", i));
+      expected.add(record);
+
+      deltaWriter.write(record);
+    }
+    result = deltaWriter.complete();
+    Assert.assertEquals("Should only have a data file.", 1, result.dataFiles().length);
+    Assert.assertEquals("Should have no delete file", 0, result.deleteFiles().length);
+    commitTransaction(deltaWriter.complete());
+    Assert.assertEquals("Should have expected records", expectedRowSet(expected), actualRowSet("*"));
+  }
+
+  @Test
+  public void testInsertDuplicatedKey() throws IOException {
+    List<Integer> equalityFieldIds = Lists.newArrayList(idFieldId);
+    Schema eqDeleteSchema = table.schema().select("id");
+
+    GenericTaskDeltaWriter deltaWriter = createTaskWriter(equalityFieldIds, eqDeleteSchema);
+    deltaWriter.write(createRecord(1, "aaa"));
+    deltaWriter.write(createRecord(2, "bbb"));
+    deltaWriter.write(createRecord(3, "ccc"));
+    deltaWriter.write(createRecord(4, "ddd"));
+    deltaWriter.write(createRecord(4, "eee"));
+    deltaWriter.write(createRecord(3, "fff"));
+    deltaWriter.write(createRecord(2, "ggg"));
+    deltaWriter.write(createRecord(1, "hhh"));
+
+    WriteResult result = deltaWriter.complete();
+    commitTransaction(result);
+
+    Assert.assertEquals("Should have a data file.", 1, result.dataFiles().length);
+    Assert.assertEquals("Should have a pos-delete file", 1, result.deleteFiles().length);
+    DeleteFile posDeleteFile = result.deleteFiles()[0];
+    Assert.assertEquals("Should be a pos-delete file", FileContent.POSITION_DELETES, posDeleteFile.content());
+    Assert.assertEquals("Should have expected records", expectedRowSet(ImmutableList.of(
+        createRecord(4, "eee"),
+        createRecord(3, "fff"),
+        createRecord(2, "ggg"),
+        createRecord(1, "hhh")
+    )), actualRowSet("*"));
+
+    // Check records in the data file.
+    DataFile dataFile = result.dataFiles()[0];
+    Assert.assertEquals(ImmutableList.of(
+        createRecord(1, "aaa"),
+        createRecord(2, "bbb"),
+        createRecord(3, "ccc"),
+        createRecord(4, "ddd"),
+        createRecord(4, "eee"),
+        createRecord(3, "fff"),
+        createRecord(2, "ggg"),
+        createRecord(1, "hhh")
+    ), readRecordsAsList(table.schema(), dataFile.path()));
+
+    // Check records in the pos-delete file.
+    Schema posDeleteSchema = DeleteSchemaUtil.pathPosSchema();
+    Assert.assertEquals(ImmutableList.of(
+        posRecord.copy("file_path", dataFile.path(), "pos", 0L),
+        posRecord.copy("file_path", dataFile.path(), "pos", 1L),
+        posRecord.copy("file_path", dataFile.path(), "pos", 2L),
+        posRecord.copy("file_path", dataFile.path(), "pos", 3L)
+    ), readRecordsAsList(posDeleteSchema, posDeleteFile.path()));
+  }
+
+  @Test
+  public void testUpsertSameRow() throws IOException {
+    List<Integer> eqDeleteFieldIds = Lists.newArrayList(idFieldId, dataFieldId);
+    Schema eqDeleteSchema = table.schema();
+
+    GenericTaskDeltaWriter deltaWriter = createTaskWriter(eqDeleteFieldIds, eqDeleteSchema);
+
+    Record record = createRecord(1, "aaa");
+    deltaWriter.write(record);
+    deltaWriter.delete(record);
+    deltaWriter.write(record);
+    deltaWriter.delete(record);
+    deltaWriter.write(record);
+
+    WriteResult result = deltaWriter.complete();
+    Assert.assertEquals("Should have a data file.", 1, result.dataFiles().length);
+    Assert.assertEquals("Should have a pos-delete file and an eq-delete file", 2, result.deleteFiles().length);
+    commitTransaction(result);
+    Assert.assertEquals("Should have an expected record", expectedRowSet(ImmutableList.of(record)), actualRowSet("*"));
+
+    // Check records in the data file.
+    DataFile dataFile = result.dataFiles()[0];
+    Assert.assertEquals(ImmutableList.of(record, record, record), readRecordsAsList(table.schema(), dataFile.path()));
+
+    // Check records in the eq-delete file.
+    DeleteFile eqDeleteFile = result.deleteFiles()[0];
+    Assert.assertEquals(ImmutableList.of(record, record), readRecordsAsList(eqDeleteSchema, eqDeleteFile.path()));

Review comment:
       Em, agreed that we don't have to upsert the same key twice. The first upsert is enough.






[GitHub] [iceberg] rdblue commented on a change in pull request #1888: Core: Add BaseDeltaWriter

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1888:
URL: https://github.com/apache/iceberg/pull/1888#discussion_r538796532



##########
File path: core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
##########
@@ -75,6 +79,113 @@ public WriteResult complete() throws IOException {
         .build();
   }
 
+  /**
+   * Base delta writer to write both insert records and equality-deletes.
+   */
+  protected abstract class BaseDeltaWriter implements Closeable {

Review comment:
       How about `BaseEqualityDeltaWriter`? I think Spark `MERGE INTO` will likely use a delta writer that doesn't create the equality writer or use the `SortedPosDeleteWriter` because it will request that the rows are already ordered.






[GitHub] [iceberg] rdblue commented on a change in pull request #1888: Core: Add BaseDeltaWriter

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1888:
URL: https://github.com/apache/iceberg/pull/1888#discussion_r538802022



##########
File path: core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
##########
@@ -75,6 +79,113 @@ public WriteResult complete() throws IOException {
         .build();
   }
 
+  /**
+   * Base delta writer to write both insert records and equality-deletes.
+   */
+  protected abstract class BaseDeltaWriter implements Closeable {
+    private RollingFileWriter dataWriter;
+    private RollingEqDeleteWriter eqDeleteWriter;
+    private SortedPosDeleteWriter<T> posDeleteWriter;
+    private StructLikeMap<PathOffset> insertedRowMap;
+
+    public BaseDeltaWriter(PartitionKey partition, Schema eqDeleteSchema) {
+      Preconditions.checkNotNull(eqDeleteSchema, "Equality-delete schema cannot be null.");
+
+      this.dataWriter = new RollingFileWriter(partition);
+
+      this.eqDeleteWriter = new RollingEqDeleteWriter(partition);
+      this.insertedRowMap = StructLikeMap.create(eqDeleteSchema.asStruct());
+
+      this.posDeleteWriter = new SortedPosDeleteWriter<>(appenderFactory, fileFactory, format, partition);
+    }
+
+    /**
+     * Wrap the generic data so that it can be read as a {@link StructLike}.
+     */
+    protected abstract StructLike asStructLike(T data);
+
+    protected abstract StructLike asCopiedKey(T row);
+
+    public void write(T row) throws IOException {
+      PathOffset pathOffset = PathOffset.of(dataWriter.currentPath(), dataWriter.currentRows());
+
+      StructLike copiedKey = asCopiedKey(row);
+      // Adding a pos-delete to replace the old filePos.
+      PathOffset previous = insertedRowMap.put(copiedKey, pathOffset);
+      if (previous != null) {
+        // TODO attach the previous row if has a positional-delete row schema in appender factory.
+        posDeleteWriter.delete(previous.path, previous.rowOffset, null);
+      }
+
+      dataWriter.write(row);
+    }
+
+    /**
+     * Delete the rows with the given key.
+     *
+     * @param key the projected values, which can be written to the eq-delete file directly.
+     */
+    public void delete(T key) throws IOException {

Review comment:
       Why pass a key here instead of a row?
   
   It seems to me that it would be easier to assume that this is a row, so that the `write` and `delete` methods accept the same data.






[GitHub] [iceberg] openinx commented on a change in pull request #1888: Core: Add BaseDeltaWriter

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1888:
URL: https://github.com/apache/iceberg/pull/1888#discussion_r539047703



##########
File path: core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
##########
@@ -75,6 +79,113 @@ public WriteResult complete() throws IOException {
         .build();
   }
 
+  /**
+   * Base delta writer to write both insert records and equality-deletes.
+   */
+  protected abstract class BaseDeltaWriter implements Closeable {
+    private RollingFileWriter dataWriter;
+    private RollingEqDeleteWriter eqDeleteWriter;
+    private SortedPosDeleteWriter<T> posDeleteWriter;
+    private StructLikeMap<PathOffset> insertedRowMap;
+
+    public BaseDeltaWriter(PartitionKey partition, Schema eqDeleteSchema) {
+      Preconditions.checkNotNull(eqDeleteSchema, "Equality-delete schema cannot be null.");
+
+      this.dataWriter = new RollingFileWriter(partition);
+
+      this.eqDeleteWriter = new RollingEqDeleteWriter(partition);
+      this.insertedRowMap = StructLikeMap.create(eqDeleteSchema.asStruct());
+
+      this.posDeleteWriter = new SortedPosDeleteWriter<>(appenderFactory, fileFactory, format, partition);
+    }
+
+    /**
+     * Wrap the generic data so that it can be read as a {@link StructLike}.
+     */
+    protected abstract StructLike asStructLike(T data);
+
+    protected abstract StructLike asCopiedKey(T row);

Review comment:
       I think the above implementation of `asCopiedKey` may have problems, because some compute engines (Flink, for example) use a singleton `RowDataWrapper` instance to implement `asStructLike`. In that case the old key and the copied key share the same `RowDataWrapper`, which has been re-wrapped around the new `RowData`, and that messes up the keys in `insertedRowMap`.
   
    Considering this issue, I finally decided to abstract a separate `asCopiedKey` that clones a completely independent key, one that won't share object values with the old one. A tiny illustration of the aliasing problem follows.
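   
    A hypothetical sketch of the shared-wrapper problem (this wrapper is a stand-in for a reused `RowDataWrapper`, not Flink's actual implementation):
   
    ```java
    // A reusable wrapper that always reads from the most recently wrapped row.
    class ReusedWrapper implements StructLike {
      private Object[] current;
    
      ReusedWrapper wrap(Object[] row) {
        this.current = row;   // re-points the wrapper at the new row
        return this;
      }
    
      @Override
      public int size() {
        return current.length;
      }
    
      @Override
      public <T> T get(int pos, Class<T> javaClass) {
        return javaClass.cast(current[pos]);
      }
    
      @Override
      public <T> void set(int pos, T value) {
        throw new UnsupportedOperationException("Read-only wrapper");
      }
    }
    
    ReusedWrapper wrapper = new ReusedWrapper();
    StructLike key1 = wrapper.wrap(new Object[] {1, "aaa"});
    StructLike key2 = wrapper.wrap(new Object[] {2, "bbb"});
    // key1 == key2: both now read (2, "bbb"), so a StructLikeMap entry stored
    // under key1 can no longer be found by the values (1, "aaa"). A deep copy
    // of the projected key avoids this aliasing.
    ```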






[GitHub] [iceberg] openinx commented on a change in pull request #1888: Core: Add BaseDeltaWriter

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1888:
URL: https://github.com/apache/iceberg/pull/1888#discussion_r539084806



##########
File path: core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
##########
@@ -75,6 +79,113 @@ public WriteResult complete() throws IOException {
         .build();
   }
 
+  /**
+   * Base delta writer to write both insert records and equality-deletes.
+   */
+  protected abstract class BaseDeltaWriter implements Closeable {
+    private RollingFileWriter dataWriter;
+    private RollingEqDeleteWriter eqDeleteWriter;
+    private SortedPosDeleteWriter<T> posDeleteWriter;
+    private StructLikeMap<PathOffset> insertedRowMap;
+
+    public BaseDeltaWriter(PartitionKey partition, Schema eqDeleteSchema) {
+      Preconditions.checkNotNull(eqDeleteSchema, "Equality-delete schema cannot be null.");
+
+      this.dataWriter = new RollingFileWriter(partition);
+
+      this.eqDeleteWriter = new RollingEqDeleteWriter(partition);
+      this.insertedRowMap = StructLikeMap.create(eqDeleteSchema.asStruct());
+
+      this.posDeleteWriter = new SortedPosDeleteWriter<>(appenderFactory, fileFactory, format, partition);
+    }
+
+    /**
+     * Wrap the generic data so that it can be read as a {@link StructLike}.
+     */
+    protected abstract StructLike asStructLike(T data);
+
+    protected abstract StructLike asCopiedKey(T row);
+
+    public void write(T row) throws IOException {
+      PathOffset pathOffset = PathOffset.of(dataWriter.currentPath(), dataWriter.currentRows());
+
+      StructLike copiedKey = asCopiedKey(row);
+      // Adding a pos-delete to replace the old filePos.
+      PathOffset previous = insertedRowMap.put(copiedKey, pathOffset);
+      if (previous != null) {
+        // TODO attach the previous row if has a positional-delete row schema in appender factory.
+        posDeleteWriter.delete(previous.path, previous.rowOffset, null);
+      }
+
+      dataWriter.write(row);
+    }
+
+    /**
+     * Delete the rows with the given key.
+     *
+     * @param key the projected values, which can be written to the eq-delete file directly.
+     */
+    public void delete(T key) throws IOException {

Review comment:
       Thinking about it again: if the table has columns `(a,b,c,d)` and the equality fields are `(b,c)`, then I think it's enough to provide two ways to write the equality-delete file (see the sketch after this list):
   
   1. Write only the `(b,c)` column values into the equality-delete file. That is enough to maintain correct deletion semantics if people don't expect to do any real incremental pulling.
   
   2. Write the full `(a,b,c,d)` column values into the equality-delete file. That's suitable if people want to consume the incremental inserts and deletes for streaming analysis, data pipelines, etc.
   
   Apart from those two cases, I can't think of another scenario that requires a custom equality schema.
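   
   In API terms, a hedged sketch of what those two cases could look like on the delta writer (method names are illustrative, and the bookkeeping that pos-deletes rows buffered in this writer is omitted):
   
   ```java
   // Case 1 (hypothetical): the caller passes only the equality key columns,
   // e.g. (b, c); the key is written to the eq-delete file as-is.
   public void deleteKey(T key) throws IOException {
     eqDeleteWriter.write(key);
   }
   
   // Case 2 (hypothetical): the caller passes an entire row (a, b, c, d); the
   // full row is written so downstream incremental consumers can also read
   // the non-key values of the deleted record.
   public void delete(T row) throws IOException {
     eqDeleteWriter.write(row);
   }
   ```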






[GitHub] [iceberg] rdblue commented on a change in pull request #1888: Core: Add BaseDeltaWriter

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1888:
URL: https://github.com/apache/iceberg/pull/1888#discussion_r538808931



##########
File path: data/src/test/java/org/apache/iceberg/io/TestTaskDeltaWriter.java
##########
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.function.Function;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.ArrayUtil;
+import org.apache.iceberg.util.StructLikeSet;
+import org.apache.iceberg.util.StructProjection;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestTaskDeltaWriter extends TableTestBase {
+  private static final int FORMAT_V2 = 2;
+  private static final long TARGET_FILE_SIZE = 128 * 1024 * 1024L;
+
+  private final FileFormat format;
+  private final GenericRecord gRecord = GenericRecord.create(SCHEMA);
+  private final GenericRecord posRecord = GenericRecord.create(DeleteSchemaUtil.pathPosSchema());
+
+  private OutputFileFactory fileFactory = null;
+  private int idFieldId;
+  private int dataFieldId;
+
+  @Parameterized.Parameters(name = "FileFormat = {0}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+        {"avro"},
+        {"parquet"}
+    };
+  }
+
+  public TestTaskDeltaWriter(String fileFormat) {
+    super(FORMAT_V2);
+    this.format = FileFormat.valueOf(fileFormat.toUpperCase(Locale.ENGLISH));
+  }
+
+  @Before
+  public void setupTable() throws IOException {
+    this.tableDir = temp.newFolder();
+    Assert.assertTrue(tableDir.delete()); // created by table create
+
+    this.metadataDir = new File(tableDir, "metadata");
+
+    this.table = create(SCHEMA, PartitionSpec.unpartitioned());
+    this.fileFactory = new OutputFileFactory(table.spec(), format, table.locationProvider(), table.io(),
+        table.encryption(), 1, 1);
+
+    this.idFieldId = table.schema().findField("id").fieldId();
+    this.dataFieldId = table.schema().findField("data").fieldId();
+
+    table.updateProperties()
+        .defaultFormat(format)
+        .commit();
+  }
+
+  private Record createRecord(Integer id, String data) {
+    return gRecord.copy("id", id, "data", data);
+  }
+
+  @Test
+  public void testPureInsert() throws IOException {
+    List<Integer> eqDeleteFieldIds = Lists.newArrayList(idFieldId, dataFieldId);
+    Schema eqDeleteSchema = table.schema();
+
+    GenericTaskDeltaWriter deltaWriter = createTaskWriter(eqDeleteFieldIds, eqDeleteSchema);
+    List<Record> expected = Lists.newArrayList();
+    for (int i = 0; i < 20; i++) {
+      Record record = createRecord(i, String.format("val-%d", i));
+      expected.add(record);
+
+      deltaWriter.write(record);
+    }
+
+    WriteResult result = deltaWriter.complete();
+    Assert.assertEquals("Should only have a data file.", 1, result.dataFiles().length);
+    Assert.assertEquals("Should have no delete file", 0, result.deleteFiles().length);
+    commitTransaction(result);
+    Assert.assertEquals("Should have expected records", expectedRowSet(expected), actualRowSet("*"));
+
+    deltaWriter = createTaskWriter(eqDeleteFieldIds, eqDeleteSchema);
+    for (int i = 20; i < 30; i++) {
+      Record record = createRecord(i, String.format("val-%d", i));
+      expected.add(record);
+
+      deltaWriter.write(record);
+    }
+    result = deltaWriter.complete();
+    Assert.assertEquals("Should only have a data file.", 1, result.dataFiles().length);
+    Assert.assertEquals("Should have no delete file", 0, result.deleteFiles().length);
+    commitTransaction(deltaWriter.complete());
+    Assert.assertEquals("Should have expected records", expectedRowSet(expected), actualRowSet("*"));
+  }
+
+  @Test
+  public void testInsertDuplicatedKey() throws IOException {
+    List<Integer> equalityFieldIds = Lists.newArrayList(idFieldId);
+    Schema eqDeleteSchema = table.schema().select("id");
+
+    GenericTaskDeltaWriter deltaWriter = createTaskWriter(equalityFieldIds, eqDeleteSchema);
+    deltaWriter.write(createRecord(1, "aaa"));
+    deltaWriter.write(createRecord(2, "bbb"));
+    deltaWriter.write(createRecord(3, "ccc"));
+    deltaWriter.write(createRecord(4, "ddd"));
+    deltaWriter.write(createRecord(4, "eee"));
+    deltaWriter.write(createRecord(3, "fff"));
+    deltaWriter.write(createRecord(2, "ggg"));
+    deltaWriter.write(createRecord(1, "hhh"));
+
+    WriteResult result = deltaWriter.complete();
+    commitTransaction(result);
+
+    Assert.assertEquals("Should have a data file.", 1, result.dataFiles().length);
+    Assert.assertEquals("Should have a pos-delete file", 1, result.deleteFiles().length);
+    DeleteFile posDeleteFile = result.deleteFiles()[0];
+    Assert.assertEquals("Should be a pos-delete file", FileContent.POSITION_DELETES, posDeleteFile.content());
+    Assert.assertEquals("Should have expected records", expectedRowSet(ImmutableList.of(
+        createRecord(4, "eee"),
+        createRecord(3, "fff"),
+        createRecord(2, "ggg"),
+        createRecord(1, "hhh")
+    )), actualRowSet("*"));
+
+    // Check records in the data file.
+    DataFile dataFile = result.dataFiles()[0];
+    Assert.assertEquals(ImmutableList.of(
+        createRecord(1, "aaa"),
+        createRecord(2, "bbb"),
+        createRecord(3, "ccc"),
+        createRecord(4, "ddd"),
+        createRecord(4, "eee"),
+        createRecord(3, "fff"),
+        createRecord(2, "ggg"),
+        createRecord(1, "hhh")
+    ), readRecordsAsList(table.schema(), dataFile.path()));
+
+    // Check records in the pos-delete file.
+    Schema posDeleteSchema = DeleteSchemaUtil.pathPosSchema();
+    Assert.assertEquals(ImmutableList.of(
+        posRecord.copy("file_path", dataFile.path(), "pos", 0L),
+        posRecord.copy("file_path", dataFile.path(), "pos", 1L),
+        posRecord.copy("file_path", dataFile.path(), "pos", 2L),
+        posRecord.copy("file_path", dataFile.path(), "pos", 3L)
+    ), readRecordsAsList(posDeleteSchema, posDeleteFile.path()));
+  }
+
+  @Test
+  public void testUpsertSameRow() throws IOException {
+    List<Integer> eqDeleteFieldIds = Lists.newArrayList(idFieldId, dataFieldId);
+    Schema eqDeleteSchema = table.schema();
+
+    GenericTaskDeltaWriter deltaWriter = createTaskWriter(eqDeleteFieldIds, eqDeleteSchema);
+
+    Record record = createRecord(1, "aaa");
+    deltaWriter.write(record);
+    deltaWriter.delete(record);
+    deltaWriter.write(record);
+    deltaWriter.delete(record);
+    deltaWriter.write(record);
+
+    WriteResult result = deltaWriter.complete();
+    Assert.assertEquals("Should have a data file.", 1, result.dataFiles().length);
+    Assert.assertEquals("Should have a pos-delete file and an eq-delete file", 2, result.deleteFiles().length);
+    commitTransaction(result);
+    Assert.assertEquals("Should have an expected record", expectedRowSet(ImmutableList.of(record)), actualRowSet("*"));
+
+    // Check records in the data file.
+    DataFile dataFile = result.dataFiles()[0];
+    Assert.assertEquals(ImmutableList.of(record, record, record), readRecordsAsList(table.schema(), dataFile.path()));
+
+    // Check records in the eq-delete file.
+    DeleteFile eqDeleteFile = result.deleteFiles()[0];
+    Assert.assertEquals(ImmutableList.of(record, record), readRecordsAsList(eqDeleteSchema, eqDeleteFile.path()));
+
+    // Check records in the pos-delete file.
+    DeleteFile posDeleteFile = result.deleteFiles()[1];
+    Assert.assertEquals(ImmutableList.of(
+        posRecord.copy("file_path", dataFile.path(), "pos", 0L),
+        posRecord.copy("file_path", dataFile.path(), "pos", 1L)
+    ), readRecordsAsList(DeleteSchemaUtil.pathPosSchema(), posDeleteFile.path()));
+
+    deltaWriter = createTaskWriter(eqDeleteFieldIds, eqDeleteSchema);
+    deltaWriter.delete(record);
+    result = deltaWriter.complete();
+    Assert.assertEquals("Should have 0 data file.", 0, result.dataFiles().length);
+    Assert.assertEquals("Should have 1 eq-delete file", 1, result.deleteFiles().length);
+    commitTransaction(result);
+    Assert.assertEquals("Should have no record", expectedRowSet(ImmutableList.of()), actualRowSet("*"));
+  }
+
+  @Test
+  public void testUpsertByDataField() throws IOException {
+    List<Integer> eqDeleteFieldIds = Lists.newArrayList(dataFieldId);
+    Schema eqDeleteSchema = table.schema().select("data");
+
+    GenericTaskDeltaWriter deltaWriter = createTaskWriter(eqDeleteFieldIds, eqDeleteSchema);
+    deltaWriter.write(createRecord(1, "aaa"));
+    deltaWriter.write(createRecord(2, "bbb"));
+    deltaWriter.write(createRecord(3, "aaa"));
+    deltaWriter.write(createRecord(3, "ccc"));
+    deltaWriter.write(createRecord(4, "ccc"));
+
+    // Commit the 1th transaction.
+    WriteResult result = deltaWriter.complete();
+    Assert.assertEquals("Should have a data file", 1, result.dataFiles().length);
+    Assert.assertEquals("Should have a pos-delete file for deduplication purpose", 1, result.deleteFiles().length);
+    Assert.assertEquals("Should be pos-delete file", FileContent.POSITION_DELETES, result.deleteFiles()[0].content());
+    commitTransaction(result);
+
+    Assert.assertEquals("Should have expected records", expectedRowSet(ImmutableList.of(
+        createRecord(2, "bbb"),
+        createRecord(3, "aaa"),
+        createRecord(4, "ccc")
+    )), actualRowSet("*"));
+
+    // Start the 2th transaction.

Review comment:
       Nit: it would normally be "2nd"






[GitHub] [iceberg] rdblue commented on pull request #1888: Core: Add BaseDeltaWriter

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1888:
URL: https://github.com/apache/iceberg/pull/1888#issuecomment-743340374


   Merged! Great work, @openinx!




[GitHub] [iceberg] rdblue commented on a change in pull request #1888: Core: Add BaseDeltaWriter

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1888:
URL: https://github.com/apache/iceberg/pull/1888#discussion_r538814913



##########
File path: core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
##########
@@ -75,6 +79,113 @@ public WriteResult complete() throws IOException {
         .build();
   }
 
+  /**
+   * Base delta writer to write both insert records and equality-deletes.
+   */
+  protected abstract class BaseDeltaWriter implements Closeable {
+    private RollingFileWriter dataWriter;
+    private RollingEqDeleteWriter eqDeleteWriter;
+    private SortedPosDeleteWriter<T> posDeleteWriter;
+    private StructLikeMap<PathOffset> insertedRowMap;
+
+    public BaseDeltaWriter(PartitionKey partition, Schema eqDeleteSchema) {
+      Preconditions.checkNotNull(eqDeleteSchema, "equality-delete schema could not be null.");
+
+      this.dataWriter = new RollingFileWriter(partition);
+
+      this.eqDeleteWriter = new RollingEqDeleteWriter(partition);
+      this.insertedRowMap = StructLikeMap.create(eqDeleteSchema.asStruct());
+
+      this.posDeleteWriter = new SortedPosDeleteWriter<>(appenderFactory, fileFactory, format, partition);
+    }
+
+    /**
+     * Make the generic data readable as a {@link StructLike}.
+     */
+    protected abstract StructLike asStructLike(T data);
+
+    protected abstract StructLike asCopiedKey(T row);
+
+    public void write(T row) throws IOException {
+      PathOffset pathOffset = PathOffset.of(dataWriter.currentPath(), dataWriter.currentRows());
+
+      StructLike copiedKey = asCopiedKey(row);
+      // Adding a pos-delete to replace the old filePos.
+      PathOffset previous = insertedRowMap.put(copiedKey, pathOffset);
+      if (previous != null) {
+        // TODO attach the previous row if has a positional-delete row schema in appender factory.
+        posDeleteWriter.delete(previous.path, previous.rowOffset, null);
+      }
+
+      dataWriter.write(row);
+    }
+
+    /**
+     * Delete the rows with the given key.
+     *
+     * @param key the projected values, which can be written to the eq-delete file directly.
+     */
+    public void delete(T key) throws IOException {

Review comment:
       From the tests, I see that this can be used when the deleted row won't be passed. But I don't think this leaves an option for writing an entire row to the delete file and using a subset of its columns for the delete. This assumes that whatever struct is passed here is the entire delete key.
   
   To support both use cases (writing an already projected key and writing a full row and projecting), I think we should have two methods: `deleteKey` and `delete`. That way both are supported. We would also need a test for passing the full row to the delete file, but deleting by key.
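    
    For illustration, here is a minimal sketch of what such a two-method API could look like (assumptions: the writer holds a `structProjection` from the table schema to the delete schema, and `internalDelete` is a hypothetical shared helper; this is not the merged code):
    
    ```java
      // Hypothetical sketch: both delete paths funnel into the same map lookup.
      public void delete(T row) throws IOException {
        // Project the full row down to its key columns for the map lookup,
        // but write the entire row into the equality-delete file.
        internalDelete(structProjection.wrap(asStructLike(row)));
        eqDeleteWriter.write(row);
      }

      public void deleteKey(T key) throws IOException {
        // The caller already projected the key; look it up and write it as-is.
        internalDelete(asStructLike(key));
        eqDeleteWriter.write(key);
      }

      private void internalDelete(StructLike key) throws IOException {
        // Drop any in-flight insert with the same key and position-delete it.
        PathOffset previous = insertedRowMap.remove(key);
        if (previous != null) {
          posDeleteWriter.delete(previous.path, previous.rowOffset, null);
        }
      }
    ```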






[GitHub] [iceberg] rdblue commented on a change in pull request #1888: Core: Add BaseDeltaWriter

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1888:
URL: https://github.com/apache/iceberg/pull/1888#discussion_r538807164



##########
File path: data/src/test/java/org/apache/iceberg/io/TestTaskDeltaWriter.java
##########
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.function.Function;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.ArrayUtil;
+import org.apache.iceberg.util.StructLikeSet;
+import org.apache.iceberg.util.StructProjection;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestTaskDeltaWriter extends TableTestBase {
+  private static final int FORMAT_V2 = 2;
+  private static final long TARGET_FILE_SIZE = 128 * 1024 * 1024L;
+
+  private final FileFormat format;
+  private final GenericRecord gRecord = GenericRecord.create(SCHEMA);
+  private final GenericRecord posRecord = GenericRecord.create(DeleteSchemaUtil.pathPosSchema());
+
+  private OutputFileFactory fileFactory = null;
+  private int idFieldId;
+  private int dataFieldId;
+
+  @Parameterized.Parameters(name = "FileFormat = {0}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+        {"avro"},
+        {"parquet"}
+    };
+  }
+
+  public TestTaskDeltaWriter(String fileFormat) {
+    super(FORMAT_V2);
+    this.format = FileFormat.valueOf(fileFormat.toUpperCase(Locale.ENGLISH));
+  }
+
+  @Before
+  public void setupTable() throws IOException {
+    this.tableDir = temp.newFolder();
+    Assert.assertTrue(tableDir.delete()); // created by table create
+
+    this.metadataDir = new File(tableDir, "metadata");
+
+    this.table = create(SCHEMA, PartitionSpec.unpartitioned());
+    this.fileFactory = new OutputFileFactory(table.spec(), format, table.locationProvider(), table.io(),
+        table.encryption(), 1, 1);
+
+    this.idFieldId = table.schema().findField("id").fieldId();
+    this.dataFieldId = table.schema().findField("data").fieldId();
+
+    table.updateProperties()
+        .defaultFormat(format)
+        .commit();
+  }
+
+  private Record createRecord(Integer id, String data) {
+    return gRecord.copy("id", id, "data", data);
+  }
+
+  @Test
+  public void testPureInsert() throws IOException {
+    List<Integer> eqDeleteFieldIds = Lists.newArrayList(idFieldId, dataFieldId);
+    Schema eqDeleteSchema = table.schema();
+
+    GenericTaskDeltaWriter deltaWriter = createTaskWriter(eqDeleteFieldIds, eqDeleteSchema);
+    List<Record> expected = Lists.newArrayList();
+    for (int i = 0; i < 20; i++) {
+      Record record = createRecord(i, String.format("val-%d", i));
+      expected.add(record);
+
+      deltaWriter.write(record);
+    }
+
+    WriteResult result = deltaWriter.complete();
+    Assert.assertEquals("Should only have a data file.", 1, result.dataFiles().length);
+    Assert.assertEquals("Should have no delete file", 0, result.deleteFiles().length);
+    commitTransaction(result);
+    Assert.assertEquals("Should have expected records", expectedRowSet(expected), actualRowSet("*"));
+
+    deltaWriter = createTaskWriter(eqDeleteFieldIds, eqDeleteSchema);
+    for (int i = 20; i < 30; i++) {
+      Record record = createRecord(i, String.format("val-%d", i));
+      expected.add(record);
+
+      deltaWriter.write(record);
+    }
+    result = deltaWriter.complete();
+    Assert.assertEquals("Should only have a data file.", 1, result.dataFiles().length);
+    Assert.assertEquals("Should have no delete file", 0, result.deleteFiles().length);
+    commitTransaction(deltaWriter.complete());
+    Assert.assertEquals("Should have expected records", expectedRowSet(expected), actualRowSet("*"));
+  }
+
+  @Test
+  public void testInsertDuplicatedKey() throws IOException {
+    List<Integer> equalityFieldIds = Lists.newArrayList(idFieldId);
+    Schema eqDeleteSchema = table.schema().select("id");
+
+    GenericTaskDeltaWriter deltaWriter = createTaskWriter(equalityFieldIds, eqDeleteSchema);
+    deltaWriter.write(createRecord(1, "aaa"));
+    deltaWriter.write(createRecord(2, "bbb"));
+    deltaWriter.write(createRecord(3, "ccc"));
+    deltaWriter.write(createRecord(4, "ddd"));
+    deltaWriter.write(createRecord(4, "eee"));
+    deltaWriter.write(createRecord(3, "fff"));
+    deltaWriter.write(createRecord(2, "ggg"));
+    deltaWriter.write(createRecord(1, "hhh"));
+
+    WriteResult result = deltaWriter.complete();
+    commitTransaction(result);
+
+    Assert.assertEquals("Should have a data file.", 1, result.dataFiles().length);
+    Assert.assertEquals("Should have a pos-delete file", 1, result.deleteFiles().length);
+    DeleteFile posDeleteFile = result.deleteFiles()[0];
+    Assert.assertEquals("Should be a pos-delete file", FileContent.POSITION_DELETES, posDeleteFile.content());
+    Assert.assertEquals("Should have expected records", expectedRowSet(ImmutableList.of(
+        createRecord(4, "eee"),
+        createRecord(3, "fff"),
+        createRecord(2, "ggg"),
+        createRecord(1, "hhh")
+    )), actualRowSet("*"));
+
+    // Check records in the data file.
+    DataFile dataFile = result.dataFiles()[0];
+    Assert.assertEquals(ImmutableList.of(
+        createRecord(1, "aaa"),
+        createRecord(2, "bbb"),
+        createRecord(3, "ccc"),
+        createRecord(4, "ddd"),
+        createRecord(4, "eee"),
+        createRecord(3, "fff"),
+        createRecord(2, "ggg"),
+        createRecord(1, "hhh")
+    ), readRecordsAsList(table.schema(), dataFile.path()));
+
+    // Check records in the pos-delete file.
+    Schema posDeleteSchema = DeleteSchemaUtil.pathPosSchema();
+    Assert.assertEquals(ImmutableList.of(
+        posRecord.copy("file_path", dataFile.path(), "pos", 0L),
+        posRecord.copy("file_path", dataFile.path(), "pos", 1L),
+        posRecord.copy("file_path", dataFile.path(), "pos", 2L),
+        posRecord.copy("file_path", dataFile.path(), "pos", 3L)
+    ), readRecordsAsList(posDeleteSchema, posDeleteFile.path()));
+  }
+
+  @Test
+  public void testUpsertSameRow() throws IOException {
+    List<Integer> eqDeleteFieldIds = Lists.newArrayList(idFieldId, dataFieldId);
+    Schema eqDeleteSchema = table.schema();
+
+    GenericTaskDeltaWriter deltaWriter = createTaskWriter(eqDeleteFieldIds, eqDeleteSchema);
+
+    Record record = createRecord(1, "aaa");
+    deltaWriter.write(record);
+    deltaWriter.delete(record);

Review comment:
       This passes the entire row as the delete key, but the delta writer assumes that it receives only the key and doesn't call `asCopiedKey`. This is the kind of problem I think we should prevent by passing full records into this `delete` method and always projecting to get the key.
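    
    With projection handled inside `delete`, the test could keep passing full rows, and a key-only path would look like this (a hypothetical call pattern using the generics `Record` API):
    
    ```java
      // Full row in: the writer projects (id, data) down to the delete key itself.
      deltaWriter.write(createRecord(1, "aaa"));
      deltaWriter.delete(createRecord(1, "aaa"));

      // Key-only variant: the caller passes an already-projected key record.
      Record keyRecord = GenericRecord.create(table.schema().select("id"));
      keyRecord.setField("id", 1);
      deltaWriter.deleteKey(keyRecord);  // hypothetical key-only method
    ```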






[GitHub] [iceberg] rdblue commented on a change in pull request #1888: Core: Add BaseDeltaWriter

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1888:
URL: https://github.com/apache/iceberg/pull/1888#discussion_r539515554



##########
File path: core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
##########
@@ -75,6 +79,113 @@ public WriteResult complete() throws IOException {
         .build();
   }
 
+  /**
+   * Base delta writer to write both insert records and equality-deletes.
+   */
+  protected abstract class BaseDeltaWriter implements Closeable {
+    private RollingFileWriter dataWriter;
+    private RollingEqDeleteWriter eqDeleteWriter;
+    private SortedPosDeleteWriter<T> posDeleteWriter;
+    private StructLikeMap<PathOffset> insertedRowMap;
+
+    public BaseDeltaWriter(PartitionKey partition, Schema eqDeleteSchema) {
+      Preconditions.checkNotNull(eqDeleteSchema, "equality-delete schema could not be null.");
+
+      this.dataWriter = new RollingFileWriter(partition);
+
+      this.eqDeleteWriter = new RollingEqDeleteWriter(partition);
+      this.insertedRowMap = StructLikeMap.create(eqDeleteSchema.asStruct());
+
+      this.posDeleteWriter = new SortedPosDeleteWriter<>(appenderFactory, fileFactory, format, partition);
+    }
+
+    /**
+     * Make the generic data readable as a {@link StructLike}.
+     */
+    protected abstract StructLike asStructLike(T data);
+
+    protected abstract StructLike asCopiedKey(T row);

Review comment:
       Good catch. I like the idea of copying the data into a new struct.






[GitHub] [iceberg] rdblue commented on a change in pull request #1888: Core: Add BaseDeltaWriter

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1888:
URL: https://github.com/apache/iceberg/pull/1888#discussion_r539639585



##########
File path: core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
##########
@@ -75,6 +81,135 @@ public WriteResult complete() throws IOException {
         .build();
   }
 
+  /**
+   * Base equality delta writer to write both insert records and equality-deletes.
+   */
+  protected abstract class BaseEqualityDeltaWriter implements Closeable {
+    private final StructProjection structProjection;
+    private RollingFileWriter dataWriter;
+    private RollingEqDeleteWriter eqDeleteWriter;
+    private SortedPosDeleteWriter<T> posDeleteWriter;
+    private Map<StructLike, PathOffset> insertedRowMap;
+
+    public BaseEqualityDeltaWriter(PartitionKey partition, Schema schema, Schema deleteSchema) {
+      Preconditions.checkNotNull(schema, "Iceberg table schema cannot be null.");
+      Preconditions.checkNotNull(deleteSchema, "Equality-delete schema cannot be null.");
+      this.structProjection = StructProjection.create(schema, deleteSchema);
+
+      this.dataWriter = new RollingFileWriter(partition);
+
+      this.eqDeleteWriter = new RollingEqDeleteWriter(partition);
+      this.insertedRowMap = StructLikeMap.create(deleteSchema.asStruct());
+
+      this.posDeleteWriter = new SortedPosDeleteWriter<>(appenderFactory, fileFactory, format, partition);
+    }
+
+    /**
+     * Make the generic data readable as a {@link StructLike}.
+     */
+    protected abstract StructLike asStructLike(T data);
+
+    public void write(T row) throws IOException {
+      PathOffset pathOffset = PathOffset.of(dataWriter.currentPath(), dataWriter.currentRows());
+
+      StructLike copiedKey = structProjection.copy().wrap(asStructLike(row));
+      // Adding a pos-delete to replace the old path-offset.
+      PathOffset previous = insertedRowMap.put(copiedKey, pathOffset);

Review comment:
       Doesn't this have the problem you were talking about with using only wrappers? If `asStructLike` uses a wrapper that is reused, then even if the key is a new instance, the underlying data in the `StructLike` wrapper returned will change on the next call to write.
   
   I think instead of copying the projection, this needs to copy the result of the projection into a new struct and add that to the map.
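    
    To spell the hazard out (a sketch, assuming `asStructLike` keeps reusing one wrapper instance):
    
    ```java
      StructLike key1 = structProjection.copy().wrap(asStructLike(rowA));
      insertedRowMap.put(key1, offsetA);

      StructLike key2 = structProjection.copy().wrap(asStructLike(rowB));
      // asStructLike(rowB) re-pointed the shared wrapper at rowB, so key1's
      // projection now also reads rowB's values: the earlier map entry is
      // silently corrupted even though key1 and key2 are distinct objects.
      insertedRowMap.put(key2, offsetB);
    ```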






[GitHub] [iceberg] openinx commented on a change in pull request #1888: Core: Add BaseDeltaWriter

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1888:
URL: https://github.com/apache/iceberg/pull/1888#discussion_r540713704



##########
File path: core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
##########
@@ -75,6 +81,135 @@ public WriteResult complete() throws IOException {
         .build();
   }
 
+  /**
+   * Base equality delta writer to write both insert records and equality-deletes.
+   */
+  protected abstract class BaseEqualityDeltaWriter implements Closeable {
+    private final StructProjection structProjection;
+    private RollingFileWriter dataWriter;
+    private RollingEqDeleteWriter eqDeleteWriter;
+    private SortedPosDeleteWriter<T> posDeleteWriter;
+    private Map<StructLike, PathOffset> insertedRowMap;
+
+    public BaseEqualityDeltaWriter(PartitionKey partition, Schema schema, Schema deleteSchema) {
+      Preconditions.checkNotNull(schema, "Iceberg table schema cannot be null.");
+      Preconditions.checkNotNull(deleteSchema, "Equality-delete schema cannot be null.");
+      this.structProjection = StructProjection.create(schema, deleteSchema);
+
+      this.dataWriter = new RollingFileWriter(partition);
+
+      this.eqDeleteWriter = new RollingEqDeleteWriter(partition);
+      this.insertedRowMap = StructLikeMap.create(deleteSchema.asStruct());
+
+      this.posDeleteWriter = new SortedPosDeleteWriter<>(appenderFactory, fileFactory, format, partition);
+    }
+
+    /**
+     * Make the generic data readable as a {@link StructLike}.
+     */
+    protected abstract StructLike asStructLike(T data);
+
+    public void write(T row) throws IOException {
+      PathOffset pathOffset = PathOffset.of(dataWriter.currentPath(), dataWriter.currentRows());
+
+      StructLike copiedKey = structProjection.copy().wrap(asStructLike(row));
+      // Adding a pos-delete to replace the old path-offset.
+      PathOffset previous = insertedRowMap.put(copiedKey, pathOffset);

Review comment:
       Okay, I misunderstood your point. Sounds good to create a `StructCopy` to copy the values; I like the idea. Thanks.
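    
    A `StructCopy` along those lines might look roughly like this (a sketch against the `StructLike` interface, not necessarily the committed class):
    
    ```java
      /** Sketch: materialize a struct's current values so keys survive wrapper reuse. */
      class StructCopy implements StructLike {
        static StructLike copy(StructLike struct) {
          return struct != null ? new StructCopy(struct) : null;
        }

        private final Object[] values;

        private StructCopy(StructLike struct) {
          this.values = new Object[struct.size()];
          for (int i = 0; i < values.length; i += 1) {
            Object value = struct.get(i, Object.class);
            // Nested structs must be copied too, or they keep referencing the wrapper.
            values[i] = value instanceof StructLike ? copy((StructLike) value) : value;
          }
        }

        @Override
        public int size() {
          return values.length;
        }

        @Override
        public <T> T get(int pos, Class<T> javaClass) {
          return javaClass.cast(values[pos]);
        }

        @Override
        public <T> void set(int pos, T value) {
          throw new UnsupportedOperationException("Struct copy cannot be modified");
        }
      }
    ```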






[GitHub] [iceberg] openinx commented on a change in pull request #1888: Core: Add BaseDeltaWriter

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1888:
URL: https://github.com/apache/iceberg/pull/1888#discussion_r539800091



##########
File path: core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
##########
@@ -75,6 +81,135 @@ public WriteResult complete() throws IOException {
         .build();
   }
 
+  /**
+   * Base equality delta writer to write both insert records and equality-deletes.
+   */
+  protected abstract class BaseEqualityDeltaWriter implements Closeable {
+    private final StructProjection structProjection;
+    private RollingFileWriter dataWriter;
+    private RollingEqDeleteWriter eqDeleteWriter;
+    private SortedPosDeleteWriter<T> posDeleteWriter;
+    private Map<StructLike, PathOffset> insertedRowMap;
+
+    public BaseEqualityDeltaWriter(PartitionKey partition, Schema schema, Schema deleteSchema) {
+      Preconditions.checkNotNull(schema, "Iceberg table schema cannot be null.");
+      Preconditions.checkNotNull(deleteSchema, "Equality-delete schema cannot be null.");
+      this.structProjection = StructProjection.create(schema, deleteSchema);
+
+      this.dataWriter = new RollingFileWriter(partition);
+
+      this.eqDeleteWriter = new RollingEqDeleteWriter(partition);
+      this.insertedRowMap = StructLikeMap.create(deleteSchema.asStruct());
+
+      this.posDeleteWriter = new SortedPosDeleteWriter<>(appenderFactory, fileFactory, format, partition);
+    }
+
+    /**
+     * Make the generic data readable as a {@link StructLike}.
+     */
+    protected abstract StructLike asStructLike(T data);
+
+    public void write(T row) throws IOException {
+      PathOffset pathOffset = PathOffset.of(dataWriter.currentPath(), dataWriter.currentRows());
+
+      StructLike copiedKey = structProjection.copy().wrap(asStructLike(row));
+      // Adding a pos-delete to replace the old path-offset.
+      PathOffset previous = insertedRowMap.put(copiedKey, pathOffset);

Review comment:
       Finally, I decided to introduce `asCopiedStructLike`. People don't have to care how it is used inside the `BaseEqualityDeltaWriter`; they just need to know to implement it as a completely new StructLike that won't affect the old one.
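    
    For the generics writer used in the tests, such a method could be as simple as the following sketch (`Record.copy()` already materializes the field values):
    
    ```java
      @Override
      protected StructLike asCopiedStructLike(Record row) {
        // GenericRecord.copy() produces an independent record, so the returned
        // struct is safe to keep as a long-lived map key.
        return row.copy();
      }
    ```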






[GitHub] [iceberg] rdblue commented on a change in pull request #1888: Core: Add BaseDeltaWriter

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1888:
URL: https://github.com/apache/iceberg/pull/1888#discussion_r538813022



##########
File path: core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
##########
@@ -75,6 +79,113 @@ public WriteResult complete() throws IOException {
         .build();
   }
 
+  /**
+   * Base delta writer to write both insert records and equality-deletes.
+   */
+  protected abstract class BaseDeltaWriter implements Closeable {
+    private RollingFileWriter dataWriter;
+    private RollingEqDeleteWriter eqDeleteWriter;
+    private SortedPosDeleteWriter<T> posDeleteWriter;
+    private StructLikeMap<PathOffset> insertedRowMap;
+
+    public BaseDeltaWriter(PartitionKey partition, Schema eqDeleteSchema) {
+      Preconditions.checkNotNull(eqDeleteSchema, "equality-delete schema could not be null.");
+
+      this.dataWriter = new RollingFileWriter(partition);
+
+      this.eqDeleteWriter = new RollingEqDeleteWriter(partition);
+      this.insertedRowMap = StructLikeMap.create(eqDeleteSchema.asStruct());
+
+      this.posDeleteWriter = new SortedPosDeleteWriter<>(appenderFactory, fileFactory, format, partition);
+    }
+
+    /**
+     * Make the generic data readable as a {@link StructLike}.
+     */
+    protected abstract StructLike asStructLike(T data);
+
+    protected abstract StructLike asCopiedKey(T row);

Review comment:
       I think this could be removed from the API because this already has `asStructLike`. Rather than relying on the implementation to create the projection and copy it, this could be implemented here like this:
   
   ```java
     protected StructLike asCopiedKey(T row) {
       return structProjection.copy().wrap(asStructLike(row));
     }
   ```
   
   We would need to create `StructProjection` in this class, but it would be fairly easy.






[GitHub] [iceberg] rdblue merged pull request #1888: Core: Add BaseDeltaWriter

Posted by GitBox <gi...@apache.org>.
rdblue merged pull request #1888:
URL: https://github.com/apache/iceberg/pull/1888


   




[GitHub] [iceberg] rdblue commented on a change in pull request #1888: Core: Add BaseDeltaWriter

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1888:
URL: https://github.com/apache/iceberg/pull/1888#discussion_r538796532



##########
File path: core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
##########
@@ -75,6 +79,113 @@ public WriteResult complete() throws IOException {
         .build();
   }
 
+  /**
+   * Base delta writer to write both insert records and equality-deletes.
+   */
+  protected abstract class BaseDeltaWriter implements Closeable {

Review comment:
       How about `BaseEqualityDeltaWriter`?
   
   I think Spark `MERGE INTO` will likely use a delta writer that doesn't create the equality writer or use the `SortedPosDeleteWriter` because it will request that the rows are already ordered.






[GitHub] [iceberg] rdblue commented on a change in pull request #1888: Core: Add BaseDeltaWriter

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1888:
URL: https://github.com/apache/iceberg/pull/1888#discussion_r540599413



##########
File path: core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
##########
@@ -54,6 +64,10 @@ protected BaseTaskWriter(PartitionSpec spec, FileFormat format, FileAppenderFact
     this.targetFileSize = targetFileSize;
   }
 
+  public Set<CharSequence> referencedDataFiles() {
+    return referencedDataFiles;

Review comment:
       Good catch. Should this set be part of the `WriteResult` instead of separate? I think that tasks are going to need to pass the set back to the commit for validation, so adding it to the `WriteResult` seems like the right way to handle it.
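    
    That could make the task result self-contained, roughly like this (a sketch; the builder method name for the referenced files is an assumption):
    
    ```java
      return WriteResult.builder()
          .addDataFiles(completedDataFiles)
          .addDeleteFiles(completedDeleteFiles)
          // carry the referenced data files along so the committer can
          // validate them when the RowDelta is applied
          .addReferencedDataFiles(referencedDataFiles)
          .build();
    ```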






[GitHub] [iceberg] openinx commented on a change in pull request #1888: Core: Add BaseDeltaWriter

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1888:
URL: https://github.com/apache/iceberg/pull/1888#discussion_r539052545



##########
File path: core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
##########
@@ -75,6 +79,113 @@ public WriteResult complete() throws IOException {
         .build();
   }
 
+  /**
+   * Base delta writer to write both insert records and equality-deletes.
+   */
+  protected abstract class BaseDeltaWriter implements Closeable {
+    private RollingFileWriter dataWriter;
+    private RollingEqDeleteWriter eqDeleteWriter;
+    private SortedPosDeleteWriter<T> posDeleteWriter;
+    private StructLikeMap<PathOffset> insertedRowMap;
+
+    public BaseDeltaWriter(PartitionKey partition, Schema eqDeleteSchema) {
+      Preconditions.checkNotNull(eqDeleteSchema, "equality-delete schema could not be null.");
+
+      this.dataWriter = new RollingFileWriter(partition);
+
+      this.eqDeleteWriter = new RollingEqDeleteWriter(partition);
+      this.insertedRowMap = StructLikeMap.create(eqDeleteSchema.asStruct());
+
+      this.posDeleteWriter = new SortedPosDeleteWriter<>(appenderFactory, fileFactory, format, partition);
+    }
+
+    /**
+     * Make the generic data readable as a {@link StructLike}.
+     */
+    protected abstract StructLike asStructLike(T data);
+
+    protected abstract StructLike asCopiedKey(T row);
+
+    public void write(T row) throws IOException {
+      PathOffset pathOffset = PathOffset.of(dataWriter.currentPath(), dataWriter.currentRows());
+
+      StructLike copiedKey = asCopiedKey(row);
+      // Adding a pos-delete to replace the old filePos.
+      PathOffset previous = insertedRowMap.put(copiedKey, pathOffset);
+      if (previous != null) {
+        // TODO attach the previous row if has a positional-delete row schema in appender factory.
+        posDeleteWriter.delete(previous.path, previous.rowOffset, null);
+      }
+
+      dataWriter.write(row);
+    }
+
+    /**
+     * Delete the rows with the given key.
+     *
+     * @param key the projected values, which can be written to the eq-delete file directly.
+     */
+    public void delete(T key) throws IOException {

Review comment:
       I agree that it's better to provide a `delete(T row)` method that can accept an entire row to write to the eq-delete file, because it's possible to have a table with (a,b,c,d) columns whose equality fields are the `(a,c)` columns, while someone may want to write the full `(a,b,c,d)` values into the eq-delete file.
   
    The problem is: how could we project the generic data type `T` to match the expected `eqDeleteRowSchema` (so that the eqDeleteWriter could write the correct column values)? It sounds like we will need to provide an extra abstract method to accomplish that generic projection.
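    
    One possible shape for that projection, sketched with the `StructProjection` utility (assuming the writer can already convert `T` to a `StructLike` via `asStructLike`):
    
    ```java
      // Built once: projects the full table schema down to the (a, c) delete schema.
      StructProjection projection = StructProjection.create(tableSchema, eqDeleteSchema);

      // Per delete: use the projected key for the in-memory map lookup, but
      // write the full (a, b, c, d) row into the eq-delete file.
      StructLike key = projection.wrap(asStructLike(row));
      insertedRowMap.remove(key);
      eqDeleteWriter.write(row);
    ```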






[GitHub] [iceberg] openinx commented on a change in pull request #1888: Core: Add BaseDeltaWriter

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #1888:
URL: https://github.com/apache/iceberg/pull/1888#discussion_r539792027



##########
File path: core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
##########
@@ -75,6 +81,135 @@ public WriteResult complete() throws IOException {
         .build();
   }
 
+  /**
+   * Base equality delta writer to write both insert records and equality-deletes.
+   */
+  protected abstract class BaseEqualityDeltaWriter implements Closeable {
+    private final StructProjection structProjection;
+    private RollingFileWriter dataWriter;
+    private RollingEqDeleteWriter eqDeleteWriter;
+    private SortedPosDeleteWriter<T> posDeleteWriter;
+    private Map<StructLike, PathOffset> insertedRowMap;
+
+    public BaseEqualityDeltaWriter(PartitionKey partition, Schema schema, Schema deleteSchema) {
+      Preconditions.checkNotNull(schema, "Iceberg table schema cannot be null.");
+      Preconditions.checkNotNull(deleteSchema, "Equality-delete schema cannot be null.");
+      this.structProjection = StructProjection.create(schema, deleteSchema);
+
+      this.dataWriter = new RollingFileWriter(partition);
+
+      this.eqDeleteWriter = new RollingEqDeleteWriter(partition);
+      this.insertedRowMap = StructLikeMap.create(deleteSchema.asStruct());
+
+      this.posDeleteWriter = new SortedPosDeleteWriter<>(appenderFactory, fileFactory, format, partition);
+    }
+
+    /**
+     * Make the generic data readable as a {@link StructLike}.
+     */
+    protected abstract StructLike asStructLike(T data);
+
+    public void write(T row) throws IOException {
+      PathOffset pathOffset = PathOffset.of(dataWriter.currentPath(), dataWriter.currentRows());
+
+      StructLike copiedKey = structProjection.copy().wrap(asStructLike(row));
+      // Adding a pos-delete to replace the old path-offset.
+      PathOffset previous = insertedRowMap.put(copiedKey, pathOffset);

Review comment:
       I'm a little hesitant about `asCopiedStructLike` because it adds complexity for compute engine developers: people need to consider in which cases they should use `asStructLike` and in which cases they should use `asCopiedStructLike`, and implementing the two abstract methods will require careful coding. So I'm thinking, how about always copying the `StructLike`? See the flink implementation [here](https://github.com/apache/iceberg/pull/1896/files#diff-e497cabcc9fcf3b1edcd987693853c2dd578f0b3231d5a8f4c3b6f0338bf5e3cR95).
   
    The copy is lightweight because only a few references are copied (from `RowDataWrapper`). That would simplify the design of the abstract methods.
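    
    The always-copy idea, sketched below; `wrapper` stands in for an engine-side reusable wrapper such as Flink's `RowDataWrapper`, and its `copy()` is an assumption here:
    
    ```java
      // Always hand the map an independent key, so callers never have to reason
      // about wrapper reuse; copying a wrapper duplicates only a few references.
      StructLike copiedKey = structProjection.copy().wrap(wrapper.copy().wrap(row));
      PathOffset previous = insertedRowMap.put(copiedKey, pathOffset);
    ```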






[GitHub] [iceberg] rdblue commented on a change in pull request #1888: Core: Add BaseDeltaWriter

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1888:
URL: https://github.com/apache/iceberg/pull/1888#discussion_r538795557



##########
File path: core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
##########
@@ -75,6 +79,113 @@ public WriteResult complete() throws IOException {
         .build();
   }
 
+  /**
+   * Base delta writer to write both insert records and equality-deletes.
+   */
+  protected abstract class BaseDeltaWriter implements Closeable {
+    private RollingFileWriter dataWriter;
+    private RollingEqDeleteWriter eqDeleteWriter;
+    private SortedPosDeleteWriter<T> posDeleteWriter;
+    private StructLikeMap<PathOffset> insertedRowMap;
+
+    public BaseDeltaWriter(PartitionKey partition, Schema eqDeleteSchema) {
+      Preconditions.checkNotNull(eqDeleteSchema, "equality-delete schema could not be null.");

Review comment:
       Nit: the error message is slightly misleading because it uses "could", which implies that there is some case where it could be null. How about "Equality delete schema cannot be null"?






[GitHub] [iceberg] rdblue commented on a change in pull request #1888: Core: Add BaseDeltaWriter

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1888:
URL: https://github.com/apache/iceberg/pull/1888#discussion_r538802022



##########
File path: core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
##########
@@ -75,6 +79,113 @@ public WriteResult complete() throws IOException {
         .build();
   }
 
+  /**
+   * Base delta writer to write both insert records and equality-deletes.
+   */
+  protected abstract class BaseDeltaWriter implements Closeable {
+    private RollingFileWriter dataWriter;
+    private RollingEqDeleteWriter eqDeleteWriter;
+    private SortedPosDeleteWriter<T> posDeleteWriter;
+    private StructLikeMap<PathOffset> insertedRowMap;
+
+    public BaseDeltaWriter(PartitionKey partition, Schema eqDeleteSchema) {
+      Preconditions.checkNotNull(eqDeleteSchema, "equality-delete schema could not be null.");
+
+      this.dataWriter = new RollingFileWriter(partition);
+
+      this.eqDeleteWriter = new RollingEqDeleteWriter(partition);
+      this.insertedRowMap = StructLikeMap.create(eqDeleteSchema.asStruct());
+
+      this.posDeleteWriter = new SortedPosDeleteWriter<>(appenderFactory, fileFactory, format, partition);
+    }
+
+    /**
+     * Make the generic data readable as a {@link StructLike}.
+     */
+    protected abstract StructLike asStructLike(T data);
+
+    protected abstract StructLike asCopiedKey(T row);
+
+    public void write(T row) throws IOException {
+      PathOffset pathOffset = PathOffset.of(dataWriter.currentPath(), dataWriter.currentRows());
+
+      StructLike copiedKey = asCopiedKey(row);
+      // Adding a pos-delete to replace the old filePos.
+      PathOffset previous = insertedRowMap.put(copiedKey, pathOffset);
+      if (previous != null) {
+        // TODO attach the previous row if has a positional-delete row schema in appender factory.
+        posDeleteWriter.delete(previous.path, previous.rowOffset, null);
+      }
+
+      dataWriter.write(row);
+    }
+
+    /**
+     * Delete the rows with the given key.
+     *
+     * @param key the projected values, which can be written to the eq-delete file directly.
+     */
+    public void delete(T key) throws IOException {

Review comment:
       Why pass a key here instead of a row?
   
   I think it would be easier to assume that this is a row, so that the `write` and `delete` methods accept the same data. That also provides a way to write the row to the delete file, or just the key based on configuration. The way it is here, there is no way to write the whole row in the delete.
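    
    In other words, `write` and `delete` would accept the same row type (a hypothetical call pattern):
    
    ```java
      deltaWriter.write(createRecord(1, "aaa"));   // insert the full row
      deltaWriter.delete(createRecord(1, "aaa"));  // delete by the same full row;
      // the writer then decides whether to persist the whole row or only its
      // key columns in the eq-delete file, e.g. based on the delete row schema
    ```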






[GitHub] [iceberg] rdblue commented on a change in pull request #1888: Core: Add BaseDeltaWriter

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1888:
URL: https://github.com/apache/iceberg/pull/1888#discussion_r538805153



##########
File path: data/src/test/java/org/apache/iceberg/io/TestTaskDeltaWriter.java
##########
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.function.Function;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.ArrayUtil;
+import org.apache.iceberg.util.StructLikeSet;
+import org.apache.iceberg.util.StructProjection;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestTaskDeltaWriter extends TableTestBase {
+  private static final int FORMAT_V2 = 2;
+  private static final long TARGET_FILE_SIZE = 128 * 1024 * 1024L;
+
+  private final FileFormat format;
+  private final GenericRecord gRecord = GenericRecord.create(SCHEMA);
+  private final GenericRecord posRecord = GenericRecord.create(DeleteSchemaUtil.pathPosSchema());
+
+  private OutputFileFactory fileFactory = null;
+  private int idFieldId;
+  private int dataFieldId;
+
+  @Parameterized.Parameters(name = "FileFormat = {0}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+        {"avro"},
+        {"parquet"}
+    };
+  }
+
+  public TestTaskDeltaWriter(String fileFormat) {
+    super(FORMAT_V2);
+    this.format = FileFormat.valueOf(fileFormat.toUpperCase(Locale.ENGLISH));
+  }
+
+  @Before
+  public void setupTable() throws IOException {
+    this.tableDir = temp.newFolder();
+    Assert.assertTrue(tableDir.delete()); // created by table create
+
+    this.metadataDir = new File(tableDir, "metadata");
+
+    this.table = create(SCHEMA, PartitionSpec.unpartitioned());
+    this.fileFactory = new OutputFileFactory(table.spec(), format, table.locationProvider(), table.io(),
+        table.encryption(), 1, 1);
+
+    this.idFieldId = table.schema().findField("id").fieldId();
+    this.dataFieldId = table.schema().findField("data").fieldId();
+
+    table.updateProperties()
+        .defaultFormat(format)
+        .commit();
+  }
+
+  private Record createRecord(Integer id, String data) {
+    return gRecord.copy("id", id, "data", data);
+  }
+
+  @Test
+  public void testPureInsert() throws IOException {
+    List<Integer> eqDeleteFieldIds = Lists.newArrayList(idFieldId, dataFieldId);
+    Schema eqDeleteSchema = table.schema();
+
+    GenericTaskDeltaWriter deltaWriter = createTaskWriter(eqDeleteFieldIds, eqDeleteSchema);
+    List<Record> expected = Lists.newArrayList();
+    for (int i = 0; i < 20; i++) {
+      Record record = createRecord(i, String.format("val-%d", i));
+      expected.add(record);
+
+      deltaWriter.write(record);
+    }
+
+    WriteResult result = deltaWriter.complete();
+    Assert.assertEquals("Should only have a data file.", 1, result.dataFiles().length);
+    Assert.assertEquals("Should have no delete file", 0, result.deleteFiles().length);
+    commitTransaction(result);
+    Assert.assertEquals("Should have expected records", expectedRowSet(expected), actualRowSet("*"));
+
+    deltaWriter = createTaskWriter(eqDeleteFieldIds, eqDeleteSchema);
+    for (int i = 20; i < 30; i++) {
+      Record record = createRecord(i, String.format("val-%d", i));
+      expected.add(record);
+
+      deltaWriter.write(record);
+    }
+    result = deltaWriter.complete();
+    Assert.assertEquals("Should only have a data file.", 1, result.dataFiles().length);
+    Assert.assertEquals("Should have no delete file", 0, result.deleteFiles().length);
+    commitTransaction(deltaWriter.complete());

Review comment:
       `commitTransaction(result)`?



