You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/09/20 22:13:56 UTC
[iceberg] branch master updated: Core: Add new rolling file writers
(#3158)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 7eeeada Core: Add new rolling file writers (#3158)
7eeeada is described below
commit 7eeeada0dc027025664310c1ca619e9b44fec764
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Mon Sep 20 15:13:46 2021 -0700
Core: Add new rolling file writers (#3158)
---
.../org/apache/iceberg/io/DeleteWriteResult.java | 4 +
.../org/apache/iceberg/io/RollingDataWriter.java | 61 ++++++
.../iceberg/io/RollingEqualityDeleteWriter.java | 64 ++++++
.../org/apache/iceberg/io/RollingFileWriter.java | 143 ++++++++++++
.../iceberg/io/RollingPositionDeleteWriter.java | 68 ++++++
.../apache/iceberg/io/TestRollingFileWriters.java | 243 +++++++++++++++++++++
.../flink/sink/TestFlinkRollingFileWriters.java | 53 +++++
.../spark/source/TestSparkRollingFileWriters.java | 57 +++++
8 files changed, 693 insertions(+)
diff --git a/core/src/main/java/org/apache/iceberg/io/DeleteWriteResult.java b/core/src/main/java/org/apache/iceberg/io/DeleteWriteResult.java
index e8fe793..3d59e0c 100644
--- a/core/src/main/java/org/apache/iceberg/io/DeleteWriteResult.java
+++ b/core/src/main/java/org/apache/iceberg/io/DeleteWriteResult.java
@@ -61,4 +61,8 @@ public class DeleteWriteResult {
public CharSequenceSet referencedDataFiles() {
return referencedDataFiles;
}
+
+ public boolean referencesDataFiles() { // true only if the referenced data file set is non-null and non-empty
+ return referencedDataFiles != null && referencedDataFiles.size() > 0;
+ }
}
diff --git a/core/src/main/java/org/apache/iceberg/io/RollingDataWriter.java b/core/src/main/java/org/apache/iceberg/io/RollingDataWriter.java
new file mode 100644
index 0000000..1642a30
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/RollingDataWriter.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.util.List;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * A rolling data writer that splits incoming data into multiple files within one spec/partition
+ * based on the target file size. Data files reported by each closed writer are collected into one result.
+ */
+public class RollingDataWriter<T> extends RollingFileWriter<T, DataWriter<T>, DataWriteResult> {
+
+ private final FileWriterFactory<T> writerFactory; // creates the underlying data writer for each new file
+ private final List<DataFile> dataFiles; // data files collected from writers closed so far
+
+ public RollingDataWriter(FileWriterFactory<T> writerFactory, OutputFileFactory fileFactory,
+ FileIO io, long targetFileSizeInBytes,
+ PartitionSpec spec, StructLike partition) {
+ super(fileFactory, io, targetFileSizeInBytes, spec, partition);
+ this.writerFactory = writerFactory;
+ this.dataFiles = Lists.newArrayList();
+ openCurrentWriter(); // opens the first writer; done last because newWriter() reads writerFactory
+ }
+
+ @Override
+ protected DataWriter<T> newWriter(EncryptedOutputFile file) {
+ return writerFactory.newDataWriter(file, spec(), partition());
+ }
+
+ @Override
+ protected void addResult(DataWriteResult result) { // called by the base class with the result of a closed writer
+ dataFiles.addAll(result.dataFiles());
+ }
+
+ @Override
+ protected DataWriteResult aggregatedResult() {
+ return new DataWriteResult(dataFiles); // the accumulated list is passed as-is; no copy is made here
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/io/RollingEqualityDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/RollingEqualityDeleteWriter.java
new file mode 100644
index 0000000..c12bfd3
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/RollingEqualityDeleteWriter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.util.List;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * A rolling equality delete writer that splits incoming deletes into multiple files within one spec/partition
+ * based on the target file size. Delete files reported by each closed writer are collected into one result.
+ */
+public class RollingEqualityDeleteWriter<T> extends RollingFileWriter<T, EqualityDeleteWriter<T>, DeleteWriteResult> {
+
+ private final FileWriterFactory<T> writerFactory; // creates the underlying equality delete writer for each new file
+ private final List<DeleteFile> deleteFiles; // delete files collected from writers closed so far
+
+ public RollingEqualityDeleteWriter(FileWriterFactory<T> writerFactory, OutputFileFactory fileFactory,
+ FileIO io, long targetFileSizeInBytes,
+ PartitionSpec spec, StructLike partition) {
+ super(fileFactory, io, targetFileSizeInBytes, spec, partition);
+ this.writerFactory = writerFactory;
+ this.deleteFiles = Lists.newArrayList();
+ openCurrentWriter(); // opens the first writer; done last because newWriter() reads writerFactory
+ }
+
+ @Override
+ protected EqualityDeleteWriter<T> newWriter(EncryptedOutputFile file) {
+ return writerFactory.newEqualityDeleteWriter(file, spec(), partition());
+ }
+
+ @Override
+ protected void addResult(DeleteWriteResult result) { // rejects results that reference data files, then collects
+ Preconditions.checkArgument(!result.referencesDataFiles(), "Equality deletes cannot reference data files");
+ deleteFiles.addAll(result.deleteFiles());
+ }
+
+ @Override
+ protected DeleteWriteResult aggregatedResult() {
+ return new DeleteWriteResult(deleteFiles); // built from delete files only; no referenced data files are passed
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java b/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java
new file mode 100644
index 0000000..ed35933
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * A rolling writer capable of splitting incoming data or deletes into multiple files within one spec/partition
+ * based on the target file size. Subclasses create the concrete writers and aggregate their results.
+ */
+abstract class RollingFileWriter<T, W extends FileWriter<T, R>, R> implements FileWriter<T, R> {
+ private static final int ROWS_DIVISOR = 1000; // the size check runs only when the row count is a multiple of this
+
+ private final OutputFileFactory fileFactory;
+ private final FileIO io; // used to delete an output file that received no rows
+ private final long targetFileSizeInBytes;
+ private final PartitionSpec spec;
+ private final StructLike partition; // may be null; see newFile()
+
+ private EncryptedOutputFile currentFile = null;
+ private long currentFileRows = 0; // rows written to the current file only; reset on every roll
+ private W currentWriter = null;
+
+ private boolean closed = false; // set by close(); result() requires it to be true
+
+ protected RollingFileWriter(OutputFileFactory fileFactory, FileIO io, long targetFileSizeInBytes,
+ PartitionSpec spec, StructLike partition) {
+ this.fileFactory = fileFactory;
+ this.io = io;
+ this.targetFileSizeInBytes = targetFileSizeInBytes;
+ this.spec = spec;
+ this.partition = partition; // note: no writer is opened here; see openCurrentWriter()
+ }
+
+ protected abstract W newWriter(EncryptedOutputFile file); // creates the concrete writer for a new output file
+
+ protected abstract void addResult(R result); // receives the result of each closed writer that wrote rows
+
+ protected abstract R aggregatedResult(); // combined result; invoked from result() once this writer is closed
+
+ protected PartitionSpec spec() {
+ return spec;
+ }
+
+ protected StructLike partition() {
+ return partition;
+ }
+
+ public CharSequence currentFilePath() { // NOTE(review): currentFile is null after the writer is closed -> NPE
+ return currentFile.encryptingOutputFile().location();
+ }
+
+ public long currentFileRows() {
+ return currentFileRows;
+ }
+
+ @Override
+ public long length() { // not supported by the rolling writer; always throws
+ throw new UnsupportedOperationException(this.getClass().getName() + " does not implement length");
+ }
+
+ @Override
+ public void write(T row) throws IOException { // NOTE(review): currentWriter is null after close() -> NPE
+ currentWriter.write(row);
+ currentFileRows++;
+
+ if (shouldRollToNewFile()) { // roll: finish the current file and immediately start a new one
+ closeCurrentWriter();
+ openCurrentWriter();
+ }
+ }
+
+ private boolean shouldRollToNewFile() { // && short-circuits, so length() is queried only every ROWS_DIVISOR rows
+ return currentFileRows % ROWS_DIVISOR == 0 && currentWriter.length() >= targetFileSizeInBytes;
+ }
+
+ protected void openCurrentWriter() { // subclasses call this from their constructors to open the first file
+ Preconditions.checkState(currentWriter == null, "Current writer has been already initialized");
+
+ this.currentFile = newFile();
+ this.currentFileRows = 0;
+ this.currentWriter = newWriter(currentFile);
+ }
+
+ private EncryptedOutputFile newFile() {
+ if (partition == null) { // no partition given: use the factory's no-argument variant
+ return fileFactory.newOutputFile();
+ } else {
+ return fileFactory.newOutputFile(spec, partition);
+ }
+ }
+
+ private void closeCurrentWriter() throws IOException {
+ if (currentWriter != null) { // nothing to do when no writer is open
+ currentWriter.close();
+
+ if (currentFileRows == 0L) { // nothing was written: delete the file instead of reporting it
+ io.deleteFile(currentFile.encryptingOutputFile());
+ } else {
+ addResult(currentWriter.result());
+ }
+
+ this.currentFile = null; // clear state so openCurrentWriter() can pass its precondition
+ this.currentFileRows = 0;
+ this.currentWriter = null;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!closed) { // repeated close() calls are no-ops
+ closeCurrentWriter();
+ this.closed = true;
+ }
+ }
+
+ @Override
+ public final R result() { // only valid after close(); throws IllegalStateException otherwise
+ Preconditions.checkState(closed, "Cannot get result from unclosed writer");
+ return aggregatedResult();
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/io/RollingPositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/RollingPositionDeleteWriter.java
new file mode 100644
index 0000000..001b0bb
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/RollingPositionDeleteWriter.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.util.List;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.CharSequenceSet;
+
+/**
+ * A rolling position delete writer that splits incoming deletes into multiple files within one spec/partition
+ * based on the target file size. Delete files and referenced data files are accumulated across all closed writers.
+ */
+public class RollingPositionDeleteWriter<T>
+ extends RollingFileWriter<PositionDelete<T>, PositionDeleteWriter<T>, DeleteWriteResult> {
+
+ private final FileWriterFactory<T> writerFactory; // creates the underlying position delete writer for each new file
+ private final List<DeleteFile> deleteFiles; // delete files collected from writers closed so far
+ private final CharSequenceSet referencedDataFiles; // data file paths collected from the closed writers' results
+
+ public RollingPositionDeleteWriter(FileWriterFactory<T> writerFactory, OutputFileFactory fileFactory,
+ FileIO io, long targetFileSizeInBytes,
+ PartitionSpec spec, StructLike partition) {
+ super(fileFactory, io, targetFileSizeInBytes, spec, partition);
+ this.writerFactory = writerFactory;
+ this.deleteFiles = Lists.newArrayList();
+ this.referencedDataFiles = CharSequenceSet.empty();
+ openCurrentWriter(); // opens the first writer; done last because newWriter() reads writerFactory
+ }
+
+ @Override
+ protected PositionDeleteWriter<T> newWriter(EncryptedOutputFile file) {
+ return writerFactory.newPositionDeleteWriter(file, spec(), partition());
+ }
+
+ @Override
+ protected void addResult(DeleteWriteResult result) { // called by the base class with the result of a closed writer
+ deleteFiles.addAll(result.deleteFiles());
+ referencedDataFiles.addAll(result.referencedDataFiles()); // NOTE(review): assumes a non-null set here - confirm
+ }
+
+ @Override
+ protected DeleteWriteResult aggregatedResult() {
+ return new DeleteWriteResult(deleteFiles, referencedDataFiles); // accumulated collections are passed as-is
+ }
+}
diff --git a/data/src/test/java/org/apache/iceberg/io/TestRollingFileWriters.java b/data/src/test/java/org/apache/iceberg/io/TestRollingFileWriters.java
new file mode 100644
index 0000000..e11e0a7
--- /dev/null
+++ b/data/src/test/java/org/apache/iceberg/io/TestRollingFileWriters.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public abstract class TestRollingFileWriters<T> extends TableTestBase { // subclasses supply the row type T
+
+ // TODO: add ORC once we support ORC rolling file writers
+
+ @Parameterized.Parameters(name = "FileFormat={0}, Partitioned={1}")
+ public static Object[] parameters() {
+ return new Object[][] {
+ new Object[]{FileFormat.AVRO, false},
+ new Object[]{FileFormat.AVRO, true},
+ new Object[]{FileFormat.PARQUET, false},
+ new Object[]{FileFormat.PARQUET, true},
+ };
+ }
+
+ private static final int TABLE_FORMAT_VERSION = 2;
+ private static final int FILE_SIZE_CHECK_ROWS_DIVISOR = 1000; // same value as RollingFileWriter's private ROWS_DIVISOR
+ private static final long DEFAULT_FILE_SIZE = 128L * 1024 * 1024;
+ private static final long SMALL_FILE_SIZE = 2L; // tiny target so that each size check is expected to roll the file
+ private static final String PARTITION_VALUE = "aaa";
+
+ private final FileFormat fileFormat;
+ private final boolean partitioned;
+ private StructLike partition = null; // stays null for the unpartitioned runs
+ private OutputFileFactory fileFactory = null;
+
+ public TestRollingFileWriters(FileFormat fileFormat, boolean partitioned) {
+ super(TABLE_FORMAT_VERSION);
+ this.fileFormat = fileFormat;
+ this.partitioned = partitioned;
+ }
+
+ protected abstract FileWriterFactory<T> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds,
+ Schema equalityDeleteRowSchema);
+
+ protected abstract T toRow(Integer id, String data); // builds one engine-specific row from the two values
+
+ protected FileFormat format() {
+ return fileFormat;
+ }
+
+ @Before
+ public void setupTable() throws Exception {
+ this.tableDir = temp.newFolder();
+ Assert.assertTrue(tableDir.delete()); // created during table creation
+
+ this.metadataDir = new File(tableDir, "metadata");
+
+ if (partitioned) {
+ this.table = create(SCHEMA, SPEC);
+ this.partition = initPartitionKey();
+ } else {
+ this.table = create(SCHEMA, PartitionSpec.unpartitioned());
+ this.partition = null;
+ }
+
+ this.fileFactory = OutputFileFactory.builderFor(table, 1, 1).format(fileFormat).build();
+ }
+
+ private PartitionKey initPartitionKey() { // partition key derived from a record whose "data" is PARTITION_VALUE
+ Record record = GenericRecord.create(table.schema()).copy(ImmutableMap.of("data", PARTITION_VALUE));
+
+ PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema());
+ partitionKey.partition(record);
+
+ return partitionKey;
+ }
+
+ @Test
+ public void testRollingDataWriterNoRecords() throws IOException {
+ FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+ RollingDataWriter<T> writer = new RollingDataWriter<>(
+ writerFactory, fileFactory, table.io(),
+ DEFAULT_FILE_SIZE, table.spec(), partition);
+
+ writer.close();
+ Assert.assertEquals("Must be no data files", 0, writer.result().dataFiles().size());
+
+ writer.close(); // second close must not change the result
+ Assert.assertEquals("Must be no data files", 0, writer.result().dataFiles().size());
+ }
+
+ @Test
+ public void testRollingDataWriterSplitData() throws IOException {
+ FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+ RollingDataWriter<T> writer = new RollingDataWriter<>(
+ writerFactory, fileFactory, table.io(),
+ SMALL_FILE_SIZE, table.spec(), partition);
+
+ List<T> rows = Lists.newArrayListWithExpectedSize(4 * FILE_SIZE_CHECK_ROWS_DIVISOR);
+ for (int index = 0; index < 4 * FILE_SIZE_CHECK_ROWS_DIVISOR; index++) {
+ rows.add(toRow(index, PARTITION_VALUE));
+ }
+
+ try (RollingDataWriter<T> closableWriter = writer) {
+ closableWriter.write(rows);
+ }
+
+ // call close again to ensure it is idempotent
+ writer.close();
+
+ Assert.assertEquals(4, writer.result().dataFiles().size()); // one file per size check; the empty last file is dropped
+ }
+
+ @Test
+ public void testRollingEqualityDeleteWriterNoRecords() throws IOException { // ORC guard below is a no-op today
+ Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
+
+ List<Integer> equalityFieldIds = ImmutableList.of(table.schema().findField("id").fieldId());
+ Schema equalityDeleteRowSchema = table.schema().select("id");
+ FileWriterFactory<T> writerFactory = newWriterFactory(table.schema(), equalityFieldIds, equalityDeleteRowSchema);
+ RollingEqualityDeleteWriter<T> writer = new RollingEqualityDeleteWriter<>(
+ writerFactory, fileFactory, table.io(),
+ DEFAULT_FILE_SIZE, table.spec(), partition);
+
+ writer.close();
+ Assert.assertEquals(0, writer.result().deleteFiles().size());
+ Assert.assertEquals(0, writer.result().referencedDataFiles().size());
+ Assert.assertFalse(writer.result().referencesDataFiles());
+
+ writer.close(); // second close must not change the result
+ Assert.assertEquals(0, writer.result().deleteFiles().size());
+ Assert.assertEquals(0, writer.result().referencedDataFiles().size());
+ Assert.assertFalse(writer.result().referencesDataFiles());
+ }
+
+ @Test
+ public void testRollingEqualityDeleteWriterSplitDeletes() throws IOException {
+ List<Integer> equalityFieldIds = ImmutableList.of(table.schema().findField("id").fieldId());
+ Schema equalityDeleteRowSchema = table.schema().select("id");
+ FileWriterFactory<T> writerFactory = newWriterFactory(table.schema(), equalityFieldIds, equalityDeleteRowSchema);
+ RollingEqualityDeleteWriter<T> writer = new RollingEqualityDeleteWriter<>(
+ writerFactory, fileFactory, table.io(),
+ SMALL_FILE_SIZE, table.spec(), partition);
+
+ List<T> deletes = Lists.newArrayListWithExpectedSize(4 * FILE_SIZE_CHECK_ROWS_DIVISOR);
+ for (int index = 0; index < 4 * FILE_SIZE_CHECK_ROWS_DIVISOR; index++) {
+ deletes.add(toRow(index, PARTITION_VALUE));
+ }
+
+ try (RollingEqualityDeleteWriter<T> closeableWriter = writer) {
+ closeableWriter.write(deletes);
+ }
+
+ // call close again to ensure it is idempotent
+ writer.close();
+
+ DeleteWriteResult result = writer.result();
+ Assert.assertEquals(4, result.deleteFiles().size()); // one file per size check; the empty last file is dropped
+ Assert.assertEquals(0, result.referencedDataFiles().size());
+ Assert.assertFalse(result.referencesDataFiles());
+ }
+
+ @Test
+ public void testRollingPositionDeleteWriterNoRecords() throws IOException {
+ FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+ RollingPositionDeleteWriter<T> writer = new RollingPositionDeleteWriter<>(
+ writerFactory, fileFactory, table.io(),
+ DEFAULT_FILE_SIZE, table.spec(), partition);
+
+ writer.close();
+ Assert.assertEquals(0, writer.result().deleteFiles().size());
+ Assert.assertEquals(0, writer.result().referencedDataFiles().size());
+ Assert.assertFalse(writer.result().referencesDataFiles());
+
+ writer.close(); // second close must not change the result
+ Assert.assertEquals(0, writer.result().deleteFiles().size());
+ Assert.assertEquals(0, writer.result().referencedDataFiles().size());
+ Assert.assertFalse(writer.result().referencesDataFiles());
+ }
+
+ @Test
+ public void testRollingPositionDeleteWriterSplitDeletes() throws IOException {
+ FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+ RollingPositionDeleteWriter<T> writer = new RollingPositionDeleteWriter<>(
+ writerFactory, fileFactory, table.io(),
+ SMALL_FILE_SIZE, table.spec(), partition);
+
+ List<PositionDelete<T>> deletes = Lists.newArrayListWithExpectedSize(4 * FILE_SIZE_CHECK_ROWS_DIVISOR);
+ for (int index = 0; index < 4 * FILE_SIZE_CHECK_ROWS_DIVISOR; index++) { // every delete uses the same file path
+ deletes.add(new PositionDelete<T>().set("path/to/data/file-1.parquet", index, null));
+ }
+
+ try (RollingPositionDeleteWriter<T> closeableWriter = writer) {
+ closeableWriter.write(deletes);
+ }
+
+ // call close again to ensure it is idempotent
+ writer.close();
+
+ DeleteWriteResult result = writer.result();
+ Assert.assertEquals(4, result.deleteFiles().size()); // one file per size check; the empty last file is dropped
+ Assert.assertEquals(1, result.referencedDataFiles().size()); // all deletes point at a single data file path
+ Assert.assertTrue(result.referencesDataFiles());
+ }
+
+ private FileWriterFactory<T> newWriterFactory(Schema dataSchema) {
+ return newWriterFactory(dataSchema, null, null); // no equality field ids and no equality delete row schema
+ }
+}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java
new file mode 100644
index 0000000..a3d62d5
--- /dev/null
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.util.List;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.io.FileWriterFactory;
+import org.apache.iceberg.io.TestRollingFileWriters;
+import org.apache.iceberg.util.ArrayUtil;
+
+public class TestFlinkRollingFileWriters extends TestRollingFileWriters<RowData> { // Flink RowData variant of the tests
+
+ public TestFlinkRollingFileWriters(FileFormat fileFormat, boolean partitioned) {
+ super(fileFormat, partitioned);
+ }
+
+ @Override
+ protected FileWriterFactory<RowData> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds,
+ Schema equalityDeleteRowSchema) {
+ return FlinkFileWriterFactory.builderFor(table)
+ .dataSchema(dataSchema) // honor the caller-supplied schema instead of always using table.schema()
+ .dataFileFormat(format())
+ .deleteFileFormat(format()) // data and delete files use the same parameterized format
+ .equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
+ .equalityDeleteRowSchema(equalityDeleteRowSchema)
+ .build();
+ }
+
+ @Override
+ protected RowData toRow(Integer id, String data) { // delegates row construction to SimpleDataUtil
+ return SimpleDataUtil.createRowData(id, data);
+ }
+}
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkRollingFileWriters.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkRollingFileWriters.java
new file mode 100644
index 0000000..3ea2353
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkRollingFileWriters.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.List;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.FileWriterFactory;
+import org.apache.iceberg.io.TestRollingFileWriters;
+import org.apache.iceberg.util.ArrayUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.unsafe.types.UTF8String;
+
+public class TestSparkRollingFileWriters extends TestRollingFileWriters<InternalRow> { // Spark InternalRow variant
+
+ public TestSparkRollingFileWriters(FileFormat fileFormat, boolean partitioned) {
+ super(fileFormat, partitioned);
+ }
+
+ @Override
+ protected FileWriterFactory<InternalRow> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds,
+ Schema equalityDeleteRowSchema) {
+ return SparkFileWriterFactory.builderFor(table)
+ .dataSchema(dataSchema) // honor the caller-supplied schema instead of always using table.schema()
+ .dataFileFormat(format())
+ .deleteFileFormat(format()) // data and delete files use the same parameterized format
+ .equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
+ .equalityDeleteRowSchema(equalityDeleteRowSchema)
+ .build();
+ }
+
+ @Override
+ protected InternalRow toRow(Integer id, String data) { // two-column row: position 0 = id, position 1 = data
+ InternalRow row = new GenericInternalRow(2);
+ row.update(0, id);
+ row.update(1, UTF8String.fromString(data)); // the string value is stored as a UTF8String
+ return row;
+ }
+}