Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/09/24 17:48:11 UTC

[iceberg] branch master updated: Spark: Migrate to new data writers in SparkWrite (#3171)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new e757dc7  Spark: Migrate to new data writers in SparkWrite (#3171)
e757dc7 is described below

commit e757dc71011dbb12ca6fefdcd33268dd3be30a51
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Fri Sep 24 10:47:58 2021 -0700

    Spark: Migrate to new data writers in SparkWrite (#3171)
---
 .../apache/iceberg/spark/source/SparkWrite.java    | 134 +++++++++++++++------
 1 file changed, 99 insertions(+), 35 deletions(-)
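
In short: this change deletes the Spark-specific writer subclasses (Unpartitioned3Writer, Partitioned3Writer, PartitionedFanout3Writer) and replaces them with thin DataWriter wrappers over the generic writers in org.apache.iceberg.io. A minimal sketch of the new per-task write path, using only names that appear in the diff below (the table, schemas, and task context are assumed to be set up as in SparkWrite):

    // Imports are the ones added in the hunks below (ClusteredDataWriter,
    // DataWriteResult, PartitionKey, etc.).
    OutputFileFactory fileFactory = OutputFileFactory.builderFor(table, partitionId, taskId)
        .format(format)
        .build();
    SparkFileWriterFactory writerFactory = SparkFileWriterFactory.builderFor(table)
        .dataFileFormat(format)
        .dataSchema(writeSchema)
        .dataSparkType(dsSchema)
        .build();

    // ClusteredDataWriter expects incoming rows to be grouped by partition and
    // keeps one file open at a time; FanoutDataWriter keeps one open file per
    // partition and is chosen only when fanout is enabled.
    ClusteredDataWriter<InternalRow> writer = new ClusteredDataWriter<>(
        writerFactory, fileFactory, table.io(), format, targetFileSize);

    PartitionKey partitionKey = new PartitionKey(table.spec(), writeSchema);
    InternalRowWrapper wrapper = new InternalRowWrapper(dsSchema);
    while (rows.hasNext()) {                  // rows: the task's input iterator
      InternalRow row = rows.next();
      partitionKey.partition(wrapper.wrap(row));
      writer.write(row, table.spec(), partitionKey);
    }

    writer.close();
    DataWriteResult result = writer.result();          // valid only after close()
    List<DataFile> dataFiles = result.dataFiles();     // reported via TaskCommit

For an unpartitioned spec, the same write call is made with a null partition key, as the diff shows.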

diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 4cc37c8..1c1fc43 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -29,11 +29,13 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.IsolationLevel;
 import org.apache.iceberg.OverwriteFiles;
+import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.ReplacePartitions;
 import org.apache.iceberg.Schema;
@@ -45,9 +47,12 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.ClusteredDataWriter;
+import org.apache.iceberg.io.DataWriteResult;
+import org.apache.iceberg.io.FanoutDataWriter;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFileFactory;
-import org.apache.iceberg.io.UnpartitionedWriter;
+import org.apache.iceberg.io.PartitioningWriter;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -538,68 +543,127 @@ class SparkWrite {
     @Override
     public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long epochId) {
       Table table = tableBroadcast.value();
-
-      OutputFileFactory fileFactory = OutputFileFactory.builderFor(table, partitionId, taskId).format(format).build();
-      SparkAppenderFactory appenderFactory = SparkAppenderFactory.builderFor(table, writeSchema, dsSchema).build();
-
       PartitionSpec spec = table.spec();
       FileIO io = table.io();
 
+      OutputFileFactory fileFactory = OutputFileFactory.builderFor(table, partitionId, taskId)
+          .format(format)
+          .build();
+      SparkFileWriterFactory writerFactory = SparkFileWriterFactory.builderFor(table)
+          .dataFileFormat(format)
+          .dataSchema(writeSchema)
+          .dataSparkType(dsSchema)
+          .build();
+
       if (spec.isUnpartitioned()) {
-        return new Unpartitioned3Writer(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+        ClusteredDataWriter<InternalRow> dataWriter = new ClusteredDataWriter<>(
+            writerFactory, fileFactory, io,
+            format, targetFileSize);
+        return new UnpartitionedDataWriter(dataWriter, io, spec);
+
       } else if (partitionedFanoutEnabled) {
-        return new PartitionedFanout3Writer(
-            spec, format, appenderFactory, fileFactory, io, targetFileSize, writeSchema, dsSchema);
+        FanoutDataWriter<InternalRow> dataWriter = new FanoutDataWriter<>(
+            writerFactory, fileFactory, io,
+            format, targetFileSize);
+        return new PartitionedDataWriter(dataWriter, io, spec, writeSchema, dsSchema);
+
       } else {
-        return new Partitioned3Writer(
-            spec, format, appenderFactory, fileFactory, io, targetFileSize, writeSchema, dsSchema);
+        ClusteredDataWriter<InternalRow> dataWriter = new ClusteredDataWriter<>(
+            writerFactory, fileFactory, io,
+            format, targetFileSize);
+        return new PartitionedDataWriter(dataWriter, io, spec, writeSchema, dsSchema);
       }
     }
   }
 
-  private static class Unpartitioned3Writer extends UnpartitionedWriter<InternalRow>
-      implements DataWriter<InternalRow> {
-    Unpartitioned3Writer(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
-                         OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
-      super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+  private static <T extends ContentFile<T>> void deleteFiles(FileIO io, List<T> files) {
+    Tasks.foreach(files)
+        .throwFailureWhenFinished()
+        .noRetry()
+        .run(file -> io.deleteFile(file.path().toString()));
+  }
+
+  private static class UnpartitionedDataWriter implements DataWriter<InternalRow> {
+    private final PartitioningWriter<InternalRow, DataWriteResult> delegate;
+    private final FileIO io;
+    private final PartitionSpec spec;
+
+    private UnpartitionedDataWriter(PartitioningWriter<InternalRow, DataWriteResult> delegate,
+                                    FileIO io, PartitionSpec spec) {
+      this.delegate = delegate;
+      this.io = io;
+      this.spec = spec;
+    }
+
+    @Override
+    public void write(InternalRow record) throws IOException {
+      delegate.write(record, spec, null);
     }
 
     @Override
     public WriterCommitMessage commit() throws IOException {
-      this.close();
+      close();
 
-      return new TaskCommit(dataFiles());
+      DataWriteResult result = delegate.result();
+      return new TaskCommit(result.dataFiles().toArray(new DataFile[0]));
     }
-  }
 
-  private static class Partitioned3Writer extends SparkPartitionedWriter implements DataWriter<InternalRow> {
-    Partitioned3Writer(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
-                       OutputFileFactory fileFactory, FileIO io, long targetFileSize,
-                       Schema schema, StructType sparkSchema) {
-      super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, sparkSchema);
+    @Override
+    public void abort() throws IOException {
+      close();
+
+      DataWriteResult result = delegate.result();
+      deleteFiles(io, result.dataFiles());
     }
 
     @Override
-    public WriterCommitMessage commit() throws IOException {
-      this.close();
-
-      return new TaskCommit(dataFiles());
+    public void close() throws IOException {
+      delegate.close();
     }
   }
 
-  private static class PartitionedFanout3Writer extends SparkPartitionedFanoutWriter
-      implements DataWriter<InternalRow> {
-    PartitionedFanout3Writer(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
-                             OutputFileFactory fileFactory, FileIO io, long targetFileSize,
-                             Schema schema, StructType sparkSchema) {
-      super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, sparkSchema);
+  private static class PartitionedDataWriter implements DataWriter<InternalRow> {
+    private final PartitioningWriter<InternalRow, DataWriteResult> delegate;
+    private final FileIO io;
+    private final PartitionSpec spec;
+    private final PartitionKey partitionKey;
+    private final InternalRowWrapper internalRowWrapper;
+
+    private PartitionedDataWriter(PartitioningWriter<InternalRow, DataWriteResult> delegate,
+                                  FileIO io, PartitionSpec spec, Schema dataSchema,
+                                  StructType dataSparkType) {
+      this.delegate = delegate;
+      this.io = io;
+      this.spec = spec;
+      this.partitionKey = new PartitionKey(spec, dataSchema);
+      this.internalRowWrapper = new InternalRowWrapper(dataSparkType);
+    }
+
+    @Override
+    public void write(InternalRow row) throws IOException {
+      partitionKey.partition(internalRowWrapper.wrap(row));
+      delegate.write(row, spec, partitionKey);
     }
 
     @Override
     public WriterCommitMessage commit() throws IOException {
-      this.close();
+      close();
 
-      return new TaskCommit(dataFiles());
+      DataWriteResult result = delegate.result();
+      return new TaskCommit(result.dataFiles().toArray(new DataFile[0]));
+    }
+
+    @Override
+    public void abort() throws IOException {
+      close();
+
+      DataWriteResult result = delegate.result();
+      deleteFiles(io, result.dataFiles());
+    }
+
+    @Override
+    public void close() throws IOException {
+      delegate.close();
     }
   }
 }
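
Usage note: the two wrappers above only adapt Iceberg's PartitioningWriter to Spark's DataWriter contract; all file handling lives in the delegate. A hedged sketch of how Spark drives that contract for each task (the loop and error handling here are illustrative, not part of this commit):

    DataWriter<InternalRow> writer = writerFactory.createWriter(partitionId, taskId, epochId);
    try {
      while (rows.hasNext()) {
        writer.write(rows.next());
      }
      // commit() closes the delegate and wraps the resulting DataFiles in a TaskCommit
      WriterCommitMessage message = writer.commit();
    } catch (Exception e) {
      // abort() also closes the delegate, then removes every file already written,
      // via the deleteFiles helper added in this patch
      writer.abort();
      throw e;
    } finally {
      writer.close();   // Spark closes the writer regardless of outcome
    }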