Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/09/24 17:48:11 UTC
[iceberg] branch master updated: Spark: Migrate to new data writers in SparkWrite (#3171)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new e757dc7 Spark: Migrate to new data writers in SparkWrite (#3171)
e757dc7 is described below
commit e757dc71011dbb12ca6fefdcd33268dd3be30a51
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Fri Sep 24 10:47:58 2021 -0700
Spark: Migrate to new data writers in SparkWrite (#3171)
---
.../apache/iceberg/spark/source/SparkWrite.java | 134 +++++++++++++++------
1 file changed, 99 insertions(+), 35 deletions(-)
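For readers skimming the diff below: this change retires the SparkAppenderFactory-based task writers (Unpartitioned3Writer, Partitioned3Writer, PartitionedFanout3Writer) in favor of thin DataWriter<InternalRow> adapters over the generic PartitioningWriter implementations in org.apache.iceberg.io. The core split is between ClusteredDataWriter, which keeps a single file open at a time and expects incoming rows to be clustered by partition, and FanoutDataWriter, which keeps one open file per partition and accepts rows in any order at the cost of more open files per task. A minimal sketch of the selection logic, assuming the table handle, schemas, and task-level values that the surrounding SparkWrite code already has in scope (names mirror the diff below; nothing here is new API):

    // Sketch: table, format, writeSchema, dsSchema, partitionId, taskId,
    // targetFileSize, and partitionedFanoutEnabled come from the enclosing task.
    FileIO io = table.io();
    OutputFileFactory fileFactory = OutputFileFactory.builderFor(table, partitionId, taskId)
        .format(format)
        .build();
    SparkFileWriterFactory writerFactory = SparkFileWriterFactory.builderFor(table)
        .dataFileFormat(format)
        .dataSchema(writeSchema)
        .dataSparkType(dsSchema)
        .build();

    // Unpartitioned and partition-clustered input can use the clustered writer;
    // unsorted partitioned input needs fanout to avoid reopening files.
    PartitioningWriter<InternalRow, DataWriteResult> writer =
        !table.spec().isUnpartitioned() && partitionedFanoutEnabled
            ? new FanoutDataWriter<>(writerFactory, fileFactory, io, format, targetFileSize)
            : new ClusteredDataWriter<>(writerFactory, fileFactory, io, format, targetFileSize);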
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 4cc37c8..1c1fc43 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -29,11 +29,13 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.IsolationLevel;
 import org.apache.iceberg.OverwriteFiles;
+import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.ReplacePartitions;
 import org.apache.iceberg.Schema;
@@ -45,9 +47,12 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.ClusteredDataWriter;
+import org.apache.iceberg.io.DataWriteResult;
+import org.apache.iceberg.io.FanoutDataWriter;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFileFactory;
-import org.apache.iceberg.io.UnpartitionedWriter;
+import org.apache.iceberg.io.PartitioningWriter;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -538,68 +543,127 @@ class SparkWrite {
     @Override
     public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long epochId) {
       Table table = tableBroadcast.value();
-
-      OutputFileFactory fileFactory = OutputFileFactory.builderFor(table, partitionId, taskId).format(format).build();
-      SparkAppenderFactory appenderFactory = SparkAppenderFactory.builderFor(table, writeSchema, dsSchema).build();
-
       PartitionSpec spec = table.spec();
       FileIO io = table.io();
+      OutputFileFactory fileFactory = OutputFileFactory.builderFor(table, partitionId, taskId)
+          .format(format)
+          .build();
+      SparkFileWriterFactory writerFactory = SparkFileWriterFactory.builderFor(table)
+          .dataFileFormat(format)
+          .dataSchema(writeSchema)
+          .dataSparkType(dsSchema)
+          .build();
+
       if (spec.isUnpartitioned()) {
-        return new Unpartitioned3Writer(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+        ClusteredDataWriter<InternalRow> dataWriter = new ClusteredDataWriter<>(
+            writerFactory, fileFactory, io,
+            format, targetFileSize);
+        return new UnpartitionedDataWriter(dataWriter, io, spec);
+
       } else if (partitionedFanoutEnabled) {
-        return new PartitionedFanout3Writer(
-            spec, format, appenderFactory, fileFactory, io, targetFileSize, writeSchema, dsSchema);
+        FanoutDataWriter<InternalRow> dataWriter = new FanoutDataWriter<>(
+            writerFactory, fileFactory, io,
+            format, targetFileSize);
+        return new PartitionedDataWriter(dataWriter, io, spec, writeSchema, dsSchema);
+
       } else {
-        return new Partitioned3Writer(
-            spec, format, appenderFactory, fileFactory, io, targetFileSize, writeSchema, dsSchema);
+        ClusteredDataWriter<InternalRow> dataWriter = new ClusteredDataWriter<>(
+            writerFactory, fileFactory, io,
+            format, targetFileSize);
+        return new PartitionedDataWriter(dataWriter, io, spec, writeSchema, dsSchema);
       }
     }
   }
 
-  private static class Unpartitioned3Writer extends UnpartitionedWriter<InternalRow>
-      implements DataWriter<InternalRow> {
-    Unpartitioned3Writer(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
-                         OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
-      super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+  private static <T extends ContentFile<T>> void deleteFiles(FileIO io, List<T> files) {
+    Tasks.foreach(files)
+        .throwFailureWhenFinished()
+        .noRetry()
+        .run(file -> io.deleteFile(file.path().toString()));
+  }
+
+  private static class UnpartitionedDataWriter implements DataWriter<InternalRow> {
+    private final PartitioningWriter<InternalRow, DataWriteResult> delegate;
+    private final FileIO io;
+    private final PartitionSpec spec;
+
+    private UnpartitionedDataWriter(PartitioningWriter<InternalRow, DataWriteResult> delegate,
+                                    FileIO io, PartitionSpec spec) {
+      this.delegate = delegate;
+      this.io = io;
+      this.spec = spec;
+    }
+
+    @Override
+    public void write(InternalRow record) throws IOException {
+      delegate.write(record, spec, null);
     }
 
     @Override
     public WriterCommitMessage commit() throws IOException {
-      this.close();
+      close();
 
-      return new TaskCommit(dataFiles());
+      DataWriteResult result = delegate.result();
+      return new TaskCommit(result.dataFiles().toArray(new DataFile[0]));
     }
-  }
 
-  private static class Partitioned3Writer extends SparkPartitionedWriter implements DataWriter<InternalRow> {
-    Partitioned3Writer(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
-                       OutputFileFactory fileFactory, FileIO io, long targetFileSize,
-                       Schema schema, StructType sparkSchema) {
-      super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, sparkSchema);
+    @Override
+    public void abort() throws IOException {
+      close();
+
+      DataWriteResult result = delegate.result();
+      deleteFiles(io, result.dataFiles());
    }
 
     @Override
-    public WriterCommitMessage commit() throws IOException {
-      this.close();
-
-      return new TaskCommit(dataFiles());
+    public void close() throws IOException {
+      delegate.close();
     }
   }
 
-  private static class PartitionedFanout3Writer extends SparkPartitionedFanoutWriter
-      implements DataWriter<InternalRow> {
-    PartitionedFanout3Writer(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
-                             OutputFileFactory fileFactory, FileIO io, long targetFileSize,
-                             Schema schema, StructType sparkSchema) {
-      super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, sparkSchema);
+  private static class PartitionedDataWriter implements DataWriter<InternalRow> {
+    private final PartitioningWriter<InternalRow, DataWriteResult> delegate;
+    private final FileIO io;
+    private final PartitionSpec spec;
+    private final PartitionKey partitionKey;
+    private final InternalRowWrapper internalRowWrapper;
+
+    private PartitionedDataWriter(PartitioningWriter<InternalRow, DataWriteResult> delegate,
+                                  FileIO io, PartitionSpec spec, Schema dataSchema,
+                                  StructType dataSparkType) {
+      this.delegate = delegate;
+      this.io = io;
+      this.spec = spec;
+      this.partitionKey = new PartitionKey(spec, dataSchema);
+      this.internalRowWrapper = new InternalRowWrapper(dataSparkType);
+    }
+
+    @Override
+    public void write(InternalRow row) throws IOException {
+      partitionKey.partition(internalRowWrapper.wrap(row));
+      delegate.write(row, spec, partitionKey);
     }
 
     @Override
     public WriterCommitMessage commit() throws IOException {
-      this.close();
+      close();
 
-      return new TaskCommit(dataFiles());
+      DataWriteResult result = delegate.result();
+      return new TaskCommit(result.dataFiles().toArray(new DataFile[0]));
+    }
+
+    @Override
+    public void abort() throws IOException {
+      close();
+
+      DataWriteResult result = delegate.result();
+      deleteFiles(io, result.dataFiles());
+    }
+
+    @Override
+    public void close() throws IOException {
+      delegate.close();
     }
   }
 }
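A usage note on the new adapters (an illustrative sketch, not part of the patch): PartitionedDataWriter reuses a single mutable PartitionKey across rows, re-deriving the partition values per record through InternalRowWrapper, which implies the delegate writers do not retain the caller's live key; and both adapters read the delegate's DataWriteResult only after closing it, so result() always reflects a finished task. Roughly, per write task, where rows stands in for the task's input iterator:

    // Route each row by recomputing its partition key, as in
    // PartitionedDataWriter.write(...) above.
    PartitionKey partitionKey = new PartitionKey(spec, dataSchema);      // mutable, reused per row
    InternalRowWrapper wrapper = new InternalRowWrapper(dataSparkType);  // StructLike view of an InternalRow
    for (InternalRow row : rows) {
      partitionKey.partition(wrapper.wrap(row));   // derive partition values from the row
      writer.write(row, spec, partitionKey);       // delegate groups (clustered) or fans out
    }

    // Lifecycle, as in commit()/abort() above: close first, then read the
    // result; on abort, delete every data file the task produced.
    writer.close();
    DataWriteResult result = writer.result();
    DataFile[] taskFiles = result.dataFiles().toArray(new DataFile[0]);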