Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/09/23 23:25:46 UTC
[iceberg] branch master updated: Core: Add PartitioningWriter (#3164)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 11f327a Core: Add PartitioningWriter (#3164)
11f327a is described below
commit 11f327a2f08d95fdf8fea68412f0ae1687a3b63f
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Thu Sep 23 16:25:33 2021 -0700
Core: Add PartitioningWriter (#3164)
---
build.gradle | 4 -
.../org/apache/iceberg/deletes/PositionDelete.java | 5 +-
.../org/apache/iceberg/io/ClusteredDataWriter.java | 73 +++
.../iceberg/io/ClusteredEqualityDeleteWriter.java | 75 +++
.../iceberg/io/ClusteredPositionDeleteWriter.java | 78 +++
.../org/apache/iceberg/io/ClusteredWriter.java | 134 ++++++
.../org/apache/iceberg/io/FanoutDataWriter.java | 73 +++
.../java/org/apache/iceberg/io/FanoutWriter.java | 105 ++++
.../org/apache/iceberg/io/PartitioningWriter.java | 60 +++
.../java/org/apache/iceberg/io/StructCopy.java | 2 +-
.../java/org/apache/iceberg/TableTestBase.java | 6 +
.../org/apache/iceberg/io/TestAppenderFactory.java | 6 +-
.../apache/iceberg/io/TestFileWriterFactory.java | 54 +--
.../apache/iceberg/io/TestPartitioningWriters.java | 535 +++++++++++++++++++++
.../apache/iceberg/io/TestRollingFileWriters.java | 29 +-
.../java/org/apache/iceberg/io/WriterTestBase.java | 93 ++++
...ters.java => TestFlinkPartitioningWriters.java} | 27 +-
.../flink/sink/TestFlinkRollingFileWriters.java | 4 +-
jmh.gradle | 2 +-
.../iceberg/spark/source/WritersBenchmark.java | 353 ++++++++++++++
.../spark/source/avro/AvroWritersBenchmark.java | 41 ++
.../source/parquet/ParquetWritersBenchmark.java | 41 ++
...ters.java => TestSparkPartitioningWriters.java} | 26 +-
.../spark/source/TestSparkRollingFileWriters.java | 4 +-
24 files changed, 1736 insertions(+), 94 deletions(-)
diff --git a/build.gradle b/build.gradle
index cecf8b7..a56a03e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -115,10 +115,6 @@ subprojects {
options.encoding = 'UTF-8'
}
- ext {
- jmhVersion = '1.21'
- }
-
sourceCompatibility = '1.8'
targetCompatibility = '1.8'
diff --git a/core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java b/core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java
index 4cd2d31..cac04ca 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDelete.java
@@ -22,7 +22,7 @@ package org.apache.iceberg.deletes;
import org.apache.iceberg.StructLike;
public class PositionDelete<R> implements StructLike {
- static <T> PositionDelete<T> create() {
+ public static <T> PositionDelete<T> create() {
return new PositionDelete<>();
}
@@ -30,6 +30,9 @@ public class PositionDelete<R> implements StructLike {
private long pos;
private R row;
+ private PositionDelete() {
+ }
+
public PositionDelete<R> set(CharSequence newPath, long newPos, R newRow) {
this.path = newPath;
this.pos = newPos;
diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredDataWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredDataWriter.java
new file mode 100644
index 0000000..a6982cd
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/ClusteredDataWriter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.util.List;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * A data writer capable of writing to multiple specs and partitions that requires the incoming records
+ * to be properly clustered by partition spec and by partition within each spec.
+ */
+public class ClusteredDataWriter<T> extends ClusteredWriter<T, DataWriteResult> {
+
+ private final FileWriterFactory<T> writerFactory;
+ private final OutputFileFactory fileFactory;
+ private final FileIO io;
+ private final FileFormat fileFormat;
+ private final long targetFileSizeInBytes;
+ private final List<DataFile> dataFiles;
+
+ public ClusteredDataWriter(FileWriterFactory<T> writerFactory, OutputFileFactory fileFactory,
+ FileIO io, FileFormat fileFormat, long targetFileSizeInBytes) {
+ this.writerFactory = writerFactory;
+ this.fileFactory = fileFactory;
+ this.io = io;
+ this.fileFormat = fileFormat;
+ this.targetFileSizeInBytes = targetFileSizeInBytes;
+ this.dataFiles = Lists.newArrayList();
+ }
+
+ @Override
+ protected FileWriter<T, DataWriteResult> newWriter(PartitionSpec spec, StructLike partition) {
+ // TODO: support ORC rolling writers
+ if (fileFormat == FileFormat.ORC) {
+ EncryptedOutputFile outputFile = newOutputFile(fileFactory, spec, partition);
+ return writerFactory.newDataWriter(outputFile, spec, partition);
+ } else {
+ return new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition);
+ }
+ }
+
+ @Override
+ protected void addResult(DataWriteResult result) {
+ dataFiles.addAll(result.dataFiles());
+ }
+
+ @Override
+ protected DataWriteResult aggregatedResult() {
+ return new DataWriteResult(dataFiles);
+ }
+}
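
As a quick orientation before the remaining files, here is a minimal usage sketch of the new ClusteredDataWriter. It is not part of this commit: the Table, FileWriterFactory, and OutputFileFactory are assumed to be configured the same way the tests further below configure them, and the incoming rows must already be ordered by spec and by partition.

import java.io.IOException;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.ClusteredDataWriter;
import org.apache.iceberg.io.DataWriteResult;
import org.apache.iceberg.io.FileWriterFactory;
import org.apache.iceberg.io.OutputFileFactory;

class ClusteredDataWriterSketch {
  // sketch only: rows must arrive clustered by spec and by partition within each spec
  void appendClustered(Table table, FileWriterFactory<Record> writerFactory, OutputFileFactory fileFactory,
                       Iterable<Record> orderedRows, PartitionSpec spec, StructLike partition) throws IOException {
    ClusteredDataWriter<Record> writer = new ClusteredDataWriter<>(
        writerFactory, fileFactory, table.io(), FileFormat.PARQUET, 128L * 1024 * 1024);
    try {
      for (Record row : orderedRows) {
        writer.write(row, spec, partition);
      }
    } finally {
      writer.close();
    }

    DataWriteResult result = writer.result(); // valid only after close()
    RowDelta rowDelta = table.newRowDelta();
    result.dataFiles().forEach(rowDelta::addRows);
    rowDelta.commit();
  }
}

If the rows cannot be pre-ordered, the FanoutDataWriter added below accepts them in any order at the cost of keeping more files open.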
diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredEqualityDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredEqualityDeleteWriter.java
new file mode 100644
index 0000000..385d1a5
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/ClusteredEqualityDeleteWriter.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.util.List;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * An equality delete writer capable of writing to multiple specs and partitions that requires
+ * the incoming delete records to be properly clustered by partition spec and by partition within each spec.
+ */
+public class ClusteredEqualityDeleteWriter<T> extends ClusteredWriter<T, DeleteWriteResult> {
+
+ private final FileWriterFactory<T> writerFactory;
+ private final OutputFileFactory fileFactory;
+ private final FileIO io;
+ private final FileFormat fileFormat;
+ private final long targetFileSizeInBytes;
+ private final List<DeleteFile> deleteFiles;
+
+ public ClusteredEqualityDeleteWriter(FileWriterFactory<T> writerFactory, OutputFileFactory fileFactory,
+ FileIO io, FileFormat fileFormat, long targetFileSizeInBytes) {
+ this.writerFactory = writerFactory;
+ this.fileFactory = fileFactory;
+ this.io = io;
+ this.fileFormat = fileFormat;
+ this.targetFileSizeInBytes = targetFileSizeInBytes;
+ this.deleteFiles = Lists.newArrayList();
+ }
+
+ @Override
+ protected FileWriter<T, DeleteWriteResult> newWriter(PartitionSpec spec, StructLike partition) {
+ // TODO: support ORC rolling writers
+ if (fileFormat == FileFormat.ORC) {
+ EncryptedOutputFile outputFile = newOutputFile(fileFactory, spec, partition);
+ return writerFactory.newEqualityDeleteWriter(outputFile, spec, partition);
+ } else {
+ return new RollingEqualityDeleteWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition);
+ }
+ }
+
+ @Override
+ protected void addResult(DeleteWriteResult result) {
+ Preconditions.checkArgument(!result.referencesDataFiles(), "Equality deletes cannot reference data files");
+ deleteFiles.addAll(result.deleteFiles());
+ }
+
+ @Override
+ protected DeleteWriteResult aggregatedResult() {
+ return new DeleteWriteResult(deleteFiles);
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java
new file mode 100644
index 0000000..ea11838
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/ClusteredPositionDeleteWriter.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.util.List;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.CharSequenceSet;
+
+/**
+ * A position delete writer capable of writing to multiple specs and partitions that requires
+ * the incoming delete records to be properly clustered by partition spec and by partition within each spec.
+ */
+public class ClusteredPositionDeleteWriter<T> extends ClusteredWriter<PositionDelete<T>, DeleteWriteResult> {
+
+ private final FileWriterFactory<T> writerFactory;
+ private final OutputFileFactory fileFactory;
+ private final FileIO io;
+ private final FileFormat fileFormat;
+ private final long targetFileSizeInBytes;
+ private final List<DeleteFile> deleteFiles;
+ private final CharSequenceSet referencedDataFiles;
+
+ public ClusteredPositionDeleteWriter(FileWriterFactory<T> writerFactory, OutputFileFactory fileFactory,
+ FileIO io, FileFormat fileFormat, long targetFileSizeInBytes) {
+ this.writerFactory = writerFactory;
+ this.fileFactory = fileFactory;
+ this.io = io;
+ this.fileFormat = fileFormat;
+ this.targetFileSizeInBytes = targetFileSizeInBytes;
+ this.deleteFiles = Lists.newArrayList();
+ this.referencedDataFiles = CharSequenceSet.empty();
+ }
+
+ @Override
+ protected FileWriter<PositionDelete<T>, DeleteWriteResult> newWriter(PartitionSpec spec, StructLike partition) {
+ // TODO: support ORC rolling writers
+ if (fileFormat == FileFormat.ORC) {
+ EncryptedOutputFile outputFile = newOutputFile(fileFactory, spec, partition);
+ return writerFactory.newPositionDeleteWriter(outputFile, spec, partition);
+ } else {
+ return new RollingPositionDeleteWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition);
+ }
+ }
+
+ @Override
+ protected void addResult(DeleteWriteResult result) {
+ deleteFiles.addAll(result.deleteFiles());
+ referencedDataFiles.addAll(result.referencedDataFiles());
+ }
+
+ @Override
+ protected DeleteWriteResult aggregatedResult() {
+ return new DeleteWriteResult(deleteFiles, referencedDataFiles);
+ }
+}
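
A similar sketch for the new ClusteredPositionDeleteWriter, using the now-public PositionDelete.create(). Again illustrative only: dataFile, spec, and partition stand in for values obtained elsewhere, and the calls mirror the pattern used by TestPartitioningWriters below.

import java.io.IOException;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.io.ClusteredPositionDeleteWriter;
import org.apache.iceberg.io.DeleteWriteResult;
import org.apache.iceberg.io.FileWriterFactory;
import org.apache.iceberg.io.OutputFileFactory;

class ClusteredPositionDeleteWriterSketch {
  void deletePositions(Table table, FileWriterFactory<Record> writerFactory, OutputFileFactory fileFactory,
                       DataFile dataFile, PartitionSpec spec, StructLike partition) throws IOException {
    ClusteredPositionDeleteWriter<Record> writer = new ClusteredPositionDeleteWriter<>(
        writerFactory, fileFactory, table.io(), FileFormat.PARQUET, 128L * 1024 * 1024);
    try {
      // delete rows 0 and 2 of the given data file; deletes must be clustered by spec/partition
      writer.write(PositionDelete.<Record>create().set(dataFile.path(), 0L, null), spec, partition);
      writer.write(PositionDelete.<Record>create().set(dataFile.path(), 2L, null), spec, partition);
    } finally {
      writer.close();
    }

    DeleteWriteResult result = writer.result();
    RowDelta rowDelta = table.newRowDelta();
    result.deleteFiles().forEach(rowDelta::addDeletes);
    rowDelta.commit();
  }
}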
diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java
new file mode 100644
index 0000000..8729fd1
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Set;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.util.StructLikeSet;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that requires the incoming records
+ * to be clustered by partition spec and by partition within each spec.
+ * <p>
+ * As opposed to {@link FanoutWriter}, this writer keeps at most one file open to reduce
+ * the memory consumption. Prefer using this writer whenever the incoming records can be clustered
+ * by spec/partition.
+ */
+abstract class ClusteredWriter<T, R> implements PartitioningWriter<T, R> {
+
+ private static final String NOT_CLUSTERED_ROWS_ERROR_MSG_TEMPLATE =
+ "Incoming records violate the writer assumption that records are clustered by spec and " +
+ "by partition within each spec. Either cluster the incoming records or switch to fanout writers.\n" +
+ "Encountered records that belong to already closed files:\n";
+
+ private final Set<Integer> completedSpecIds = Sets.newHashSet();
+
+ private PartitionSpec currentSpec = null;
+ private Comparator<StructLike> partitionComparator = null;
+ private Set<StructLike> completedPartitions = null;
+ private StructLike currentPartition = null;
+ private FileWriter<T, R> currentWriter = null;
+
+ private boolean closed = false;
+
+ protected abstract FileWriter<T, R> newWriter(PartitionSpec spec, StructLike partition);
+
+ protected abstract void addResult(R result);
+
+ protected abstract R aggregatedResult();
+
+ @Override
+ public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+ if (!spec.equals(currentSpec)) {
+ if (currentSpec != null) {
+ closeCurrentWriter();
+ completedSpecIds.add(currentSpec.specId());
+ completedPartitions.clear();
+ }
+
+ if (completedSpecIds.contains(spec.specId())) {
+ String errorCtx = String.format("spec %s", spec);
+ throw new IllegalStateException(NOT_CLUSTERED_ROWS_ERROR_MSG_TEMPLATE + errorCtx);
+ }
+
+ StructType partitionType = spec.partitionType();
+
+ this.currentSpec = spec;
+ this.partitionComparator = Comparators.forType(partitionType);
+ this.completedPartitions = StructLikeSet.create(partitionType);
+ // copy the partition key as the key object may be reused
+ this.currentPartition = StructCopy.copy(partition);
+ this.currentWriter = newWriter(currentSpec, currentPartition);
+
+ } else if (partition != currentPartition && partitionComparator.compare(partition, currentPartition) != 0) {
+ closeCurrentWriter();
+ completedPartitions.add(currentPartition);
+
+ if (completedPartitions.contains(partition)) {
+ String errorCtx = String.format("partition '%s' in spec %s", spec.partitionToPath(partition), spec);
+ throw new IllegalStateException(NOT_CLUSTERED_ROWS_ERROR_MSG_TEMPLATE + errorCtx);
+ }
+
+ // copy the partition key as the key object may be reused
+ this.currentPartition = StructCopy.copy(partition);
+ this.currentWriter = newWriter(currentSpec, currentPartition);
+ }
+
+ currentWriter.write(row);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!closed) {
+ closeCurrentWriter();
+ this.closed = true;
+ }
+ }
+
+ private void closeCurrentWriter() throws IOException {
+ if (currentWriter != null) {
+ currentWriter.close();
+
+ addResult(currentWriter.result());
+
+ this.currentWriter = null;
+ }
+ }
+
+ @Override
+ public final R result() {
+ Preconditions.checkState(closed, "Cannot get result from unclosed writer");
+ return aggregatedResult();
+ }
+
+ protected EncryptedOutputFile newOutputFile(OutputFileFactory fileFactory, PartitionSpec spec, StructLike partition) {
+ Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
+ "Partition must not be null when creating output file for partitioned spec");
+ return partition == null ? fileFactory.newOutputFile() : fileFactory.newOutputFile(spec, partition);
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutDataWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutDataWriter.java
new file mode 100644
index 0000000..d6e16a7
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/FanoutDataWriter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.util.List;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * A data writer capable of writing to multiple specs and partitions that keeps data writers for each
+ * seen spec/partition pair open until this writer is closed.
+ */
+public class FanoutDataWriter<T> extends FanoutWriter<T, DataWriteResult> {
+
+ private final FileWriterFactory<T> writerFactory;
+ private final OutputFileFactory fileFactory;
+ private final FileIO io;
+ private final FileFormat fileFormat;
+ private final long targetFileSizeInBytes;
+ private final List<DataFile> dataFiles;
+
+ public FanoutDataWriter(FileWriterFactory<T> writerFactory, OutputFileFactory fileFactory,
+ FileIO io, FileFormat fileFormat, long targetFileSizeInBytes) {
+ this.writerFactory = writerFactory;
+ this.fileFactory = fileFactory;
+ this.io = io;
+ this.fileFormat = fileFormat;
+ this.targetFileSizeInBytes = targetFileSizeInBytes;
+ this.dataFiles = Lists.newArrayList();
+ }
+
+ @Override
+ protected FileWriter<T, DataWriteResult> newWriter(PartitionSpec spec, StructLike partition) {
+ // TODO: support ORC rolling writers
+ if (fileFormat == FileFormat.ORC) {
+ EncryptedOutputFile outputFile = newOutputFile(fileFactory, spec, partition);
+ return writerFactory.newDataWriter(outputFile, spec, partition);
+ } else {
+ return new RollingDataWriter<>(writerFactory, fileFactory, io, targetFileSizeInBytes, spec, partition);
+ }
+ }
+
+ @Override
+ protected void addResult(DataWriteResult result) {
+ dataFiles.addAll(result.dataFiles());
+ }
+
+ @Override
+ protected DataWriteResult aggregatedResult() {
+ return new DataWriteResult(dataFiles);
+ }
+}
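
A hedged sketch of the unclustered case with FanoutDataWriter. Rows may arrive in any order; a reusable PartitionKey derives each row's partition, which the writer copies internally before keeping it as a map key. The Record-based setup is an assumption, not part of this commit.

import java.io.IOException;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.DataWriteResult;
import org.apache.iceberg.io.FanoutDataWriter;
import org.apache.iceberg.io.FileWriterFactory;
import org.apache.iceberg.io.OutputFileFactory;

class FanoutDataWriterSketch {
  // sketch only: rows may arrive in any order; one file stays open per seen partition
  void appendUnordered(Table table, FileWriterFactory<Record> writerFactory, OutputFileFactory fileFactory,
                       Iterable<Record> rows) throws IOException {
    FanoutDataWriter<Record> writer = new FanoutDataWriter<>(
        writerFactory, fileFactory, table.io(), FileFormat.PARQUET, 128L * 1024 * 1024);
    PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema());
    try {
      for (Record row : rows) {
        partitionKey.partition(row);                   // derive the row's partition key
        writer.write(row, table.spec(), partitionKey); // the key is copied internally, so reuse is safe
      }
    } finally {
      writer.close();
    }

    DataWriteResult result = writer.result();
    RowDelta rowDelta = table.newRowDelta();
    result.dataFiles().forEach(rowDelta::addRows);
    rowDelta.commit();
  }
}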
diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutWriter.java
new file mode 100644
index 0000000..122a25d
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/FanoutWriter.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.StructLikeMap;
+
+/**
+ * A writer capable of writing to multiple specs and partitions that keeps files for each
+ * seen spec/partition pair open until this writer is closed.
+ * <p>
+ * As opposed to {@link ClusteredWriter}, this writer does not require the incoming records
+ * to be clustered by partition spec and partition as all files are kept open. As a consequence,
+ * this writer may potentially consume substantially more memory compared to {@link ClusteredWriter}.
+ * Use this writer only when clustering by spec/partition is not possible (e.g. streaming).
+ */
+abstract class FanoutWriter<T, R> implements PartitioningWriter<T, R> {
+
+ private final Map<Integer, StructLikeMap<FileWriter<T, R>>> writers = Maps.newHashMap();
+ private boolean closed = false;
+
+ protected abstract FileWriter<T, R> newWriter(PartitionSpec spec, StructLike partition);
+
+ protected abstract void addResult(R result);
+
+ protected abstract R aggregatedResult();
+
+ @Override
+ public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+ FileWriter<T, R> writer = writer(spec, partition);
+ writer.write(row);
+ }
+
+ private FileWriter<T, R> writer(PartitionSpec spec, StructLike partition) {
+ Map<StructLike, FileWriter<T, R>> specWriters = writers.computeIfAbsent(
+ spec.specId(),
+ id -> StructLikeMap.create(spec.partitionType()));
+ FileWriter<T, R> writer = specWriters.get(partition);
+
+ if (writer == null) {
+ // copy the partition key as the key object may be reused
+ StructLike copiedPartition = StructCopy.copy(partition);
+ writer = newWriter(spec, copiedPartition);
+ specWriters.put(copiedPartition, writer);
+ }
+
+ return writer;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!closed) {
+ closeWriters();
+ this.closed = true;
+ }
+ }
+
+ private void closeWriters() throws IOException {
+ for (Map<StructLike, FileWriter<T, R>> specWriters : writers.values()) {
+ for (FileWriter<T, R> writer : specWriters.values()) {
+ writer.close();
+ addResult(writer.result());
+ }
+
+ specWriters.clear();
+ }
+
+ writers.clear();
+ }
+
+ @Override
+ public final R result() {
+ Preconditions.checkState(closed, "Cannot get result from unclosed writer");
+ return aggregatedResult();
+ }
+
+ protected EncryptedOutputFile newOutputFile(OutputFileFactory fileFactory, PartitionSpec spec, StructLike partition) {
+ Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
+ "Partition must not be null when creating output file for partitioned spec");
+ return partition == null ? fileFactory.newOutputFile() : fileFactory.newOutputFile(spec, partition);
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/io/PartitioningWriter.java b/core/src/main/java/org/apache/iceberg/io/PartitioningWriter.java
new file mode 100644
index 0000000..329e68c
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/io/PartitioningWriter.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.Closeable;
+import java.io.IOException;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+
+/**
+ * A writer capable of writing files of a single type (i.e. data/delete) to multiple specs and partitions.
+ * <p>
+ * As opposed to {@link FileWriter}, this interface should be implemented by writers that are not
+ * limited to writing to a single spec/partition. Implementations may internally use {@link FileWriter}s
+ * for writing to a single spec/partition.
+ * <p>
+ * Note that this writer can be used both for partitioned and unpartitioned tables.
+ *
+ * @param <T> the row type
+ * @param <R> the result type
+ */
+public interface PartitioningWriter<T, R> extends Closeable {
+
+ /**
+ * Writes a row to the provided spec/partition.
+ *
+ * @param row a data or delete record
+ * @param spec a partition spec
+ * @param partition a partition or null if the spec is unpartitioned
+ * @throws IOException in case of an error during the write process
+ */
+ void write(T row, PartitionSpec spec, StructLike partition) throws IOException;
+
+ /**
+ * Returns a result that contains information about written {@link DataFile}s or {@link DeleteFile}s.
+ * The result is valid only after the writer is closed.
+ *
+ * @return the writer result
+ */
+ R result();
+}
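
For illustration only, a hypothetical generic helper written purely against the new interface; it highlights that result() is meaningful only once the writer has been closed.

import java.io.IOException;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.io.PartitioningWriter;

class PartitioningWriterSketch {
  // drives any PartitioningWriter for a single spec/partition and returns its aggregated result
  static <T, R> R writeAll(PartitioningWriter<T, R> writer, Iterable<T> rows,
                           PartitionSpec spec, StructLike partition) throws IOException {
    try {
      for (T row : rows) {
        writer.write(row, spec, partition);
      }
    } finally {
      writer.close();
    }
    return writer.result(); // valid only after the writer is closed
  }
}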
diff --git a/core/src/main/java/org/apache/iceberg/io/StructCopy.java b/core/src/main/java/org/apache/iceberg/io/StructCopy.java
index 6cf461d..7d43733 100644
--- a/core/src/main/java/org/apache/iceberg/io/StructCopy.java
+++ b/core/src/main/java/org/apache/iceberg/io/StructCopy.java
@@ -26,7 +26,7 @@ import org.apache.iceberg.StructLike;
*/
class StructCopy implements StructLike {
static StructLike copy(StructLike struct) {
- return new StructCopy(struct);
+ return struct != null ? new StructCopy(struct) : null;
}
private final Object[] values;
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index 9ef28b9..7325cfe 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.stream.LongStream;
+import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -454,6 +455,11 @@ public class TableTestBase {
.build();
}
+ protected <T> PositionDelete<T> positionDelete(CharSequence path, long pos, T row) {
+ PositionDelete<T> positionDelete = PositionDelete.create();
+ return positionDelete.set(path, pos, row);
+ }
+
static void validateManifestEntries(ManifestFile manifest,
Iterator<Long> ids,
Iterator<DataFile> expectedFiles,
diff --git a/data/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java b/data/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java
index 7df402e..0a88cbc 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java
@@ -273,9 +273,9 @@ public abstract class TestAppenderFactory<T> extends TableTestBase {
DataFile dataFile = prepareDataFile(rowSet, appenderFactory);
List<PositionDelete<T>> deletes = Lists.newArrayList(
- new PositionDelete<T>().set(dataFile.path(), 0, rowSet.get(0)),
- new PositionDelete<T>().set(dataFile.path(), 2, rowSet.get(2)),
- new PositionDelete<T>().set(dataFile.path(), 4, rowSet.get(4))
+ positionDelete(dataFile.path(), 0, rowSet.get(0)),
+ positionDelete(dataFile.path(), 2, rowSet.get(2)),
+ positionDelete(dataFile.path(), 4, rowSet.get(4))
);
EncryptedOutputFile out = createEncryptedOutputFile();
diff --git a/data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java b/data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java
index 734e5d3..c3c4691 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java
@@ -26,14 +26,11 @@ import java.util.Map;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
-import org.apache.iceberg.TableTestBase;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.GenericRecord;
-import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.avro.DataReader;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
@@ -59,7 +56,7 @@ import static org.apache.iceberg.MetadataColumns.DELETE_FILE_POS;
import static org.apache.iceberg.MetadataColumns.DELETE_FILE_ROW_FIELD_NAME;
@RunWith(Parameterized.class)
-public abstract class TestFileWriterFactory<T> extends TableTestBase {
+public abstract class TestFileWriterFactory<T> extends WriterTestBase<T> {
@Parameterized.Parameters(name = "FileFormat={0}, Partitioned={1}")
public static Object[] parameters() {
return new Object[][] {
@@ -73,6 +70,7 @@ public abstract class TestFileWriterFactory<T> extends TableTestBase {
}
private static final int TABLE_FORMAT_VERSION = 2;
+ private static final String PARTITION_VALUE = "aaa";
private final FileFormat fileFormat;
private final boolean partitioned;
@@ -94,12 +92,6 @@ public abstract class TestFileWriterFactory<T> extends TableTestBase {
);
}
- protected abstract FileWriterFactory<T> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds,
- Schema equalityDeleteRowSchema,
- Schema positionDeleteRowSchema);
-
- protected abstract T toRow(Integer id, String data);
-
protected abstract StructLikeSet toSet(Iterable<T> records);
protected FileFormat format() {
@@ -115,7 +107,7 @@ public abstract class TestFileWriterFactory<T> extends TableTestBase {
if (partitioned) {
this.table = create(SCHEMA, SPEC);
- this.partition = initPartitionKey();
+ this.partition = partitionKey(table.spec(), PARTITION_VALUE);
} else {
this.table = create(SCHEMA, PartitionSpec.unpartitioned());
this.partition = null;
@@ -222,7 +214,7 @@ public abstract class TestFileWriterFactory<T> extends TableTestBase {
.addField("data")
.commit();
- partition = initPartitionKey();
+ partition = partitionKey(table.spec(), PARTITION_VALUE);
// write a partitioned data file
DataFile secondDataFile = writeData(writerFactory, dataRows, table.spec(), partition);
@@ -259,9 +251,9 @@ public abstract class TestFileWriterFactory<T> extends TableTestBase {
// write a position delete file
List<PositionDelete<T>> deletes = ImmutableList.of(
- new PositionDelete<T>().set(dataFile.path(), 0L, null),
- new PositionDelete<T>().set(dataFile.path(), 2L, null),
- new PositionDelete<T>().set(dataFile.path(), 4L, null)
+ positionDelete(dataFile.path(), 0L, null),
+ positionDelete(dataFile.path(), 2L, null),
+ positionDelete(dataFile.path(), 4L, null)
);
Pair<DeleteFile, CharSequenceSet> result = writePositionDeletes(writerFactory, deletes, table.spec(), partition);
DeleteFile deleteFile = result.first();
@@ -305,7 +297,7 @@ public abstract class TestFileWriterFactory<T> extends TableTestBase {
// write a position delete file and persist the deleted row
List<PositionDelete<T>> deletes = ImmutableList.of(
- new PositionDelete<T>().set(dataFile.path(), 0, dataRows.get(0))
+ positionDelete(dataFile.path(), 0, dataRows.get(0))
);
Pair<DeleteFile, CharSequenceSet> result = writePositionDeletes(writerFactory, deletes, table.spec(), partition);
DeleteFile deleteFile = result.first();
@@ -343,28 +335,6 @@ public abstract class TestFileWriterFactory<T> extends TableTestBase {
Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*"));
}
- private PartitionKey initPartitionKey() {
- Record record = GenericRecord.create(table.schema()).copy(ImmutableMap.of("data", "aaa"));
-
- PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema());
- partitionKey.partition(record);
-
- return partitionKey;
- }
-
- private FileWriterFactory<T> newWriterFactory(Schema dataSchema) {
- return newWriterFactory(dataSchema, null, null, null);
- }
-
- private FileWriterFactory<T> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds,
- Schema equalityDeleteRowSchema) {
- return newWriterFactory(dataSchema, equalityFieldIds, equalityDeleteRowSchema, null);
- }
-
- private FileWriterFactory<T> newWriterFactory(Schema dataSchema, Schema positionDeleteRowSchema) {
- return newWriterFactory(dataSchema, null, null, positionDeleteRowSchema);
- }
-
private DataFile writeData(FileWriterFactory<T> writerFactory, List<T> rows,
PartitionSpec spec, StructLike partitionKey) throws IOException {
@@ -435,14 +405,6 @@ public abstract class TestFileWriterFactory<T> extends TableTestBase {
}
}
- private StructLikeSet actualRowSet(String... columns) throws IOException {
- StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
- try (CloseableIterable<Record> reader = IcebergGenerics.read(table).select(columns).build()) {
- reader.forEach(set::add);
- }
- return set;
- }
-
private EncryptedOutputFile newOutputFile(PartitionSpec spec, StructLike partitionKey) {
return fileFactory.newOutputFile(spec, partitionKey);
}
diff --git a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
new file mode 100644
index 0000000..b01ccc5
--- /dev/null
+++ b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.util.StructLikeSet;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public abstract class TestPartitioningWriters<T> extends WriterTestBase<T> {
+
+ @Parameterized.Parameters(name = "FileFormat={0}")
+ public static Object[] parameters() {
+ return new Object[][] {
+ new Object[]{FileFormat.AVRO},
+ new Object[]{FileFormat.PARQUET},
+ new Object[]{FileFormat.ORC},
+ };
+ }
+
+ private static final int TABLE_FORMAT_VERSION = 2;
+ private static final long TARGET_FILE_SIZE = 128L * 1024 * 1024;
+
+ private final FileFormat fileFormat;
+ private OutputFileFactory fileFactory = null;
+
+ public TestPartitioningWriters(FileFormat fileFormat) {
+ super(TABLE_FORMAT_VERSION);
+ this.fileFormat = fileFormat;
+ }
+
+ protected abstract StructLikeSet toSet(Iterable<T> records);
+
+ protected FileFormat format() {
+ return fileFormat;
+ }
+
+ @Before
+ public void setupTable() throws Exception {
+ this.tableDir = temp.newFolder();
+ Assert.assertTrue(tableDir.delete()); // created during table creation
+
+ this.metadataDir = new File(tableDir, "metadata");
+ this.table = create(SCHEMA, PartitionSpec.unpartitioned());
+ this.fileFactory = OutputFileFactory.builderFor(table, 1, 1).format(fileFormat).build();
+ }
+
+ @Test
+ public void testClusteredDataWriterNoRecords() throws IOException {
+ FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+ ClusteredDataWriter<T> writer = new ClusteredDataWriter<>(
+ writerFactory, fileFactory, table.io(),
+ fileFormat, TARGET_FILE_SIZE);
+
+ writer.close();
+ Assert.assertEquals("Must be no data files", 0, writer.result().dataFiles().size());
+
+ writer.close();
+ Assert.assertEquals("Must be no data files", 0, writer.result().dataFiles().size());
+ }
+
+ @Test
+ public void testClusteredDataWriterMultiplePartitions() throws IOException {
+ table.updateSpec()
+ .addField(Expressions.ref("data"))
+ .commit();
+
+ FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+ ClusteredDataWriter<T> writer = new ClusteredDataWriter<>(
+ writerFactory, fileFactory, table.io(),
+ fileFormat, TARGET_FILE_SIZE);
+
+ PartitionSpec spec = table.spec();
+
+ writer.write(toRow(1, "aaa"), spec, partitionKey(spec, "aaa"));
+ writer.write(toRow(2, "aaa"), spec, partitionKey(spec, "aaa"));
+ writer.write(toRow(3, "bbb"), spec, partitionKey(spec, "bbb"));
+ writer.write(toRow(4, "bbb"), spec, partitionKey(spec, "bbb"));
+ writer.write(toRow(5, "ccc"), spec, partitionKey(spec, "ccc"));
+
+ writer.close();
+
+ DataWriteResult result = writer.result();
+ Assert.assertEquals("Must be 3 data files", 3, result.dataFiles().size());
+
+ RowDelta rowDelta = table.newRowDelta();
+ result.dataFiles().forEach(rowDelta::addRows);
+ rowDelta.commit();
+
+ List<T> expectedRows = ImmutableList.of(
+ toRow(1, "aaa"),
+ toRow(2, "aaa"),
+ toRow(3, "bbb"),
+ toRow(4, "bbb"),
+ toRow(5, "ccc")
+ );
+ Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*"));
+ }
+
+ @Test
+ public void testClusteredDataWriterOutOfOrderPartitions() throws IOException {
+ table.updateSpec()
+ .addField(Expressions.ref("data"))
+ .commit();
+
+ FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+ ClusteredDataWriter<T> writer = new ClusteredDataWriter<>(
+ writerFactory, fileFactory, table.io(),
+ fileFormat, TARGET_FILE_SIZE);
+
+ PartitionSpec spec = table.spec();
+
+ writer.write(toRow(1, "aaa"), spec, partitionKey(spec, "aaa"));
+ writer.write(toRow(2, "aaa"), spec, partitionKey(spec, "aaa"));
+ writer.write(toRow(3, "bbb"), spec, partitionKey(spec, "bbb"));
+ writer.write(toRow(4, "bbb"), spec, partitionKey(spec, "bbb"));
+ writer.write(toRow(5, "ccc"), spec, partitionKey(spec, "ccc"));
+
+ AssertHelpers.assertThrows("Should fail to write out of order partitions",
+ IllegalStateException.class, "Encountered records that belong to already closed files",
+ () -> {
+ try {
+ writer.write(toRow(6, "aaa"), spec, partitionKey(spec, "aaa"));
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ });
+
+ writer.close();
+ }
+
+ @Test
+ public void testClusteredEqualityDeleteWriterNoRecords() throws IOException {
+ Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
+
+ List<Integer> equalityFieldIds = ImmutableList.of(table.schema().findField("id").fieldId());
+ Schema equalityDeleteRowSchema = table.schema().select("id");
+ FileWriterFactory<T> writerFactory = newWriterFactory(table.schema(), equalityFieldIds, equalityDeleteRowSchema);
+ ClusteredEqualityDeleteWriter<T> writer = new ClusteredEqualityDeleteWriter<>(
+ writerFactory, fileFactory, table.io(),
+ fileFormat, TARGET_FILE_SIZE);
+
+ writer.close();
+ Assert.assertEquals(0, writer.result().deleteFiles().size());
+ Assert.assertEquals(0, writer.result().referencedDataFiles().size());
+ Assert.assertFalse(writer.result().referencesDataFiles());
+
+ writer.close();
+ Assert.assertEquals(0, writer.result().deleteFiles().size());
+ Assert.assertEquals(0, writer.result().referencedDataFiles().size());
+ Assert.assertFalse(writer.result().referencesDataFiles());
+ }
+
+ @Test
+ public void testClusteredEqualityDeleteWriterMultipleSpecs() throws IOException {
+ Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
+
+ List<Integer> equalityFieldIds = ImmutableList.of(table.schema().findField("id").fieldId());
+ Schema equalityDeleteRowSchema = table.schema().select("id");
+ FileWriterFactory<T> writerFactory = newWriterFactory(table.schema(), equalityFieldIds, equalityDeleteRowSchema);
+
+ // add an unpartitioned data file
+ ImmutableList<T> rows1 = ImmutableList.of(
+ toRow(1, "aaa"),
+ toRow(2, "aaa"),
+ toRow(11, "aaa")
+ );
+ DataFile dataFile1 = writeData(writerFactory, fileFactory, rows1, table.spec(), null);
+ table.newFastAppend()
+ .appendFile(dataFile1)
+ .commit();
+
+ // partition by bucket
+ table.updateSpec()
+ .addField(Expressions.bucket("data", 16))
+ .commit();
+
+ // add a data file partitioned by bucket
+ ImmutableList<T> rows2 = ImmutableList.of(
+ toRow(3, "bbb"),
+ toRow(4, "bbb"),
+ toRow(12, "bbb")
+ );
+ DataFile dataFile2 = writeData(writerFactory, fileFactory, rows2, table.spec(), partitionKey(table.spec(), "bbb"));
+ table.newFastAppend()
+ .appendFile(dataFile2)
+ .commit();
+
+ // partition by data
+ table.updateSpec()
+ .removeField(Expressions.bucket("data", 16))
+ .addField(Expressions.ref("data"))
+ .commit();
+
+ // add a data file partitioned by data
+ ImmutableList<T> rows3 = ImmutableList.of(
+ toRow(5, "ccc"),
+ toRow(13, "ccc")
+ );
+ DataFile dataFile3 = writeData(writerFactory, fileFactory, rows3, table.spec(), partitionKey(table.spec(), "ccc"));
+ table.newFastAppend()
+ .appendFile(dataFile3)
+ .commit();
+
+ ClusteredEqualityDeleteWriter<T> writer = new ClusteredEqualityDeleteWriter<>(
+ writerFactory, fileFactory, table.io(),
+ fileFormat, TARGET_FILE_SIZE);
+
+ PartitionSpec unpartitionedSpec = table.specs().get(0);
+ PartitionSpec bucketSpec = table.specs().get(1);
+ PartitionSpec identitySpec = table.specs().get(2);
+
+ writer.write(toRow(1, "aaa"), unpartitionedSpec, null);
+ writer.write(toRow(2, "aaa"), unpartitionedSpec, null);
+ writer.write(toRow(3, "bbb"), bucketSpec, partitionKey(bucketSpec, "bbb"));
+ writer.write(toRow(4, "bbb"), bucketSpec, partitionKey(bucketSpec, "bbb"));
+ writer.write(toRow(5, "ccc"), identitySpec, partitionKey(identitySpec, "ccc"));
+
+ writer.close();
+
+ DeleteWriteResult result = writer.result();
+ Assert.assertEquals("Must be 3 delete files", 3, result.deleteFiles().size());
+ Assert.assertEquals("Must not reference data files", 0, writer.result().referencedDataFiles().size());
+ Assert.assertFalse("Must not reference data files", writer.result().referencesDataFiles());
+
+ RowDelta rowDelta = table.newRowDelta();
+ result.deleteFiles().forEach(rowDelta::addDeletes);
+ rowDelta.commit();
+
+ List<T> expectedRows = ImmutableList.of(
+ toRow(11, "aaa"),
+ toRow(12, "bbb"),
+ toRow(13, "ccc")
+ );
+ Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*"));
+ }
+
+ @Test
+ public void testClusteredEqualityDeleteWriterOutOfOrderSpecsAndPartitions() throws IOException {
+ Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
+
+ List<Integer> equalityFieldIds = ImmutableList.of(table.schema().findField("id").fieldId());
+ Schema equalityDeleteRowSchema = table.schema().select("id");
+ FileWriterFactory<T> writerFactory = newWriterFactory(table.schema(), equalityFieldIds, equalityDeleteRowSchema);
+
+ table.updateSpec()
+ .addField(Expressions.bucket("data", 16))
+ .commit();
+
+ table.updateSpec()
+ .removeField(Expressions.bucket("data", 16))
+ .addField(Expressions.ref("data"))
+ .commit();
+
+ ClusteredEqualityDeleteWriter<T> writer = new ClusteredEqualityDeleteWriter<>(
+ writerFactory, fileFactory, table.io(),
+ fileFormat, TARGET_FILE_SIZE);
+
+ PartitionSpec unpartitionedSpec = table.specs().get(0);
+ PartitionSpec bucketSpec = table.specs().get(1);
+ PartitionSpec identitySpec = table.specs().get(2);
+
+ writer.write(toRow(1, "aaa"), unpartitionedSpec, null);
+ writer.write(toRow(2, "aaa"), unpartitionedSpec, null);
+ writer.write(toRow(3, "bbb"), bucketSpec, partitionKey(bucketSpec, "bbb"));
+ writer.write(toRow(4, "bbb"), bucketSpec, partitionKey(bucketSpec, "bbb"));
+ writer.write(toRow(5, "ccc"), identitySpec, partitionKey(identitySpec, "ccc"));
+ writer.write(toRow(6, "ddd"), identitySpec, partitionKey(identitySpec, "ddd"));
+
+ AssertHelpers.assertThrows("Should fail to write out of order partitions",
+ IllegalStateException.class, "Encountered records that belong to already closed files",
+ () -> {
+ try {
+ writer.write(toRow(7, "ccc"), identitySpec, partitionKey(identitySpec, "ccc"));
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ });
+
+ AssertHelpers.assertThrows("Should fail to write out of order specs",
+ IllegalStateException.class, "Encountered records that belong to already closed files",
+ () -> {
+ try {
+ writer.write(toRow(7, "aaa"), unpartitionedSpec, null);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ });
+
+ writer.close();
+ }
+
+ @Test
+ public void testClusteredPositionDeleteWriterNoRecords() throws IOException {
+ Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
+
+ FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+ ClusteredPositionDeleteWriter<T> writer = new ClusteredPositionDeleteWriter<>(
+ writerFactory, fileFactory, table.io(),
+ fileFormat, TARGET_FILE_SIZE);
+
+ writer.close();
+ Assert.assertEquals(0, writer.result().deleteFiles().size());
+ Assert.assertEquals(0, writer.result().referencedDataFiles().size());
+ Assert.assertFalse(writer.result().referencesDataFiles());
+
+ writer.close();
+ Assert.assertEquals(0, writer.result().deleteFiles().size());
+ Assert.assertEquals(0, writer.result().referencedDataFiles().size());
+ Assert.assertFalse(writer.result().referencesDataFiles());
+ }
+
+ @Test
+ public void testClusteredPositionDeleteWriterMultipleSpecs() throws IOException {
+ Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
+
+ FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+
+ // add an unpartitioned data file
+ ImmutableList<T> rows1 = ImmutableList.of(
+ toRow(1, "aaa"),
+ toRow(2, "aaa"),
+ toRow(11, "aaa")
+ );
+ DataFile dataFile1 = writeData(writerFactory, fileFactory, rows1, table.spec(), null);
+ table.newFastAppend()
+ .appendFile(dataFile1)
+ .commit();
+
+ // partition by bucket
+ table.updateSpec()
+ .addField(Expressions.bucket("data", 16))
+ .commit();
+
+ // add a data file partitioned by bucket
+ ImmutableList<T> rows2 = ImmutableList.of(
+ toRow(3, "bbb"),
+ toRow(4, "bbb"),
+ toRow(12, "bbb")
+ );
+ DataFile dataFile2 = writeData(writerFactory, fileFactory, rows2, table.spec(), partitionKey(table.spec(), "bbb"));
+ table.newFastAppend()
+ .appendFile(dataFile2)
+ .commit();
+
+ // partition by data
+ table.updateSpec()
+ .removeField(Expressions.bucket("data", 16))
+ .addField(Expressions.ref("data"))
+ .commit();
+
+ // add a data file partitioned by data
+ ImmutableList<T> rows3 = ImmutableList.of(
+ toRow(5, "ccc"),
+ toRow(13, "ccc")
+ );
+ DataFile dataFile3 = writeData(writerFactory, fileFactory, rows3, table.spec(), partitionKey(table.spec(), "ccc"));
+ table.newFastAppend()
+ .appendFile(dataFile3)
+ .commit();
+
+ ClusteredPositionDeleteWriter<T> writer = new ClusteredPositionDeleteWriter<>(
+ writerFactory, fileFactory, table.io(),
+ fileFormat, TARGET_FILE_SIZE);
+
+ PartitionSpec unpartitionedSpec = table.specs().get(0);
+ PartitionSpec bucketSpec = table.specs().get(1);
+ PartitionSpec identitySpec = table.specs().get(2);
+
+ writer.write(positionDelete(dataFile1.path(), 0L, null), unpartitionedSpec, null);
+ writer.write(positionDelete(dataFile1.path(), 1L, null), unpartitionedSpec, null);
+ writer.write(positionDelete(dataFile2.path(), 0L, null), bucketSpec, partitionKey(bucketSpec, "bbb"));
+ writer.write(positionDelete(dataFile2.path(), 1L, null), bucketSpec, partitionKey(bucketSpec, "bbb"));
+ writer.write(positionDelete(dataFile3.path(), 0L, null), identitySpec, partitionKey(identitySpec, "ccc"));
+
+ writer.close();
+
+ DeleteWriteResult result = writer.result();
+ Assert.assertEquals("Must be 3 delete files", 3, result.deleteFiles().size());
+ Assert.assertEquals("Must reference 3 data files", 3, writer.result().referencedDataFiles().size());
+ Assert.assertTrue("Must reference data files", writer.result().referencesDataFiles());
+
+ RowDelta rowDelta = table.newRowDelta();
+ result.deleteFiles().forEach(rowDelta::addDeletes);
+ rowDelta.commit();
+
+ List<T> expectedRows = ImmutableList.of(
+ toRow(11, "aaa"),
+ toRow(12, "bbb"),
+ toRow(13, "ccc")
+ );
+ Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*"));
+ }
+
+ @Test
+ public void testClusteredPositionDeleteWriterOutOfOrderSpecsAndPartitions() throws IOException {
+ Assume.assumeFalse("ORC delete files are not supported", fileFormat == FileFormat.ORC);
+
+ FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+
+ table.updateSpec()
+ .addField(Expressions.bucket("data", 16))
+ .commit();
+
+ table.updateSpec()
+ .removeField(Expressions.bucket("data", 16))
+ .addField(Expressions.ref("data"))
+ .commit();
+
+ ClusteredPositionDeleteWriter<T> writer = new ClusteredPositionDeleteWriter<>(
+ writerFactory, fileFactory, table.io(),
+ fileFormat, TARGET_FILE_SIZE);
+
+ PartitionSpec unpartitionedSpec = table.specs().get(0);
+ PartitionSpec bucketSpec = table.specs().get(1);
+ PartitionSpec identitySpec = table.specs().get(2);
+
+ writer.write(positionDelete("file-1.parquet", 0L, null), unpartitionedSpec, null);
+ writer.write(positionDelete("file-1.parquet", 1L, null), unpartitionedSpec, null);
+ writer.write(positionDelete("file-2.parquet", 0L, null), bucketSpec, partitionKey(bucketSpec, "bbb"));
+ writer.write(positionDelete("file-2.parquet", 1L, null), bucketSpec, partitionKey(bucketSpec, "bbb"));
+ writer.write(positionDelete("file-3.parquet", 0L, null), identitySpec, partitionKey(identitySpec, "ccc"));
+ writer.write(positionDelete("file-4.parquet", 0L, null), identitySpec, partitionKey(identitySpec, "ddd"));
+
+ AssertHelpers.assertThrows("Should fail to write out of order partitions",
+ IllegalStateException.class, "Encountered records that belong to already closed files",
+ () -> {
+ try {
+ PositionDelete<T> positionDelete = positionDelete("file-5.parquet", 1L, null);
+ writer.write(positionDelete, identitySpec, partitionKey(identitySpec, "ccc"));
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ });
+
+ AssertHelpers.assertThrows("Should fail to write out of order specs",
+ IllegalStateException.class, "Encountered records that belong to already closed files",
+ () -> {
+ try {
+ PositionDelete<T> positionDelete = positionDelete("file-1.parquet", 3L, null);
+ writer.write(positionDelete, unpartitionedSpec, null);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ });
+
+ writer.close();
+ }
+
+ @Test
+ public void testFanoutDataWriterNoRecords() throws IOException {
+ FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+ FanoutDataWriter<T> writer = new FanoutDataWriter<>(
+ writerFactory, fileFactory, table.io(),
+ fileFormat, TARGET_FILE_SIZE);
+
+ writer.close();
+ Assert.assertEquals("Must be no data files", 0, writer.result().dataFiles().size());
+
+ writer.close();
+ Assert.assertEquals("Must be no data files", 0, writer.result().dataFiles().size());
+ }
+
+ @Test
+ public void testFanoutDataWriterMultiplePartitions() throws IOException {
+ table.updateSpec()
+ .addField(Expressions.ref("data"))
+ .commit();
+
+ FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+ FanoutDataWriter<T> writer = new FanoutDataWriter<>(
+ writerFactory, fileFactory, table.io(),
+ fileFormat, TARGET_FILE_SIZE);
+
+ PartitionSpec spec = table.spec();
+
+ writer.write(toRow(1, "aaa"), spec, partitionKey(spec, "aaa"));
+ writer.write(toRow(3, "bbb"), spec, partitionKey(spec, "bbb"));
+ writer.write(toRow(2, "aaa"), spec, partitionKey(spec, "aaa"));
+ writer.write(toRow(4, "bbb"), spec, partitionKey(spec, "bbb"));
+ writer.write(toRow(5, "ccc"), spec, partitionKey(spec, "ccc"));
+
+ writer.close();
+
+ DataWriteResult result = writer.result();
+ Assert.assertEquals("Must be 3 data files", 3, result.dataFiles().size());
+
+ RowDelta rowDelta = table.newRowDelta();
+ result.dataFiles().forEach(rowDelta::addRows);
+ rowDelta.commit();
+
+ List<T> expectedRows = ImmutableList.of(
+ toRow(1, "aaa"),
+ toRow(2, "aaa"),
+ toRow(3, "bbb"),
+ toRow(4, "bbb"),
+ toRow(5, "ccc")
+ );
+ Assert.assertEquals("Records should match", toSet(expectedRows), actualRowSet("*"));
+ }
+}
diff --git a/data/src/test/java/org/apache/iceberg/io/TestRollingFileWriters.java b/data/src/test/java/org/apache/iceberg/io/TestRollingFileWriters.java
index e11e0a7..a62ac3e 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestRollingFileWriters.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestRollingFileWriters.java
@@ -23,16 +23,11 @@ import java.io.File;
import java.io.IOException;
import java.util.List;
import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
-import org.apache.iceberg.TableTestBase;
-import org.apache.iceberg.data.GenericRecord;
-import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.Assume;
@@ -42,7 +37,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
-public abstract class TestRollingFileWriters<T> extends TableTestBase {
+public abstract class TestRollingFileWriters<T> extends WriterTestBase<T> {
// TODO: add ORC once we support ORC rolling file writers
@@ -73,11 +68,6 @@ public abstract class TestRollingFileWriters<T> extends TableTestBase {
this.partitioned = partitioned;
}
- protected abstract FileWriterFactory<T> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds,
- Schema equalityDeleteRowSchema);
-
- protected abstract T toRow(Integer id, String data);
-
protected FileFormat format() {
return fileFormat;
}
@@ -91,7 +81,7 @@ public abstract class TestRollingFileWriters<T> extends TableTestBase {
if (partitioned) {
this.table = create(SCHEMA, SPEC);
- this.partition = initPartitionKey();
+ this.partition = partitionKey(table.spec(), PARTITION_VALUE);
} else {
this.table = create(SCHEMA, PartitionSpec.unpartitioned());
this.partition = null;
@@ -100,15 +90,6 @@ public abstract class TestRollingFileWriters<T> extends TableTestBase {
this.fileFactory = OutputFileFactory.builderFor(table, 1, 1).format(fileFormat).build();
}
- private PartitionKey initPartitionKey() {
- Record record = GenericRecord.create(table.schema()).copy(ImmutableMap.of("data", PARTITION_VALUE));
-
- PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema());
- partitionKey.partition(record);
-
- return partitionKey;
- }
-
@Test
public void testRollingDataWriterNoRecords() throws IOException {
FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
@@ -221,7 +202,7 @@ public abstract class TestRollingFileWriters<T> extends TableTestBase {
List<PositionDelete<T>> deletes = Lists.newArrayListWithExpectedSize(4 * FILE_SIZE_CHECK_ROWS_DIVISOR);
for (int index = 0; index < 4 * FILE_SIZE_CHECK_ROWS_DIVISOR; index++) {
- deletes.add(new PositionDelete<T>().set("path/to/data/file-1.parquet", index, null));
+ deletes.add(positionDelete("path/to/data/file-1.parquet", index, null));
}
try (RollingPositionDeleteWriter<T> closeableWriter = writer) {
@@ -236,8 +217,4 @@ public abstract class TestRollingFileWriters<T> extends TableTestBase {
Assert.assertEquals(1, result.referencedDataFiles().size());
Assert.assertTrue(result.referencesDataFiles());
}
-
- private FileWriterFactory<T> newWriterFactory(Schema dataSchema) {
- return newWriterFactory(dataSchema, null, null);
- }
}
diff --git a/data/src/test/java/org/apache/iceberg/io/WriterTestBase.java b/data/src/test/java/org/apache/iceberg/io/WriterTestBase.java
new file mode 100644
index 0000000..661ab64
--- /dev/null
+++ b/data/src/test/java/org/apache/iceberg/io/WriterTestBase.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.util.StructLikeSet;
+
+public abstract class WriterTestBase<T> extends TableTestBase {
+
+ public WriterTestBase(int formatVersion) {
+ super(formatVersion);
+ }
+
+ protected abstract FileWriterFactory<T> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds,
+ Schema equalityDeleteRowSchema,
+ Schema positionDeleteRowSchema);
+
+ protected FileWriterFactory<T> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds,
+ Schema equalityDeleteRowSchema) {
+ return newWriterFactory(dataSchema, equalityFieldIds, equalityDeleteRowSchema, null);
+ }
+
+ protected FileWriterFactory<T> newWriterFactory(Schema dataSchema, Schema positionDeleteRowSchema) {
+ return newWriterFactory(dataSchema, null, null, positionDeleteRowSchema);
+ }
+
+ protected FileWriterFactory<T> newWriterFactory(Schema dataSchema) {
+ return newWriterFactory(dataSchema, null, null, null);
+ }
+
+ protected abstract T toRow(Integer id, String data);
+
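+ // builds a PartitionKey for the given spec from a record with only the "data" column set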
+ protected PartitionKey partitionKey(PartitionSpec spec, String value) {
+ Record record = GenericRecord.create(table.schema()).copy(ImmutableMap.of("data", value));
+
+ PartitionKey partitionKey = new PartitionKey(spec, table.schema());
+ partitionKey.partition(record);
+
+ return partitionKey;
+ }
+
+ protected StructLikeSet actualRowSet(String... columns) throws IOException {
+ StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
+ try (CloseableIterable<Record> reader = IcebergGenerics.read(table).select(columns).build()) {
+ reader.forEach(set::add);
+ }
+ return set;
+ }
+
+ protected DataFile writeData(FileWriterFactory<T> writerFactory, OutputFileFactory fileFactory,
+ List<T> rows, PartitionSpec spec, StructLike partitionKey) throws IOException {
+
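+ // writes all rows into a single data file and returns its metadata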
+ EncryptedOutputFile file = fileFactory.newOutputFile(spec, partitionKey);
+ DataWriter<T> writer = writerFactory.newDataWriter(file, spec, partitionKey);
+
+ try (DataWriter<T> closeableWriter = writer) {
+ for (T row : rows) {
+ closeableWriter.write(row);
+ }
+ }
+
+ return writer.toDataFile();
+ }
+}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPartitioningWriters.java
similarity index 63%
copy from flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java
copy to flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPartitioningWriters.java
index a3d62d5..934b5a0 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkPartitioningWriters.java
@@ -21,28 +21,34 @@ package org.apache.iceberg.flink.sink;
import java.util.List;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.RowDataWrapper;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.io.FileWriterFactory;
-import org.apache.iceberg.io.TestRollingFileWriters;
+import org.apache.iceberg.io.TestPartitioningWriters;
import org.apache.iceberg.util.ArrayUtil;
+import org.apache.iceberg.util.StructLikeSet;
-public class TestFlinkRollingFileWriters extends TestRollingFileWriters<RowData> {
+public class TestFlinkPartitioningWriters extends TestPartitioningWriters<RowData> {
- public TestFlinkRollingFileWriters(FileFormat fileFormat, boolean partitioned) {
- super(fileFormat, partitioned);
+ public TestFlinkPartitioningWriters(FileFormat fileFormat) {
+ super(fileFormat);
}
@Override
protected FileWriterFactory<RowData> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds,
- Schema equalityDeleteRowSchema) {
+ Schema equalityDeleteRowSchema,
+ Schema positionDeleteRowSchema) {
return FlinkFileWriterFactory.builderFor(table)
.dataSchema(table.schema())
.dataFileFormat(format())
.deleteFileFormat(format())
.equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
.equalityDeleteRowSchema(equalityDeleteRowSchema)
+ .positionDeleteRowSchema(positionDeleteRowSchema)
.build();
}
@@ -50,4 +56,15 @@ public class TestFlinkRollingFileWriters extends TestRollingFileWriters<RowData>
protected RowData toRow(Integer id, String data) {
return SimpleDataUtil.createRowData(id, data);
}
+
+ @Override
+ protected StructLikeSet toSet(Iterable<RowData> rows) {
+ StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
+ RowType flinkType = FlinkSchemaUtil.convert(table.schema());
+ for (RowData row : rows) {
+ RowDataWrapper wrapper = new RowDataWrapper(flinkType, table.schema().asStruct());
+ set.add(wrapper.wrap(row));
+ }
+ return set;
+ }
}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java
index a3d62d5..9339e5a 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkRollingFileWriters.java
@@ -36,13 +36,15 @@ public class TestFlinkRollingFileWriters extends TestRollingFileWriters<RowData>
@Override
protected FileWriterFactory<RowData> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds,
- Schema equalityDeleteRowSchema) {
+ Schema equalityDeleteRowSchema,
+ Schema positionDeleteRowSchema) {
return FlinkFileWriterFactory.builderFor(table)
.dataSchema(table.schema())
.dataFileFormat(format())
.deleteFileFormat(format())
.equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
.equalityDeleteRowSchema(equalityDeleteRowSchema)
+ .positionDeleteRowSchema(positionDeleteRowSchema)
.build();
}
diff --git a/jmh.gradle b/jmh.gradle
index 543b07f..edcb39c 100644
--- a/jmh.gradle
+++ b/jmh.gradle
@@ -30,7 +30,7 @@ configure(jmhProjects) {
apply plugin: 'me.champeau.gradle.jmh'
jmh {
- jmhVersion = jmhVersion
+ jmhVersion = '1.32'
failOnError = true
forceGC = true
includeTests = true
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
new file mode 100644
index 0000000..a8521de
--- /dev/null
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.ClusteredDataWriter;
+import org.apache.iceberg.io.ClusteredEqualityDeleteWriter;
+import org.apache.iceberg.io.ClusteredPositionDeleteWriter;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.io.FanoutDataWriter;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.UnpartitionedWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.data.RandomData;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.infra.Blackhole;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public abstract class WritersBenchmark extends IcebergSourceBenchmark {
+
+ private static final int NUM_ROWS = 2500000;
+ private static final long TARGET_FILE_SIZE_IN_BYTES = 50L * 1024 * 1024;
+
+ private static final Schema SCHEMA = new Schema(
+ required(1, "longCol", Types.LongType.get()),
+ required(2, "intCol", Types.IntegerType.get()),
+ required(3, "floatCol", Types.FloatType.get()),
+ optional(4, "doubleCol", Types.DoubleType.get()),
+ optional(5, "decimalCol", Types.DecimalType.of(20, 5)),
+ optional(6, "timestampCol", Types.TimestampType.withZone()),
+ optional(7, "stringCol", Types.StringType.get())
+ );
+
+ private Iterable<InternalRow> rows;
+ private Iterable<InternalRow> positionDeleteRows;
+ private PartitionSpec unpartitionedSpec;
+ private PartitionSpec partitionedSpec;
+
+ protected abstract FileFormat fileFormat();
+
+ @Setup
+ public void setupBenchmark() {
+ setupSpark();
+
+ List<InternalRow> data = Lists.newArrayList(RandomData.generateSpark(SCHEMA, NUM_ROWS, 0L));
+ Transform<Integer, Integer> transform = Transforms.bucket(Types.IntegerType.get(), 32);
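+ // sort rows by the bucket value of intCol so clustered writers receive partitions in order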
+ data.sort(Comparator.comparingInt(row -> transform.apply(row.getInt(1))));
+ this.rows = data;
+
+ this.positionDeleteRows = RandomData.generateSpark(DeleteSchemaUtil.pathPosSchema(), NUM_ROWS, 0L);
+
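+ // spec 0 is the table's original unpartitioned spec; spec 1 is the bucket spec added in initTable()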
+ this.unpartitionedSpec = table().specs().get(0);
+ Preconditions.checkArgument(unpartitionedSpec.isUnpartitioned());
+ this.partitionedSpec = table().specs().get(1);
+ }
+
+ @TearDown
+ public void tearDownBenchmark() throws IOException {
+ tearDownSpark();
+ cleanupFiles();
+ }
+
+ @Override
+ protected Configuration initHadoopConf() {
+ return new Configuration();
+ }
+
+ @Override
+ protected final Table initTable() {
+ HadoopTables tables = new HadoopTables(hadoopConf());
+ PartitionSpec spec = PartitionSpec.unpartitioned();
+ Map<String, String> properties = Maps.newHashMap();
+ Table table = tables.create(SCHEMA, spec, properties, newTableLocation());
+
+ // add a partitioned spec to the table
+ table.updateSpec()
+ .addField(Expressions.bucket("intCol", 32))
+ .commit();
+
+ return table;
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void writeUnpartitionedClusteredDataWriter(Blackhole blackhole) throws IOException {
+ FileIO io = table().io();
+
+ OutputFileFactory fileFactory = newFileFactory();
+ SparkFileWriterFactory writerFactory = SparkFileWriterFactory.builderFor(table())
+ .dataFileFormat(fileFormat())
+ .dataSchema(table().schema())
+ .build();
+
+ ClusteredDataWriter<InternalRow> writer = new ClusteredDataWriter<>(
+ writerFactory, fileFactory, io,
+ fileFormat(), TARGET_FILE_SIZE_IN_BYTES);
+
+ try (ClusteredDataWriter<InternalRow> closeableWriter = writer) {
+ for (InternalRow row : rows) {
+ closeableWriter.write(row, unpartitionedSpec, null);
+ }
+ }
+
+ blackhole.consume(writer);
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void writeUnpartitionedLegacyDataWriter(Blackhole blackhole) throws IOException {
+ FileIO io = table().io();
+
+ OutputFileFactory fileFactory = newFileFactory();
+
+ Schema writeSchema = table().schema();
+ StructType sparkWriteType = SparkSchemaUtil.convert(writeSchema);
+ SparkAppenderFactory appenders = SparkAppenderFactory.builderFor(table(), writeSchema, sparkWriteType)
+ .spec(unpartitionedSpec)
+ .build();
+
+ TaskWriter<InternalRow> writer = new UnpartitionedWriter<>(
+ unpartitionedSpec, fileFormat(), appenders,
+ fileFactory, io, TARGET_FILE_SIZE_IN_BYTES);
+
+ try (TaskWriter<InternalRow> closableWriter = writer) {
+ for (InternalRow row : rows) {
+ closableWriter.write(row);
+ }
+ }
+
+ blackhole.consume(writer.complete());
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void writePartitionedClusteredDataWriter(Blackhole blackhole) throws IOException {
+ FileIO io = table().io();
+
+ OutputFileFactory fileFactory = newFileFactory();
+ SparkFileWriterFactory writerFactory = SparkFileWriterFactory.builderFor(table())
+ .dataFileFormat(fileFormat())
+ .dataSchema(table().schema())
+ .build();
+
+ ClusteredDataWriter<InternalRow> writer = new ClusteredDataWriter<>(
+ writerFactory, fileFactory, io,
+ fileFormat(), TARGET_FILE_SIZE_IN_BYTES);
+
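+ // derive the partition key for each Spark row by wrapping it as a StructLike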
+ PartitionKey partitionKey = new PartitionKey(partitionedSpec, table().schema());
+ StructType dataSparkType = SparkSchemaUtil.convert(table().schema());
+ InternalRowWrapper internalRowWrapper = new InternalRowWrapper(dataSparkType);
+
+ try (ClusteredDataWriter<InternalRow> closeableWriter = writer) {
+ for (InternalRow row : rows) {
+ partitionKey.partition(internalRowWrapper.wrap(row));
+ closeableWriter.write(row, partitionedSpec, partitionKey);
+ }
+ }
+
+ blackhole.consume(writer);
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void writePartitionedLegacyDataWriter(Blackhole blackhole) throws IOException {
+ FileIO io = table().io();
+
+ OutputFileFactory fileFactory = newFileFactory();
+
+ Schema writeSchema = table().schema();
+ StructType sparkWriteType = SparkSchemaUtil.convert(writeSchema);
+ SparkAppenderFactory appenders = SparkAppenderFactory.builderFor(table(), writeSchema, sparkWriteType)
+ .spec(partitionedSpec)
+ .build();
+
+ TaskWriter<InternalRow> writer = new SparkPartitionedWriter(
+ partitionedSpec, fileFormat(), appenders,
+ fileFactory, io, TARGET_FILE_SIZE_IN_BYTES,
+ writeSchema, sparkWriteType);
+
+ try (TaskWriter<InternalRow> closableWriter = writer) {
+ for (InternalRow row : rows) {
+ closableWriter.write(row);
+ }
+ }
+
+ blackhole.consume(writer.complete());
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void writePartitionedFanoutDataWriter(Blackhole blackhole) throws IOException {
+ FileIO io = table().io();
+
+ OutputFileFactory fileFactory = newFileFactory();
+ SparkFileWriterFactory writerFactory = SparkFileWriterFactory.builderFor(table())
+ .dataFileFormat(fileFormat())
+ .dataSchema(table().schema())
+ .build();
+
+ FanoutDataWriter<InternalRow> writer = new FanoutDataWriter<>(
+ writerFactory, fileFactory, io,
+ fileFormat(), TARGET_FILE_SIZE_IN_BYTES);
+
+ PartitionKey partitionKey = new PartitionKey(partitionedSpec, table().schema());
+ StructType dataSparkType = SparkSchemaUtil.convert(table().schema());
+ InternalRowWrapper internalRowWrapper = new InternalRowWrapper(dataSparkType);
+
+ try (FanoutDataWriter<InternalRow> closeableWriter = writer) {
+ for (InternalRow row : rows) {
+ partitionKey.partition(internalRowWrapper.wrap(row));
+ closeableWriter.write(row, partitionedSpec, partitionKey);
+ }
+ }
+
+ blackhole.consume(writer);
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void writePartitionedLegacyFanoutDataWriter(Blackhole blackhole) throws IOException {
+ FileIO io = table().io();
+
+ OutputFileFactory fileFactory = newFileFactory();
+
+ Schema writeSchema = table().schema();
+ StructType sparkWriteType = SparkSchemaUtil.convert(writeSchema);
+ SparkAppenderFactory appenders = SparkAppenderFactory.builderFor(table(), writeSchema, sparkWriteType)
+ .spec(partitionedSpec)
+ .build();
+
+ TaskWriter<InternalRow> writer = new SparkPartitionedFanoutWriter(
+ partitionedSpec, fileFormat(), appenders,
+ fileFactory, io, TARGET_FILE_SIZE_IN_BYTES,
+ writeSchema, sparkWriteType);
+
+ try (TaskWriter<InternalRow> closableWriter = writer) {
+ for (InternalRow row : rows) {
+ closableWriter.write(row);
+ }
+ }
+
+ blackhole.consume(writer.complete());
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void writePartitionedClusteredEqualityDeleteWriter(Blackhole blackhole) throws IOException {
+ FileIO io = table().io();
+
+ int equalityFieldId = table().schema().findField("longCol").fieldId();
+
+ OutputFileFactory fileFactory = newFileFactory();
+ SparkFileWriterFactory writerFactory = SparkFileWriterFactory.builderFor(table())
+ .dataFileFormat(fileFormat())
+ .equalityDeleteRowSchema(table().schema())
+ .equalityFieldIds(new int[]{equalityFieldId})
+ .build();
+
+ ClusteredEqualityDeleteWriter<InternalRow> writer = new ClusteredEqualityDeleteWriter<>(
+ writerFactory, fileFactory, io,
+ fileFormat(), TARGET_FILE_SIZE_IN_BYTES);
+
+ PartitionKey partitionKey = new PartitionKey(partitionedSpec, table().schema());
+ StructType deleteSparkType = SparkSchemaUtil.convert(table().schema());
+ InternalRowWrapper internalRowWrapper = new InternalRowWrapper(deleteSparkType);
+
+ try (ClusteredEqualityDeleteWriter<InternalRow> closeableWriter = writer) {
+ for (InternalRow row : rows) {
+ partitionKey.partition(internalRowWrapper.wrap(row));
+ closeableWriter.write(row, partitionedSpec, partitionKey);
+ }
+ }
+
+ blackhole.consume(writer);
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void writeUnpartitionedClusteredPositionDeleteWriter(Blackhole blackhole) throws IOException {
+ FileIO io = table().io();
+
+ OutputFileFactory fileFactory = newFileFactory();
+ SparkFileWriterFactory writerFactory = SparkFileWriterFactory.builderFor(table())
+ .dataFileFormat(fileFormat())
+ .build();
+
+ ClusteredPositionDeleteWriter<InternalRow> writer = new ClusteredPositionDeleteWriter<>(
+ writerFactory, fileFactory, io,
+ fileFormat(), TARGET_FILE_SIZE_IN_BYTES);
+
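+ // a single PositionDelete instance is reused; set() overwrites the path and position for each record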
+ PositionDelete<InternalRow> positionDelete = PositionDelete.create();
+ try (ClusteredPositionDeleteWriter<InternalRow> closeableWriter = writer) {
+ for (InternalRow row : positionDeleteRows) {
+ String path = row.getString(0);
+ long pos = row.getLong(1);
+ positionDelete.set(path, pos, null);
+ closeableWriter.write(positionDelete, unpartitionedSpec, null);
+ }
+ }
+
+ blackhole.consume(writer);
+ }
+
+ private OutputFileFactory newFileFactory() {
+ return OutputFileFactory.builderFor(table(), 1, 1)
+ .format(fileFormat())
+ .build();
+ }
+}
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/avro/AvroWritersBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/avro/AvroWritersBenchmark.java
new file mode 100644
index 0000000..5d970d0
--- /dev/null
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/avro/AvroWritersBenchmark.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source.avro;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.spark.source.WritersBenchmark;
+
+/**
+ * A benchmark that evaluates the performance of various Iceberg writers for Avro data.
+ *
+ * To run this benchmark for either spark-2 or spark-3:
+ * <code>
+ * ./gradlew :iceberg-spark[2|3]:jmh
+ * -PjmhIncludeRegex=AvroWritersBenchmark
+ * -PjmhOutputPath=benchmark/avro-writers-benchmark-result.txt
+ * </code>
+ */
+public class AvroWritersBenchmark extends WritersBenchmark {
+
+ @Override
+ protected FileFormat fileFormat() {
+ return FileFormat.AVRO;
+ }
+}
diff --git a/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/ParquetWritersBenchmark.java b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/ParquetWritersBenchmark.java
new file mode 100644
index 0000000..ff354ba
--- /dev/null
+++ b/spark/src/jmh/java/org/apache/iceberg/spark/source/parquet/ParquetWritersBenchmark.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source.parquet;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.spark.source.WritersBenchmark;
+
+/**
+ * A benchmark that evaluates the performance of various Iceberg writers for Parquet data.
+ *
+ * To run this benchmark for either spark-2 or spark-3:
+ * <code>
+ * ./gradlew :iceberg-spark[2|3]:jmh
+ * -PjmhIncludeRegex=ParquetWritersBenchmark
+ * -PjmhOutputPath=benchmark/parquet-writers-benchmark-result.txt
+ * </code>
+ */
+public class ParquetWritersBenchmark extends WritersBenchmark {
+
+ @Override
+ protected FileFormat fileFormat() {
+ return FileFormat.PARQUET;
+ }
+}
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkRollingFileWriters.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPartitioningWriters.java
similarity index 67%
copy from spark/src/test/java/org/apache/iceberg/spark/source/TestSparkRollingFileWriters.java
copy to spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPartitioningWriters.java
index 3ea2353..4d07cfb 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkRollingFileWriters.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPartitioningWriters.java
@@ -23,27 +23,32 @@ import java.util.List;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.FileWriterFactory;
-import org.apache.iceberg.io.TestRollingFileWriters;
+import org.apache.iceberg.io.TestPartitioningWriters;
+import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.util.ArrayUtil;
+import org.apache.iceberg.util.StructLikeSet;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
-public class TestSparkRollingFileWriters extends TestRollingFileWriters<InternalRow> {
+public class TestSparkPartitioningWriters extends TestPartitioningWriters<InternalRow> {
- public TestSparkRollingFileWriters(FileFormat fileFormat, boolean partitioned) {
- super(fileFormat, partitioned);
+ public TestSparkPartitioningWriters(FileFormat fileFormat) {
+ super(fileFormat);
}
@Override
protected FileWriterFactory<InternalRow> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds,
- Schema equalityDeleteRowSchema) {
+ Schema equalityDeleteRowSchema,
+ Schema positionDeleteRowSchema) {
return SparkFileWriterFactory.builderFor(table)
.dataSchema(table.schema())
.dataFileFormat(format())
.deleteFileFormat(format())
.equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
.equalityDeleteRowSchema(equalityDeleteRowSchema)
+ .positionDeleteRowSchema(positionDeleteRowSchema)
.build();
}
@@ -54,4 +59,15 @@ public class TestSparkRollingFileWriters extends TestRollingFileWriters<Internal
row.update(1, UTF8String.fromString(data));
return row;
}
+
+ @Override
+ protected StructLikeSet toSet(Iterable<InternalRow> rows) {
+ StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
+ StructType sparkType = SparkSchemaUtil.convert(table.schema());
+ for (InternalRow row : rows) {
+ InternalRowWrapper wrapper = new InternalRowWrapper(sparkType);
+ set.add(wrapper.wrap(row));
+ }
+ return set;
+ }
}
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkRollingFileWriters.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkRollingFileWriters.java
index 3ea2353..9023195 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkRollingFileWriters.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkRollingFileWriters.java
@@ -37,13 +37,15 @@ public class TestSparkRollingFileWriters extends TestRollingFileWriters<Internal
@Override
protected FileWriterFactory<InternalRow> newWriterFactory(Schema dataSchema, List<Integer> equalityFieldIds,
- Schema equalityDeleteRowSchema) {
+ Schema equalityDeleteRowSchema,
+ Schema positionDeleteRowSchema) {
return SparkFileWriterFactory.builderFor(table)
.dataSchema(table.schema())
.dataFileFormat(format())
.deleteFileFormat(format())
.equalityFieldIds(ArrayUtil.toIntArray(equalityFieldIds))
.equalityDeleteRowSchema(equalityDeleteRowSchema)
+ .positionDeleteRowSchema(positionDeleteRowSchema)
.build();
}