Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/06/20 01:02:33 UTC

[iceberg] branch master updated: Spark: Move classes that depend on 2.x DSv2 to spark2 (#1122)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 845de7b  Spark: Move classes that depend on 2.x DSv2 to spark2 (#1122)
845de7b is described below

commit 845de7bfe78bc8ba3480aad219fcef9f71477023
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Fri Jun 19 18:02:23 2020 -0700

    Spark: Move classes that depend on 2.x DSv2 to spark2 (#1122)
---
 build.gradle                                       |  13 +
 .../iceberg/actions/RewriteDataFilesAction.java    |   3 +-
 .../iceberg/spark/source/BaseDataReader.java       |  10 +-
 .../apache/iceberg/spark/source/BaseWriter.java    | 143 ++++++
 .../iceberg/spark/source/OutputFileFactory.java    |  78 +++
 .../iceberg/spark/source/PartitionedWriter.java    |  68 +++
 .../iceberg/spark/source/RowDataRewriter.java      |  59 ++-
 .../iceberg/spark/source/SparkAppenderFactory.java |  86 ++++
 .../spark/source/{Stats.java => TaskResult.java}   |  33 +-
 .../{Stats.java => UnpartitionedWriter.java}       |  27 +-
 .../org/apache/iceberg/spark/source/Writer.java    | 544 ---------------------
 .../org/apache/iceberg/spark/SparkFilters.java     |   0
 .../apache/iceberg/spark/source/IcebergSource.java |   0
 .../org/apache/iceberg/spark/source/Reader.java    |  49 +-
 .../org/apache/iceberg/spark/source/Stats.java     |   0
 .../iceberg/spark/source/StreamingWriter.java      |   0
 .../org/apache/iceberg/spark/source/Writer.java    | 282 +++++++++++
 ...org.apache.spark.sql.sources.DataSourceRegister |   0
 .../org/apache/iceberg/spark/SparkTableUtil.scala  |   0
 .../actions/TestRemoveOrphanFilesAction.java       |   0
 .../actions/TestRewriteDataFilesAction.java        |   0
 .../actions/TestRewriteManifestsAction.java        |   0
 .../apache/iceberg/examples/ConcurrencyTest.java   |   0
 .../java/org/apache/iceberg/examples/README.md     |   0
 .../iceberg/examples/ReadAndWriteTablesTest.java   |   0
 .../iceberg/examples/SchemaEvolutionTest.java      |   0
 .../org/apache/iceberg/examples/SimpleRecord.java  |   0
 .../examples/SnapshotFunctionalityTest.java        |   0
 .../apache/iceberg/spark/TestSparkDataFile.java    |   0
 .../apache/iceberg/spark/source/LogMessage.java    |   0
 .../apache/iceberg/spark/source/SimpleRecord.java  |   0
 .../apache/iceberg/spark/source/TestAvroScan.java  |   0
 .../iceberg/spark/source/TestDataFrameWrites.java  |   0
 .../spark/source/TestDataSourceOptions.java        |   0
 .../iceberg/spark/source/TestFilteredScan.java     |   0
 .../spark/source/TestForwardCompatibility.java     |   0
 .../iceberg/spark/source/TestIcebergSource.java    |   0
 .../source/TestIcebergSourceHadoopTables.java      |   0
 .../spark/source/TestIcebergSourceHiveTables.java  |   0
 .../spark/source/TestIcebergSourceTablesBase.java  |   0
 .../spark/source/TestIdentityPartitionData.java    |   0
 .../iceberg/spark/source/TestParquetScan.java      |   0
 .../iceberg/spark/source/TestPartitionValues.java  |   0
 .../iceberg/spark/source/TestReadProjection.java   |   0
 .../spark/source/TestSnapshotSelection.java        |   0
 .../iceberg/spark/source/TestSparkDataWrite.java   |   0
 .../spark/source/TestSparkReadProjection.java      |   0
 .../iceberg/spark/source/TestSparkSchema.java      |   0
 .../iceberg/spark/source/TestSparkTableUtil.java   |   0
 .../TestSparkTableUtilWithInMemoryCatalog.java     |   0
 .../spark/source/TestStructuredStreaming.java      |   0
 .../apache/iceberg/spark/source/TestTables.java    |   0
 .../spark/source/TestWriteMetricsConfig.java       |   0
 .../src/test/resources/data/books.json             |   0
 .../src/test/resources/data/new-books.json         |   0
 55 files changed, 753 insertions(+), 642 deletions(-)

diff --git a/build.gradle b/build.gradle
index a9da1a7..5510405 100644
--- a/build.gradle
+++ b/build.gradle
@@ -519,6 +519,18 @@ project(':iceberg-spark2') {
     testCompile project(path: ':iceberg-hive', configuration: 'testArtifacts')
     testCompile project(path: ':iceberg-api', configuration: 'testArtifacts')
   }
+
+  test {
+    // For vectorized reads
+    // Allow unsafe memory access to avoid the costly bounds check Arrow performs on every index access
+    systemProperty("arrow.enable_unsafe_memory_access", "true")
+    // Disable expensive null check for every get(index) call.
+    // Iceberg manages nullability checks itself instead of relying on arrow.
+    systemProperty("arrow.enable_null_check_for_get", "false")
+
+    // Vectorized reads need more memory
+    maxHeapSize '2500m'
+  }
 }
 
 project(':iceberg-spark3') {
@@ -548,6 +560,7 @@ project(':iceberg-spark3') {
     testCompile project(path: ':iceberg-hive', configuration: 'testArtifacts')
     testCompile project(path: ':iceberg-api', configuration: 'testArtifacts')
   }
+
   test {
     // For vectorized reads
     // Allow unsafe memory access to avoid the costly bounds check Arrow performs on every index access
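
The two Arrow settings above are ordinary JVM system properties, so the same tuning applies to any process that runs Iceberg's vectorized reads, not only these Gradle test tasks. A minimal sketch of setting them from application code, assuming it runs before any Arrow classes are loaded:

    // Sketch (assumption: runs before the first vectorized read in this JVM).
    // Mirrors the systemProperty(...) calls in the test blocks above.
    System.setProperty("arrow.enable_unsafe_memory_access", "true");
    System.setProperty("arrow.enable_null_check_for_get", "false");
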
diff --git a/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java b/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
index e1a7584..0c121a9 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
+++ b/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
@@ -228,8 +228,7 @@ public class RewriteDataFilesAction
     Broadcast<FileIO> io = sparkContext.broadcast(fileIO);
     Broadcast<EncryptionManager> encryption = sparkContext.broadcast(encryptionManager);
 
-    RowDataRewriter rowDataRewriter =
-        new RowDataRewriter(table, spec, caseSensitive, io, encryption, targetSizeInBytes);
+    RowDataRewriter rowDataRewriter = new RowDataRewriter(table, spec, caseSensitive, io, encryption);
 
     List<DataFile> addedDataFiles = rowDataRewriter.rewriteDataForTasks(taskRDD);
     List<DataFile> currentDataFiles = filteredGroupedTasks.values().stream()
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
index 9ab8bce..93e03aa 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
@@ -19,6 +19,7 @@
 
 package org.apache.iceberg.spark.source;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
@@ -33,15 +34,13 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.spark.rdd.InputFileBlockHolder;
-import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
 
 /**
- * Base class of readers of type {@link InputPartitionReader} to read data as objects of type @param &lt;T&gt;
+ * Base class of Spark readers.
  *
  * @param <T> is the Java class returned by this reader whose objects contain one or more rows.
  */
-@SuppressWarnings("checkstyle:VisibilityModifier")
-abstract class BaseDataReader<T> implements InputPartitionReader<T> {
+abstract class BaseDataReader<T> implements Closeable {
   private final Iterator<FileScanTask> tasks;
   private final FileIO fileIo;
   private final Map<String, InputFile> inputFiles;
@@ -64,7 +63,6 @@ abstract class BaseDataReader<T> implements InputPartitionReader<T> {
     this.currentIterator = CloseableIterator.empty();
   }
 
-  @Override
   public boolean next() throws IOException {
     while (true) {
       if (currentIterator.hasNext()) {
@@ -79,14 +77,12 @@ abstract class BaseDataReader<T> implements InputPartitionReader<T> {
     }
   }
 
-  @Override
   public T get() {
     return current;
   }
 
   abstract CloseableIterator<T> open(FileScanTask task);
 
-  @Override
   public void close() throws IOException {
     InputFileBlockHolder.unset();
 
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/BaseWriter.java b/spark/src/main/java/org/apache/iceberg/spark/source/BaseWriter.java
new file mode 100644
index 0000000..f3a1030
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/BaseWriter.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Tasks;
+import org.apache.spark.sql.catalyst.InternalRow;
+
+abstract class BaseWriter implements Closeable {
+  protected static final int ROWS_DIVISOR = 1000;
+
+  private final List<DataFile> completedFiles = Lists.newArrayList();
+  private final PartitionSpec spec;
+  private final FileFormat format;
+  private final SparkAppenderFactory appenderFactory;
+  private final OutputFileFactory fileFactory;
+  private final FileIO io;
+  private final long targetFileSize;
+  private PartitionKey currentKey = null;
+  private FileAppender<InternalRow> currentAppender = null;
+  private EncryptedOutputFile currentFile = null;
+  private long currentRows = 0;
+
+  BaseWriter(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
+             OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+    this.spec = spec;
+    this.format = format;
+    this.appenderFactory = appenderFactory;
+    this.fileFactory = fileFactory;
+    this.io = io;
+    this.targetFileSize = targetFileSize;
+  }
+
+  public abstract void write(InternalRow row) throws IOException;
+
+  public void writeInternal(InternalRow row) throws IOException {
+    // TODO: ORC does not support checking the target file size before the file is closed
+    if (!format.equals(FileFormat.ORC) &&
+        currentRows % ROWS_DIVISOR == 0 && currentAppender.length() >= targetFileSize) {
+      closeCurrent();
+      openCurrent();
+    }
+
+    currentAppender.add(row);
+    currentRows++;
+  }
+
+  public TaskResult complete() throws IOException {
+    closeCurrent();
+
+    return new TaskResult(completedFiles);
+  }
+
+  public void abort() throws IOException {
+    closeCurrent();
+
+    // clean up files created by this writer
+    Tasks.foreach(completedFiles)
+        .throwFailureWhenFinished()
+        .noRetry()
+        .run(file -> io.deleteFile(file.path().toString()));
+  }
+
+  @Override
+  public void close() throws IOException {
+    closeCurrent();
+  }
+
+  protected void openCurrent() {
+    if (spec.fields().size() == 0) {
+      // unpartitioned
+      currentFile = fileFactory.newOutputFile();
+    } else {
+      // partitioned
+      currentFile = fileFactory.newOutputFile(currentKey);
+    }
+    currentAppender = appenderFactory.newAppender(currentFile.encryptingOutputFile(), format);
+    currentRows = 0;
+  }
+
+  protected void closeCurrent() throws IOException {
+    if (currentAppender != null) {
+      currentAppender.close();
+      // metrics are only valid after the appender is closed
+      Metrics metrics = currentAppender.metrics();
+      long fileSizeInBytes = currentAppender.length();
+      List<Long> splitOffsets = currentAppender.splitOffsets();
+      this.currentAppender = null;
+
+      if (metrics.recordCount() == 0L) {
+        io.deleteFile(currentFile.encryptingOutputFile());
+      } else {
+        DataFile dataFile = DataFiles.builder(spec)
+            .withEncryptionKeyMetadata(currentFile.keyMetadata())
+            .withPath(currentFile.encryptingOutputFile().location())
+            .withFileSizeInBytes(fileSizeInBytes)
+            .withPartition(spec.fields().size() == 0 ? null : currentKey) // set null if unpartitioned
+            .withMetrics(metrics)
+            .withSplitOffsets(splitOffsets)
+            .build();
+        completedFiles.add(dataFile);
+      }
+
+      this.currentFile = null;
+    }
+  }
+
+  protected PartitionKey getCurrentKey() {
+    return currentKey;
+  }
+
+  protected void setCurrentKey(PartitionKey currentKey) {
+    this.currentKey = currentKey;
+  }
+}
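
BaseWriter and the TaskResult it returns replace the DSv2-specific DataWriter/WriterCommitMessage pair for code that must live in the shared spark module. A minimal usage sketch, assuming the spec, format, appenderFactory, fileFactory, io, targetFileSize, and rows objects are already built (as RowDataRewriter does later in this patch):

    // Sketch: driving one of the BaseWriter subclasses directly.
    UnpartitionedWriter writer = new UnpartitionedWriter(
        spec, format, appenderFactory, fileFactory, io, targetFileSize);
    try {
      while (rows.hasNext()) {                 // rows: hypothetical Iterator<InternalRow>
        writer.write(rows.next());             // rolls to a new file once targetFileSize is reached
      }
      TaskResult result = writer.complete();   // closes the open appender, returns the completed DataFiles
    } catch (Exception e) {
      writer.abort();                          // deletes every file this writer has already written
      throw e;
    }
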
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/OutputFileFactory.java b/spark/src/main/java/org/apache/iceberg/spark/source/OutputFileFactory.java
new file mode 100644
index 0000000..1daf889
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/OutputFileFactory.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.io.OutputFile;
+
+class OutputFileFactory {
+  private final PartitionSpec spec;
+  private final FileFormat format;
+  private final LocationProvider locations;
+  private final FileIO io;
+  private final EncryptionManager encryptionManager;
+  private final int partitionId;
+  private final long taskId;
+  // The purpose of this uuid is to make it possible to tell from two paths whether they were written by the same
+  // operation. That's useful, for example, when a Spark job dies and leaves files in the file system: you can
+  // identify them all with a recursive listing and grep.
+  private final String uuid = UUID.randomUUID().toString();
+  private final AtomicInteger fileCount = new AtomicInteger(0);
+
+  OutputFileFactory(PartitionSpec spec, FileFormat format, LocationProvider locations, FileIO io,
+                    EncryptionManager encryptionManager, int partitionId, long taskId) {
+    this.spec = spec;
+    this.format = format;
+    this.locations = locations;
+    this.io = io;
+    this.encryptionManager = encryptionManager;
+    this.partitionId = partitionId;
+    this.taskId = taskId;
+  }
+
+  private String generateFilename() {
+    return format.addExtension(
+        String.format("%05d-%d-%s-%05d", partitionId, taskId, uuid, fileCount.incrementAndGet()));
+  }
+
+  /**
+   * Generates EncryptedOutputFile for UnpartitionedWriter.
+   */
+  public EncryptedOutputFile newOutputFile() {
+    OutputFile file = io.newOutputFile(locations.newDataLocation(generateFilename()));
+    return encryptionManager.encrypt(file);
+  }
+
+  /**
+   * Generates EncryptedOutputFile for PartitionedWriter.
+   */
+  public EncryptedOutputFile newOutputFile(PartitionKey key) {
+    String newDataLocation = locations.newDataLocation(spec, key, generateFilename());
+    OutputFile rawOutputFile = io.newOutputFile(newDataLocation);
+    return encryptionManager.encrypt(rawOutputFile);
+  }
+}
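
OutputFileFactory is created once per Spark task, so the partition id, task attempt id, and the factory's UUID are embedded in every file name it produces (for example 00002-17-<uuid>-00001.parquet). A short sketch of how RowDataRewriter-style code obtains files from it, with the surrounding table, io, encryptionManager, context, and key objects assumed:

    // Sketch: one factory per task; both files below share the same name prefix.
    OutputFileFactory fileFactory = new OutputFileFactory(
        spec, FileFormat.PARQUET, table.locationProvider(), io, encryptionManager,
        context.partitionId(), context.taskAttemptId());
    EncryptedOutputFile plain = fileFactory.newOutputFile();        // unpartitioned data location
    EncryptedOutputFile keyed = fileFactory.newOutputFile(key);     // location under the partition path
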
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/PartitionedWriter.java b/spark/src/main/java/org/apache/iceberg/spark/source/PartitionedWriter.java
new file mode 100644
index 0000000..7cc0c9f
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/PartitionedWriter.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.util.Set;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PartitionedWriter extends BaseWriter {
+  private static final Logger LOG = LoggerFactory.getLogger(PartitionedWriter.class);
+
+  private final PartitionKey key;
+  private final Set<PartitionKey> completedPartitions = Sets.newHashSet();
+
+  PartitionedWriter(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
+                    OutputFileFactory fileFactory, FileIO io, long targetFileSize, Schema writeSchema) {
+    super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+    this.key = new PartitionKey(spec, writeSchema);
+  }
+
+  @Override
+  public void write(InternalRow row) throws IOException {
+    key.partition(row);
+
+    PartitionKey currentKey = getCurrentKey();
+    if (!key.equals(currentKey)) {
+      closeCurrent();
+      completedPartitions.add(currentKey);
+
+      if (completedPartitions.contains(key)) {
+        // if rows are not correctly grouped, detect and fail the write
+        PartitionKey existingKey = Iterables.find(completedPartitions, key::equals, null);
+        LOG.warn("Duplicate key: {} == {}", existingKey, key);
+        throw new IllegalStateException("Already closed files for partition: " + key.toPath());
+      }
+
+      setCurrentKey(key.copy());
+      openCurrent();
+    }
+
+    writeInternal(row);
+  }
+}
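
PartitionedWriter keeps at most one file open and expects its input to arrive already clustered by partition key; revisiting a partition it has finished is reported as an error rather than silently reopening the file. A sketch of that contract with hypothetical rows rowA1, rowB1, rowA2 belonging to partitions A, B, A:

    // Sketch: rows must be grouped by partition before reaching the writer.
    writer.write(rowA1);   // opens a data file for partition A
    writer.write(rowB1);   // closes A, marks A completed, opens a file for B
    writer.write(rowA2);   // IllegalStateException: "Already closed files for partition: ..."
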
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
index 384a95b..2837dbc 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
@@ -23,22 +23,23 @@ import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.spark.TaskContext;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.writer.DataWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,58 +49,70 @@ public class RowDataRewriter implements Serializable {
 
   private static final Logger LOG = LoggerFactory.getLogger(RowDataRewriter.class);
 
-  private final Broadcast<FileIO> fileIO;
+  private final Schema schema;
+  private final PartitionSpec spec;
+  private final Map<String, String> properties;
+  private final FileFormat format;
+  private final Broadcast<FileIO> io;
   private final Broadcast<EncryptionManager> encryptionManager;
-  private final String tableSchema;
+  private final LocationProvider locations;
   private final String nameMapping;
-  private final Writer.WriterFactory writerFactory;
   private final boolean caseSensitive;
 
   public RowDataRewriter(Table table, PartitionSpec spec, boolean caseSensitive,
-                         Broadcast<FileIO> fileIO, Broadcast<EncryptionManager> encryptionManager,
-                         long targetDataFileSizeInBytes) {
-    this.fileIO = fileIO;
+                         Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager) {
+    this.schema = table.schema();
+    this.spec = spec;
+    this.locations = table.locationProvider();
+    this.properties = table.properties();
+    this.io = io;
     this.encryptionManager = encryptionManager;
 
     this.caseSensitive = caseSensitive;
-    this.tableSchema = SchemaParser.toJson(table.schema());
     this.nameMapping = table.properties().get(DEFAULT_NAME_MAPPING);
 
     String formatString = table.properties().getOrDefault(
         TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
-    FileFormat fileFormat = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
-    this.writerFactory = new Writer.WriterFactory(spec, fileFormat, table.locationProvider(), table.properties(),
-        fileIO, encryptionManager, targetDataFileSizeInBytes, table.schema(), SparkSchemaUtil.convert(table.schema()));
+    this.format = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
   }
 
   public List<DataFile> rewriteDataForTasks(JavaRDD<CombinedScanTask> taskRDD) {
-    JavaRDD<Writer.TaskCommit> taskCommitRDD = taskRDD.map(this::rewriteDataForTask);
+    JavaRDD<TaskResult> taskCommitRDD = taskRDD.map(this::rewriteDataForTask);
 
     return taskCommitRDD.collect().stream()
         .flatMap(taskCommit -> Arrays.stream(taskCommit.files()))
         .collect(Collectors.toList());
   }
 
-  private Writer.TaskCommit rewriteDataForTask(CombinedScanTask task) throws Exception {
+  private TaskResult rewriteDataForTask(CombinedScanTask task) throws Exception {
     TaskContext context = TaskContext.get();
-
-    RowDataReader dataReader = new RowDataReader(task, SchemaParser.fromJson(tableSchema),
-        SchemaParser.fromJson(tableSchema), nameMapping, fileIO.value(),
-        encryptionManager.value(), caseSensitive);
-
     int partitionId = context.partitionId();
     long taskId = context.taskAttemptId();
-    DataWriter<InternalRow> dataWriter = writerFactory.createDataWriter(partitionId, taskId, 0);
+
+    RowDataReader dataReader = new RowDataReader(
+        task, schema, schema, nameMapping, io.value(), encryptionManager.value(), caseSensitive);
+
+    SparkAppenderFactory appenderFactory = new SparkAppenderFactory(
+        properties, schema, SparkSchemaUtil.convert(schema));
+    OutputFileFactory fileFactory = new OutputFileFactory(
+        spec, format, locations, io.value(), encryptionManager.value(), partitionId, taskId);
+
+    BaseWriter writer;
+    if (spec.fields().isEmpty()) {
+      writer = new UnpartitionedWriter(spec, format, appenderFactory, fileFactory, io.value(), Long.MAX_VALUE);
+    } else {
+      writer = new PartitionedWriter(spec, format, appenderFactory, fileFactory, io.value(), Long.MAX_VALUE, schema);
+    }
 
     try {
       while (dataReader.next()) {
         InternalRow row = dataReader.get();
-        dataWriter.write(row);
+        writer.write(row);
       }
 
       dataReader.close();
       dataReader = null;
-      return (Writer.TaskCommit) dataWriter.commit();
+      return writer.complete();
 
     } catch (Throwable originalThrowable) {
       try {
@@ -111,7 +124,7 @@ public class RowDataRewriter implements Serializable {
         if (dataReader != null) {
           dataReader.close();
         }
-        dataWriter.abort();
+        writer.abort();
         LOG.error("Aborted commit for partition {} (task {}, attempt {}, stage {}.{})",
             partitionId, taskId, context.taskAttemptId(), context.stageId(), context.stageAttemptNumber());
 
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
new file mode 100644
index 0000000..e29c6eb
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.spark.data.SparkAvroWriter;
+import org.apache.iceberg.spark.data.SparkOrcWriter;
+import org.apache.iceberg.spark.data.SparkParquetWriters;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+
+class SparkAppenderFactory {
+  private final Map<String, String> properties;
+  private final Schema writeSchema;
+  private final StructType dsSchema;
+
+  SparkAppenderFactory(Map<String, String> properties, Schema writeSchema, StructType dsSchema) {
+    this.properties = properties;
+    this.writeSchema = writeSchema;
+    this.dsSchema = dsSchema;
+  }
+
+  public FileAppender<InternalRow> newAppender(OutputFile file, FileFormat fileFormat) {
+    MetricsConfig metricsConfig = MetricsConfig.fromProperties(properties);
+    try {
+      switch (fileFormat) {
+        case PARQUET:
+          return Parquet.write(file)
+              .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(dsSchema, msgType))
+              .setAll(properties)
+              .metricsConfig(metricsConfig)
+              .schema(writeSchema)
+              .overwrite()
+              .build();
+
+        case AVRO:
+          return Avro.write(file)
+              .createWriterFunc(ignored -> new SparkAvroWriter(dsSchema))
+              .setAll(properties)
+              .schema(writeSchema)
+              .overwrite()
+              .build();
+
+        case ORC:
+          return ORC.write(file)
+              .createWriterFunc(SparkOrcWriter::new)
+              .setAll(properties)
+              .schema(writeSchema)
+              .overwrite()
+              .build();
+
+        default:
+          throw new UnsupportedOperationException("Cannot write unknown format: " + fileFormat);
+      }
+    } catch (IOException e) {
+      throw new RuntimeIOException(e);
+    }
+  }
+}
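
SparkAppenderFactory is the piece BaseWriter delegates to for the concrete Parquet, Avro, or ORC appender. A minimal sketch of using it on its own, with the table, writeSchema, outputFile, and row objects assumed:

    // Sketch: build a Parquet appender outside of BaseWriter.
    SparkAppenderFactory appenderFactory = new SparkAppenderFactory(
        table.properties(), writeSchema, SparkSchemaUtil.convert(writeSchema));
    FileAppender<InternalRow> appender = appenderFactory.newAppender(outputFile, FileFormat.PARQUET);
    appender.add(row);
    appender.close();      // as noted in BaseWriter, metrics() is only valid after the appender is closed
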
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Stats.java b/spark/src/main/java/org/apache/iceberg/spark/source/TaskResult.java
similarity index 63%
copy from spark/src/main/java/org/apache/iceberg/spark/source/Stats.java
copy to spark/src/main/java/org/apache/iceberg/spark/source/TaskResult.java
index 76119c1..1a0b8fc 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Stats.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/TaskResult.java
@@ -19,25 +19,30 @@
 
 package org.apache.iceberg.spark.source;
 
-import java.util.OptionalLong;
-import org.apache.spark.sql.sources.v2.reader.Statistics;
+import java.io.Serializable;
+import java.util.List;
+import org.apache.iceberg.DataFile;
 
-class Stats implements Statistics {
-  private final OptionalLong sizeInBytes;
-  private final OptionalLong numRows;
+class TaskResult implements Serializable {
+  private final DataFile[] files;
 
-  Stats(long sizeInBytes, long numRows) {
-    this.sizeInBytes = OptionalLong.of(sizeInBytes);
-    this.numRows = OptionalLong.of(numRows);
+  TaskResult() {
+    this.files = new DataFile[0];
   }
 
-  @Override
-  public OptionalLong sizeInBytes() {
-    return sizeInBytes;
+  TaskResult(DataFile file) {
+    this.files = new DataFile[] { file };
   }
 
-  @Override
-  public OptionalLong numRows() {
-    return numRows;
+  TaskResult(List<DataFile> files) {
+    this.files = files.toArray(new DataFile[files.size()]);
+  }
+
+  TaskResult(DataFile[] files) {
+    this.files = files;
+  }
+
+  DataFile[] files() {
+    return files;
   }
 }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Stats.java b/spark/src/main/java/org/apache/iceberg/spark/source/UnpartitionedWriter.java
similarity index 59%
copy from spark/src/main/java/org/apache/iceberg/spark/source/Stats.java
copy to spark/src/main/java/org/apache/iceberg/spark/source/UnpartitionedWriter.java
index 76119c1..5692b6a 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Stats.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/UnpartitionedWriter.java
@@ -19,25 +19,22 @@
 
 package org.apache.iceberg.spark.source;
 
-import java.util.OptionalLong;
-import org.apache.spark.sql.sources.v2.reader.Statistics;
+import java.io.IOException;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.io.FileIO;
+import org.apache.spark.sql.catalyst.InternalRow;
 
-class Stats implements Statistics {
-  private final OptionalLong sizeInBytes;
-  private final OptionalLong numRows;
+class UnpartitionedWriter extends BaseWriter {
+  UnpartitionedWriter(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
+                      OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+    super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
 
-  Stats(long sizeInBytes, long numRows) {
-    this.sizeInBytes = OptionalLong.of(sizeInBytes);
-    this.numRows = OptionalLong.of(numRows);
+    openCurrent();
   }
 
   @Override
-  public OptionalLong sizeInBytes() {
-    return sizeInBytes;
-  }
-
-  @Override
-  public OptionalLong numRows() {
-    return numRows;
+  public void write(InternalRow row) throws IOException {
+    writeInternal(row);
   }
 }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
deleted file mode 100644
index ffe16cf..0000000
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ /dev/null
@@ -1,544 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.spark.source;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import org.apache.iceberg.AppendFiles;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.DataFiles;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Metrics;
-import org.apache.iceberg.MetricsConfig;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.ReplacePartitions;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.SnapshotSummary;
-import org.apache.iceberg.SnapshotUpdate;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.encryption.EncryptedOutputFile;
-import org.apache.iceberg.encryption.EncryptionManager;
-import org.apache.iceberg.exceptions.RuntimeIOException;
-import org.apache.iceberg.io.FileAppender;
-import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.io.LocationProvider;
-import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.spark.data.SparkAvroWriter;
-import org.apache.iceberg.spark.data.SparkOrcWriter;
-import org.apache.iceberg.spark.data.SparkParquetWriters;
-import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.Tasks;
-import org.apache.spark.broadcast.Broadcast;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
-import org.apache.spark.sql.sources.v2.writer.DataWriter;
-import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
-import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
-import org.apache.spark.sql.types.StructType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS;
-import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT;
-import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS;
-import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT;
-import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES;
-import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT;
-import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS;
-import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
-import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
-import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
-import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
-import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
-
-// TODO: parameterize DataSourceWriter with subclass of WriterCommitMessage
-class Writer implements DataSourceWriter {
-  private static final Logger LOG = LoggerFactory.getLogger(Writer.class);
-
-  private final Table table;
-  private final FileFormat format;
-  private final Broadcast<FileIO> io;
-  private final Broadcast<EncryptionManager> encryptionManager;
-  private final boolean replacePartitions;
-  private final String applicationId;
-  private final String wapId;
-  private final long targetFileSize;
-  private final Schema writeSchema;
-  private final StructType dsSchema;
-
-  Writer(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
-         DataSourceOptions options, boolean replacePartitions, String applicationId, Schema writeSchema,
-         StructType dsSchema) {
-    this(table, io, encryptionManager, options, replacePartitions, applicationId, null, writeSchema, dsSchema);
-  }
-
-  Writer(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
-         DataSourceOptions options, boolean replacePartitions, String applicationId, String wapId,
-         Schema writeSchema, StructType dsSchema) {
-    this.table = table;
-    this.format = getFileFormat(table.properties(), options);
-    this.io = io;
-    this.encryptionManager = encryptionManager;
-    this.replacePartitions = replacePartitions;
-    this.applicationId = applicationId;
-    this.wapId = wapId;
-    this.writeSchema = writeSchema;
-    this.dsSchema = dsSchema;
-
-    long tableTargetFileSize = PropertyUtil.propertyAsLong(
-        table.properties(), WRITE_TARGET_FILE_SIZE_BYTES, WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
-    this.targetFileSize = options.getLong("target-file-size-bytes", tableTargetFileSize);
-  }
-
-  private FileFormat getFileFormat(Map<String, String> tableProperties, DataSourceOptions options) {
-    Optional<String> formatOption = options.get("write-format");
-    String formatString = formatOption
-        .orElse(tableProperties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT));
-    return FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
-  }
-
-  private boolean isWapTable() {
-    return Boolean.parseBoolean(table.properties().getOrDefault(
-        TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED_DEFAULT));
-  }
-
-  @Override
-  public DataWriterFactory<InternalRow> createWriterFactory() {
-    return new WriterFactory(
-        table.spec(), format, table.locationProvider(), table.properties(), io, encryptionManager, targetFileSize,
-        writeSchema, dsSchema);
-  }
-
-  @Override
-  public void commit(WriterCommitMessage[] messages) {
-    if (replacePartitions) {
-      replacePartitions(messages);
-    } else {
-      append(messages);
-    }
-  }
-
-  protected void commitOperation(SnapshotUpdate<?> operation, int numFiles, String description) {
-    LOG.info("Committing {} with {} files to table {}", description, numFiles, table);
-    if (applicationId != null) {
-      operation.set("spark.app.id", applicationId);
-    }
-
-    if (isWapTable() && wapId != null) {
-      // write-audit-publish is enabled for this table and job
-      // stage the changes without changing the current snapshot
-      operation.set(SnapshotSummary.STAGED_WAP_ID_PROP, wapId);
-      operation.stageOnly();
-    }
-
-    long start = System.currentTimeMillis();
-    operation.commit(); // abort is automatically called if this fails
-    long duration = System.currentTimeMillis() - start;
-    LOG.info("Committed in {} ms", duration);
-  }
-
-  private void append(WriterCommitMessage[] messages) {
-    AppendFiles append = table.newAppend();
-
-    int numFiles = 0;
-    for (DataFile file : files(messages)) {
-      numFiles += 1;
-      append.appendFile(file);
-    }
-
-    commitOperation(append, numFiles, "append");
-  }
-
-  private void replacePartitions(WriterCommitMessage[] messages) {
-    ReplacePartitions dynamicOverwrite = table.newReplacePartitions();
-
-    int numFiles = 0;
-    for (DataFile file : files(messages)) {
-      numFiles += 1;
-      dynamicOverwrite.addFile(file);
-    }
-
-    commitOperation(dynamicOverwrite, numFiles, "dynamic partition overwrite");
-  }
-
-  @Override
-  public void abort(WriterCommitMessage[] messages) {
-    Tasks.foreach(files(messages))
-        .retry(propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
-        .exponentialBackoff(
-            propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
-            propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
-            propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
-            2.0 /* exponential */)
-        .throwFailureWhenFinished()
-        .run(file -> {
-          io.value().deleteFile(file.path().toString());
-        });
-  }
-
-  protected Table table() {
-    return table;
-  }
-
-  protected Iterable<DataFile> files(WriterCommitMessage[] messages) {
-    if (messages.length > 0) {
-      return Iterables.concat(Iterables.transform(Arrays.asList(messages), message -> message != null ?
-          ImmutableList.copyOf(((TaskCommit) message).files()) :
-          ImmutableList.of()));
-    }
-    return ImmutableList.of();
-  }
-
-  private int propertyAsInt(String property, int defaultValue) {
-    Map<String, String> properties = table.properties();
-    String value = properties.get(property);
-    if (value != null) {
-      return Integer.parseInt(properties.get(property));
-    }
-    return defaultValue;
-  }
-
-  @Override
-  public String toString() {
-    return String.format("IcebergWrite(table=%s, format=%s)", table, format);
-  }
-
-
-  static class TaskCommit implements WriterCommitMessage {
-    private final DataFile[] files;
-
-    TaskCommit() {
-      this.files = new DataFile[0];
-    }
-
-    TaskCommit(DataFile file) {
-      this.files = new DataFile[] { file };
-    }
-
-    TaskCommit(List<DataFile> files) {
-      this.files = files.toArray(new DataFile[files.size()]);
-    }
-
-    DataFile[] files() {
-      return files;
-    }
-  }
-
-  static class WriterFactory implements DataWriterFactory<InternalRow> {
-    private final PartitionSpec spec;
-    private final FileFormat format;
-    private final LocationProvider locations;
-    private final Map<String, String> properties;
-    private final Broadcast<FileIO> io;
-    private final Broadcast<EncryptionManager> encryptionManager;
-    private final long targetFileSize;
-    private final Schema writeSchema;
-    private final StructType dsSchema;
-
-    WriterFactory(PartitionSpec spec, FileFormat format, LocationProvider locations,
-                  Map<String, String> properties, Broadcast<FileIO> io,
-                  Broadcast<EncryptionManager> encryptionManager, long targetFileSize,
-                  Schema writeSchema, StructType dsSchema) {
-      this.spec = spec;
-      this.format = format;
-      this.locations = locations;
-      this.properties = properties;
-      this.io = io;
-      this.encryptionManager = encryptionManager;
-      this.targetFileSize = targetFileSize;
-      this.writeSchema = writeSchema;
-      this.dsSchema = dsSchema;
-    }
-
-    @Override
-    public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
-      OutputFileFactory fileFactory = new OutputFileFactory(partitionId, taskId, epochId);
-      AppenderFactory<InternalRow> appenderFactory = new SparkAppenderFactory();
-
-      if (spec.fields().isEmpty()) {
-        return new UnpartitionedWriter(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize);
-      } else {
-        return new PartitionedWriter(
-            spec, format, appenderFactory, fileFactory, io.value(), targetFileSize, writeSchema);
-      }
-    }
-
-    private class SparkAppenderFactory implements AppenderFactory<InternalRow> {
-      @Override
-      public FileAppender<InternalRow> newAppender(OutputFile file, FileFormat fileFormat) {
-        MetricsConfig metricsConfig = MetricsConfig.fromProperties(properties);
-        try {
-          switch (fileFormat) {
-            case PARQUET:
-              return Parquet.write(file)
-                  .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(dsSchema, msgType))
-                  .setAll(properties)
-                  .metricsConfig(metricsConfig)
-                  .schema(writeSchema)
-                  .overwrite()
-                  .build();
-
-            case AVRO:
-              return Avro.write(file)
-                  .createWriterFunc(ignored -> new SparkAvroWriter(dsSchema))
-                  .setAll(properties)
-                  .schema(writeSchema)
-                  .overwrite()
-                  .build();
-
-            case ORC:
-              return ORC.write(file)
-                  .createWriterFunc(SparkOrcWriter::new)
-                  .setAll(properties)
-                  .schema(writeSchema)
-                  .overwrite()
-                  .build();
-
-            default:
-              throw new UnsupportedOperationException("Cannot write unknown format: " + fileFormat);
-          }
-        } catch (IOException e) {
-          throw new RuntimeIOException(e);
-        }
-      }
-    }
-
-    private class OutputFileFactory {
-      private final int partitionId;
-      private final long taskId;
-      // The purpose of this uuid is to be able to know from two paths that they were written by the same operation.
-      // That's useful, for example, if a Spark job dies and leaves files in the file system, you can identify them all
-      // with a recursive listing and grep.
-      private final String uuid = UUID.randomUUID().toString();
-      private int fileCount;
-
-      OutputFileFactory(int partitionId, long taskId, long epochId) {
-        this.partitionId = partitionId;
-        this.taskId = taskId;
-        this.fileCount = 0;
-      }
-
-      private String generateFilename() {
-        return format.addExtension(String.format("%05d-%d-%s-%05d", partitionId, taskId, uuid, fileCount++));
-      }
-
-      /**
-       * Generates EncryptedOutputFile for UnpartitionedWriter.
-       */
-      public EncryptedOutputFile newOutputFile() {
-        OutputFile file = io.value().newOutputFile(locations.newDataLocation(generateFilename()));
-        return encryptionManager.value().encrypt(file);
-      }
-
-      /**
-       * Generates EncryptedOutputFile for PartitionedWriter.
-       */
-      public EncryptedOutputFile newOutputFile(PartitionKey key) {
-        String newDataLocation = locations.newDataLocation(spec, key, generateFilename());
-        OutputFile rawOutputFile = io.value().newOutputFile(newDataLocation);
-        return encryptionManager.value().encrypt(rawOutputFile);
-      }
-    }
-  }
-
-  private interface AppenderFactory<T> {
-    FileAppender<T> newAppender(OutputFile file, FileFormat format);
-  }
-
-  private abstract static class BaseWriter implements DataWriter<InternalRow> {
-    protected static final int ROWS_DIVISOR = 1000;
-
-    private final List<DataFile> completedFiles = Lists.newArrayList();
-    private final PartitionSpec spec;
-    private final FileFormat format;
-    private final AppenderFactory<InternalRow> appenderFactory;
-    private final WriterFactory.OutputFileFactory fileFactory;
-    private final FileIO fileIo;
-    private final long targetFileSize;
-    private PartitionKey currentKey = null;
-    private FileAppender<InternalRow> currentAppender = null;
-    private EncryptedOutputFile currentFile = null;
-    private long currentRows = 0;
-
-    BaseWriter(PartitionSpec spec, FileFormat format, AppenderFactory<InternalRow> appenderFactory,
-               WriterFactory.OutputFileFactory fileFactory, FileIO fileIo, long targetFileSize) {
-      this.spec = spec;
-      this.format = format;
-      this.appenderFactory = appenderFactory;
-      this.fileFactory = fileFactory;
-      this.fileIo = fileIo;
-      this.targetFileSize = targetFileSize;
-    }
-
-    @Override
-    public abstract void write(InternalRow row) throws IOException;
-
-    public void writeInternal(InternalRow row)  throws IOException {
-      //TODO: ORC file now not support target file size before closed
-      if  (!format.equals(FileFormat.ORC) &&
-          currentRows % ROWS_DIVISOR == 0 && currentAppender.length() >= targetFileSize) {
-        closeCurrent();
-        openCurrent();
-      }
-
-      currentAppender.add(row);
-      currentRows++;
-    }
-
-    @Override
-    public WriterCommitMessage commit() throws IOException {
-      closeCurrent();
-
-      return new TaskCommit(completedFiles);
-    }
-
-    @Override
-    public void abort() throws IOException {
-      closeCurrent();
-
-      // clean up files created by this writer
-      Tasks.foreach(completedFiles)
-          .throwFailureWhenFinished()
-          .noRetry()
-          .run(file -> fileIo.deleteFile(file.path().toString()));
-    }
-
-    protected void openCurrent() {
-      if (spec.fields().size() == 0) {
-        // unpartitioned
-        currentFile = fileFactory.newOutputFile();
-      } else {
-        // partitioned
-        currentFile = fileFactory.newOutputFile(currentKey);
-      }
-      currentAppender = appenderFactory.newAppender(currentFile.encryptingOutputFile(), format);
-      currentRows = 0;
-    }
-
-    protected void closeCurrent() throws IOException {
-      if (currentAppender != null) {
-        currentAppender.close();
-        // metrics are only valid after the appender is closed
-        Metrics metrics = currentAppender.metrics();
-        long fileSizeInBytes = currentAppender.length();
-        List<Long> splitOffsets = currentAppender.splitOffsets();
-        this.currentAppender = null;
-
-        if (metrics.recordCount() == 0L) {
-          fileIo.deleteFile(currentFile.encryptingOutputFile());
-        } else {
-          DataFile dataFile = DataFiles.builder(spec)
-              .withEncryptionKeyMetadata(currentFile.keyMetadata())
-              .withPath(currentFile.encryptingOutputFile().location())
-              .withFileSizeInBytes(fileSizeInBytes)
-              .withPartition(spec.fields().size() == 0 ? null : currentKey) // set null if unpartitioned
-              .withMetrics(metrics)
-              .withSplitOffsets(splitOffsets)
-              .build();
-          completedFiles.add(dataFile);
-        }
-
-        this.currentFile = null;
-      }
-    }
-
-    protected PartitionKey getCurrentKey() {
-      return currentKey;
-    }
-
-    protected void setCurrentKey(PartitionKey currentKey) {
-      this.currentKey = currentKey;
-    }
-  }
-
-  private static class UnpartitionedWriter extends BaseWriter {
-    UnpartitionedWriter(
-        PartitionSpec spec,
-        FileFormat format,
-        AppenderFactory<InternalRow> appenderFactory,
-        WriterFactory.OutputFileFactory fileFactory,
-        FileIO fileIo,
-        long targetFileSize) {
-      super(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize);
-
-      openCurrent();
-    }
-
-    @Override
-    public void write(InternalRow row) throws IOException {
-      writeInternal(row);
-    }
-  }
-
-  private static class PartitionedWriter extends BaseWriter {
-    private final PartitionKey key;
-    private final Set<PartitionKey> completedPartitions = Sets.newHashSet();
-
-    PartitionedWriter(
-        PartitionSpec spec,
-        FileFormat format,
-        AppenderFactory<InternalRow> appenderFactory,
-        WriterFactory.OutputFileFactory fileFactory,
-        FileIO fileIo,
-        long targetFileSize,
-        Schema writeSchema) {
-      super(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize);
-      this.key = new PartitionKey(spec, writeSchema);
-    }
-
-    @Override
-    public void write(InternalRow row) throws IOException {
-      key.partition(row);
-
-      PartitionKey currentKey = getCurrentKey();
-      if (!key.equals(currentKey)) {
-        closeCurrent();
-        completedPartitions.add(currentKey);
-
-        if (completedPartitions.contains(key)) {
-          // if rows are not correctly grouped, detect and fail the write
-          PartitionKey existingKey = Iterables.find(completedPartitions, key::equals, null);
-          LOG.warn("Duplicate key: {} == {}", existingKey, key);
-          throw new IllegalStateException("Already closed files for partition: " + key.toPath());
-        }
-
-        setCurrentKey(key.copy());
-        openCurrent();
-      }
-
-      writeInternal(row);
-    }
-  }
-}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkFilters.java b/spark2/src/main/java/org/apache/iceberg/spark/SparkFilters.java
similarity index 100%
rename from spark/src/main/java/org/apache/iceberg/spark/SparkFilters.java
rename to spark2/src/main/java/org/apache/iceberg/spark/SparkFilters.java
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java b/spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
similarity index 100%
rename from spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
rename to spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
similarity index 93%
rename from spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
rename to spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
index a9faf00..51859de 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -32,7 +32,6 @@ import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
-import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
@@ -44,7 +43,6 @@ import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.hadoop.Util;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.mapping.NameMapping;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -64,8 +62,6 @@ import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
 import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns;
 import org.apache.spark.sql.sources.v2.reader.SupportsReportStatistics;
 import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 import org.slf4j.Logger;
@@ -396,8 +392,7 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
 
     private transient Schema tableSchema = null;
     private transient Schema expectedSchema = null;
-    private transient NameMapping nameMapping = null;
-    private transient String[] preferredLocations;
+    private transient String[] preferredLocations = null;
 
     private ReadTask(CombinedScanTask task, String tableSchemaString, String expectedSchemaString,
                      String nameMappingString, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
@@ -465,7 +460,7 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
     public InputPartitionReader<InternalRow> create(CombinedScanTask task, Schema tableSchema, Schema expectedSchema,
                                                     String nameMapping, FileIO io,
                                                     EncryptionManager encryptionManager, boolean caseSensitive) {
-      return new RowDataReader(task, tableSchema, expectedSchema, nameMapping, io, encryptionManager, caseSensitive);
+      return new RowReader(task, tableSchema, expectedSchema, nameMapping, io, encryptionManager, caseSensitive);
     }
   }
 
@@ -480,41 +475,21 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
     public InputPartitionReader<ColumnarBatch> create(CombinedScanTask task, Schema tableSchema, Schema expectedSchema,
                                                       String nameMapping, FileIO io,
                                                       EncryptionManager encryptionManager, boolean caseSensitive) {
-      return new BatchDataReader(task, expectedSchema, nameMapping, io, encryptionManager, caseSensitive, batchSize);
+      return new BatchReader(task, expectedSchema, nameMapping, io, encryptionManager, caseSensitive, batchSize);
     }
   }
 
-  private static class StructLikeInternalRow implements StructLike {
-    private final DataType[] types;
-    private InternalRow row = null;
-
-    StructLikeInternalRow(StructType struct) {
-      this.types = new DataType[struct.size()];
-      StructField[] fields = struct.fields();
-      for (int i = 0; i < fields.length; i += 1) {
-        types[i] = fields[i].dataType();
-      }
-    }
-
-    public StructLikeInternalRow setRow(InternalRow row) {
-      this.row = row;
-      return this;
-    }
-
-    @Override
-    public int size() {
-      return types.length;
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public <T> T get(int pos, Class<T> javaClass) {
-      return javaClass.cast(row.get(pos, types[pos]));
+  private static class RowReader extends RowDataReader implements InputPartitionReader<InternalRow> {
+    RowReader(CombinedScanTask task, Schema tableSchema, Schema expectedSchema, String nameMapping, FileIO io,
+              EncryptionManager encryptionManager, boolean caseSensitive) {
+      super(task, tableSchema, expectedSchema, nameMapping, io, encryptionManager, caseSensitive);
     }
+  }
 
-    @Override
-    public <T> void set(int pos, T value) {
-      throw new UnsupportedOperationException("Not implemented: set");
+  private static class BatchReader extends BatchDataReader implements InputPartitionReader<ColumnarBatch> {
+    BatchReader(CombinedScanTask task, Schema expectedSchema, String nameMapping, FileIO io,
+                       EncryptionManager encryptionManager, boolean caseSensitive, int size) {
+      super(task, expectedSchema, nameMapping, io, encryptionManager, caseSensitive, size);
     }
   }
 }
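
The net effect of this Reader hunk: the StructLikeInternalRow helper disappears from this class, and what remains are two thin adapters, RowReader and BatchReader, whose only job is to attach the Spark 2.x InputPartitionReader interface to the shared RowDataReader and BatchDataReader. A generic sketch of the same pattern, with illustrative names that are not part of the commit:

    import java.io.Closeable;
    import java.util.Iterator;

    // Version-agnostic reader: exposes next()/get()/close() without referencing any DSv2 type,
    // so it can be shared by more than one Spark integration module.
    abstract class VersionAgnosticReader<T> implements Closeable {
      private final Iterator<T> rows;
      private T current = null;

      VersionAgnosticReader(Iterator<T> rows) {
        this.rows = rows;
      }

      public boolean next() {
        if (rows.hasNext()) {
          this.current = rows.next();
          return true;
        }
        return false;
      }

      public T get() {
        return current;
      }

      @Override
      public void close() {
      }
    }

    // Thin Spark 2.x adapter: the only class that names the 2.x interface, mirroring
    // the RowReader/BatchReader wrappers in the hunk above.
    class Spark2Reader<T> extends VersionAgnosticReader<T>
        implements org.apache.spark.sql.sources.v2.reader.InputPartitionReader<T> {
      Spark2Reader(Iterator<T> rows) {
        super(rows);
      }
    }
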
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Stats.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Stats.java
similarity index 100%
rename from spark/src/main/java/org/apache/iceberg/spark/source/Stats.java
rename to spark2/src/main/java/org/apache/iceberg/spark/source/Stats.java
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/StreamingWriter.java b/spark2/src/main/java/org/apache/iceberg/spark/source/StreamingWriter.java
similarity index 100%
rename from spark/src/main/java/org/apache/iceberg/spark/source/StreamingWriter.java
rename to spark2/src/main/java/org/apache/iceberg/spark/source/StreamingWriter.java
diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
new file mode 100644
index 0000000..a24820b
--- /dev/null
+++ b/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.ReplacePartitions;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.SnapshotUpdate;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.Tasks;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
+import org.apache.spark.sql.sources.v2.writer.DataWriter;
+import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS;
+import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS;
+import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES;
+import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS;
+import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
+
+// TODO: parameterize DataSourceWriter with subclass of WriterCommitMessage
+class Writer implements DataSourceWriter {
+  private static final Logger LOG = LoggerFactory.getLogger(Writer.class);
+
+  private final Table table;
+  private final FileFormat format;
+  private final Broadcast<FileIO> io;
+  private final Broadcast<EncryptionManager> encryptionManager;
+  private final boolean replacePartitions;
+  private final String applicationId;
+  private final String wapId;
+  private final long targetFileSize;
+  private final Schema writeSchema;
+  private final StructType dsSchema;
+
+  Writer(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
+         DataSourceOptions options, boolean replacePartitions, String applicationId, Schema writeSchema,
+         StructType dsSchema) {
+    this(table, io, encryptionManager, options, replacePartitions, applicationId, null, writeSchema, dsSchema);
+  }
+
+  Writer(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
+         DataSourceOptions options, boolean replacePartitions, String applicationId, String wapId,
+         Schema writeSchema, StructType dsSchema) {
+    this.table = table;
+    this.format = getFileFormat(table.properties(), options);
+    this.io = io;
+    this.encryptionManager = encryptionManager;
+    this.replacePartitions = replacePartitions;
+    this.applicationId = applicationId;
+    this.wapId = wapId;
+    this.writeSchema = writeSchema;
+    this.dsSchema = dsSchema;
+
+    long tableTargetFileSize = PropertyUtil.propertyAsLong(
+        table.properties(), WRITE_TARGET_FILE_SIZE_BYTES, WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+    this.targetFileSize = options.getLong("target-file-size-bytes", tableTargetFileSize);
+  }
+
+  private FileFormat getFileFormat(Map<String, String> tableProperties, DataSourceOptions options) {
+    Optional<String> formatOption = options.get("write-format");
+    String formatString = formatOption
+        .orElse(tableProperties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT));
+    return FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
+  }
+
+  private boolean isWapTable() {
+    return Boolean.parseBoolean(table.properties().getOrDefault(
+        TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED_DEFAULT));
+  }
+
+  @Override
+  public DataWriterFactory<InternalRow> createWriterFactory() {
+    return new WriterFactory(
+        table.spec(), format, table.locationProvider(), table.properties(), io, encryptionManager, targetFileSize,
+        writeSchema, dsSchema);
+  }
+
+  @Override
+  public void commit(WriterCommitMessage[] messages) {
+    if (replacePartitions) {
+      replacePartitions(messages);
+    } else {
+      append(messages);
+    }
+  }
+
+  protected void commitOperation(SnapshotUpdate<?> operation, int numFiles, String description) {
+    LOG.info("Committing {} with {} files to table {}", description, numFiles, table);
+    if (applicationId != null) {
+      operation.set("spark.app.id", applicationId);
+    }
+
+    if (isWapTable() && wapId != null) {
+      // write-audit-publish is enabled for this table and job
+      // stage the changes without changing the current snapshot
+      operation.set(SnapshotSummary.STAGED_WAP_ID_PROP, wapId);
+      operation.stageOnly();
+    }
+
+    long start = System.currentTimeMillis();
+    operation.commit(); // abort is automatically called if this fails
+    long duration = System.currentTimeMillis() - start;
+    LOG.info("Committed in {} ms", duration);
+  }
+
+  private void append(WriterCommitMessage[] messages) {
+    AppendFiles append = table.newAppend();
+
+    int numFiles = 0;
+    for (DataFile file : files(messages)) {
+      numFiles += 1;
+      append.appendFile(file);
+    }
+
+    commitOperation(append, numFiles, "append");
+  }
+
+  private void replacePartitions(WriterCommitMessage[] messages) {
+    ReplacePartitions dynamicOverwrite = table.newReplacePartitions();
+
+    int numFiles = 0;
+    for (DataFile file : files(messages)) {
+      numFiles += 1;
+      dynamicOverwrite.addFile(file);
+    }
+
+    commitOperation(dynamicOverwrite, numFiles, "dynamic partition overwrite");
+  }
+
+  @Override
+  public void abort(WriterCommitMessage[] messages) {
+    Map<String, String> props = table.properties();
+    Tasks.foreach(files(messages))
+        .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
+        .exponentialBackoff(
+            PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+            PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+            PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+            2.0 /* exponential */)
+        .throwFailureWhenFinished()
+        .run(file -> {
+          io.value().deleteFile(file.path().toString());
+        });
+  }
+
+  protected Table table() {
+    return table;
+  }
+
+  protected Iterable<DataFile> files(WriterCommitMessage[] messages) {
+    if (messages.length > 0) {
+      return Iterables.concat(Iterables.transform(Arrays.asList(messages), message -> message != null ?
+          ImmutableList.copyOf(((TaskResult) message).files()) :
+          ImmutableList.of()));
+    }
+    return ImmutableList.of();
+  }
+
+  @Override
+  public String toString() {
+    return String.format("IcebergWrite(table=%s, format=%s)", table, format);
+  }
+
+  private static class TaskCommit extends TaskResult implements WriterCommitMessage {
+    TaskCommit(TaskResult toCopy) {
+      super(toCopy.files());
+    }
+  }
+
+  static class WriterFactory implements DataWriterFactory<InternalRow> {
+    private final PartitionSpec spec;
+    private final FileFormat format;
+    private final LocationProvider locations;
+    private final Map<String, String> properties;
+    private final Broadcast<FileIO> io;
+    private final Broadcast<EncryptionManager> encryptionManager;
+    private final long targetFileSize;
+    private final Schema writeSchema;
+    private final StructType dsSchema;
+
+    WriterFactory(PartitionSpec spec, FileFormat format, LocationProvider locations,
+                  Map<String, String> properties, Broadcast<FileIO> io,
+                  Broadcast<EncryptionManager> encryptionManager, long targetFileSize,
+                  Schema writeSchema, StructType dsSchema) {
+      this.spec = spec;
+      this.format = format;
+      this.locations = locations;
+      this.properties = properties;
+      this.io = io;
+      this.encryptionManager = encryptionManager;
+      this.targetFileSize = targetFileSize;
+      this.writeSchema = writeSchema;
+      this.dsSchema = dsSchema;
+    }
+
+    @Override
+    public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
+      OutputFileFactory fileFactory = new OutputFileFactory(
+          spec, format, locations, io.value(), encryptionManager.value(), partitionId, taskId);
+      SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, writeSchema, dsSchema);
+
+      if (spec.fields().isEmpty()) {
+        return new Unpartitioned24Writer(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize);
+      } else {
+        return new Partitioned24Writer(
+            spec, format, appenderFactory, fileFactory, io.value(), targetFileSize, writeSchema);
+      }
+    }
+  }
+
+  private static class Unpartitioned24Writer extends UnpartitionedWriter implements DataWriter<InternalRow> {
+    Unpartitioned24Writer(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
+                          OutputFileFactory fileFactory, FileIO fileIo, long targetFileSize) {
+      super(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize);
+    }
+
+    @Override
+    public WriterCommitMessage commit() throws IOException {
+      return new TaskCommit(complete());
+    }
+  }
+
+  private static class Partitioned24Writer extends PartitionedWriter implements DataWriter<InternalRow> {
+    Partitioned24Writer(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
+                               OutputFileFactory fileFactory, FileIO fileIo, long targetFileSize, Schema writeSchema) {
+      super(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize, writeSchema);
+    }
+
+    @Override
+    public WriterCommitMessage commit() throws IOException {
+      return new TaskCommit(complete());
+    }
+  }
+}
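
For orientation, a usage sketch of how this Writer is typically reached from the Spark 2.4 DataFrame API. The input data, paths, and option values are illustrative only, and the mapping of SaveMode.Overwrite onto the replacePartitions flag is assumed to happen in IcebergSource, which is renamed but not shown in this diff:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;
    import org.apache.spark.sql.SparkSession;

    SparkSession spark = SparkSession.builder().appName("iceberg-write-sketch").getOrCreate();
    Dataset<Row> df = spark.read().parquet("/tmp/staging/events");   // any DataFrame matching the table schema

    df.write()
        .format("iceberg")                              // resolves to IcebergSource via the DataSourceRegister file above
        .option("write-format", "avro")                 // read in getFileFormat(); falls back to DEFAULT_FILE_FORMAT
        .option("target-file-size-bytes", "536870912")  // read in the Writer constructor; falls back to WRITE_TARGET_FILE_SIZE_BYTES
        .mode(SaveMode.Append)                          // commit() takes the append() path; Overwrite is assumed to
        .save("/tmp/warehouse/db/events");              // set replacePartitions and take the dynamic-overwrite path
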
diff --git a/spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/spark2/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
similarity index 100%
rename from spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
rename to spark2/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
diff --git a/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala b/spark2/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
similarity index 100%
rename from spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
rename to spark2/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
diff --git a/spark/src/test/java/org/apache/iceberg/actions/TestRemoveOrphanFilesAction.java b/spark2/src/test/java/org/apache/iceberg/actions/TestRemoveOrphanFilesAction.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/actions/TestRemoveOrphanFilesAction.java
rename to spark2/src/test/java/org/apache/iceberg/actions/TestRemoveOrphanFilesAction.java
diff --git a/spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java b/spark2/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
rename to spark2/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
diff --git a/spark/src/test/java/org/apache/iceberg/actions/TestRewriteManifestsAction.java b/spark2/src/test/java/org/apache/iceberg/actions/TestRewriteManifestsAction.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/actions/TestRewriteManifestsAction.java
rename to spark2/src/test/java/org/apache/iceberg/actions/TestRewriteManifestsAction.java
diff --git a/spark/src/test/java/org/apache/iceberg/examples/ConcurrencyTest.java b/spark2/src/test/java/org/apache/iceberg/examples/ConcurrencyTest.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/examples/ConcurrencyTest.java
rename to spark2/src/test/java/org/apache/iceberg/examples/ConcurrencyTest.java
diff --git a/spark/src/test/java/org/apache/iceberg/examples/README.md b/spark2/src/test/java/org/apache/iceberg/examples/README.md
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/examples/README.md
rename to spark2/src/test/java/org/apache/iceberg/examples/README.md
diff --git a/spark/src/test/java/org/apache/iceberg/examples/ReadAndWriteTablesTest.java b/spark2/src/test/java/org/apache/iceberg/examples/ReadAndWriteTablesTest.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/examples/ReadAndWriteTablesTest.java
rename to spark2/src/test/java/org/apache/iceberg/examples/ReadAndWriteTablesTest.java
diff --git a/spark/src/test/java/org/apache/iceberg/examples/SchemaEvolutionTest.java b/spark2/src/test/java/org/apache/iceberg/examples/SchemaEvolutionTest.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/examples/SchemaEvolutionTest.java
rename to spark2/src/test/java/org/apache/iceberg/examples/SchemaEvolutionTest.java
diff --git a/spark/src/test/java/org/apache/iceberg/examples/SimpleRecord.java b/spark2/src/test/java/org/apache/iceberg/examples/SimpleRecord.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/examples/SimpleRecord.java
rename to spark2/src/test/java/org/apache/iceberg/examples/SimpleRecord.java
diff --git a/spark/src/test/java/org/apache/iceberg/examples/SnapshotFunctionalityTest.java b/spark2/src/test/java/org/apache/iceberg/examples/SnapshotFunctionalityTest.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/examples/SnapshotFunctionalityTest.java
rename to spark2/src/test/java/org/apache/iceberg/examples/SnapshotFunctionalityTest.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/TestSparkDataFile.java b/spark2/src/test/java/org/apache/iceberg/spark/TestSparkDataFile.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/TestSparkDataFile.java
rename to spark2/src/test/java/org/apache/iceberg/spark/TestSparkDataFile.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/LogMessage.java b/spark2/src/test/java/org/apache/iceberg/spark/source/LogMessage.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/LogMessage.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/LogMessage.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/SimpleRecord.java b/spark2/src/test/java/org/apache/iceberg/spark/source/SimpleRecord.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/SimpleRecord.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/SimpleRecord.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestAvroScan.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestAvroScan.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestAvroScan.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestAvroScan.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSource.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSource.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSource.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSource.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIdentityPartitionData.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestIdentityPartitionData.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestIdentityPartitionData.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestIdentityPartitionData.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetScan.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestParquetScan.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestParquetScan.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestParquetScan.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestReadProjection.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestReadProjection.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestReadProjection.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestReadProjection.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkSchema.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkSchema.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestSparkSchema.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkSchema.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtilWithInMemoryCatalog.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtilWithInMemoryCatalog.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtilWithInMemoryCatalog.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtilWithInMemoryCatalog.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestTables.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestTables.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestTables.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestTables.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java
diff --git a/spark/src/test/resources/data/books.json b/spark2/src/test/resources/data/books.json
similarity index 100%
rename from spark/src/test/resources/data/books.json
rename to spark2/src/test/resources/data/books.json
diff --git a/spark/src/test/resources/data/new-books.json b/spark2/src/test/resources/data/new-books.json
similarity index 100%
rename from spark/src/test/resources/data/new-books.json
rename to spark2/src/test/resources/data/new-books.json