Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/12/11 01:19:47 UTC

[incubator-iceberg] 02/03: Code review updates.

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch spark-3
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git

commit 176393ca8d9d3461d1e697e54e87b8b89c12ffad
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Mon Dec 9 16:54:25 2019 -0800

    Code review updates.
---
 api/src/main/java/org/apache/iceberg/Table.java    |   9 -
 build.gradle                                       |   6 -
 .../org/apache/iceberg/spark/SparkFilters.java     |  19 +
 .../java/org/apache/iceberg/spark/SparkUtil.java   | 242 +++++++++++
 .../java/org/apache/iceberg/spark/SparkUtils.java  |  92 ----
 ...cebergTableProvider.java => IcebergSource.java} |  62 ++-
 .../apache/iceberg/spark/source/IcebergTable.java  | 222 ----------
 .../{IcebergBatchScan.java => SparkBatchScan.java} | 482 +++++++++------------
 ...cebergBatchWriter.java => SparkBatchWrite.java} |  77 ++--
 .../iceberg/spark/source/SparkScanBuilder.java     | 102 +++++
 ...reamingWriter.java => SparkStreamingWrite.java} |  54 +--
 .../apache/iceberg/spark/source/SparkTable.java    | 118 +++++
 .../iceberg/spark/source/SparkWriteBuilder.java    | 173 ++++++++
 ...org.apache.spark.sql.sources.DataSourceRegister |   2 +-
 .../iceberg/spark/source/TestDataFrameWrites.java  |  12 +-
 .../iceberg/spark/source/TestFilteredScan.java     |  64 +--
 .../iceberg/spark/source/TestIcebergSource.java    |   5 +-
 .../iceberg/spark/source/TestParquetWrite.java     |  21 +-
 .../spark/source/TestStructuredStreaming.java      |   4 +-
 19 files changed, 991 insertions(+), 775 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/Table.java b/api/src/main/java/org/apache/iceberg/Table.java
index e0fa525..63c55e5 100644
--- a/api/src/main/java/org/apache/iceberg/Table.java
+++ b/api/src/main/java/org/apache/iceberg/Table.java
@@ -228,13 +228,4 @@ public interface Table {
    * @return a {@link LocationProvider} to provide locations for new data files
    */
   LocationProvider locationProvider();
-
-  /**
-   * Return the name of this table.
-   *
-   * @return this table's name
-   */
-  default String name() {
-    return "table(" + hashCode() + ")";
-  }
 }
diff --git a/build.gradle b/build.gradle
index 6835cbe..cc0f666 100644
--- a/build.gradle
+++ b/build.gradle
@@ -240,15 +240,9 @@ project(':iceberg-spark') {
     compile project(':iceberg-hive')
 
     compileOnly "org.apache.avro:avro"
-    compileOnly("org.apache.spark:spark-sql_2.12") {
-      exclude group: 'org.apache.avro', module: 'avro'
-    }
     compileOnly("org.apache.spark:spark-hive_2.12") {
       exclude group: 'org.apache.avro', module: 'avro'
     }
-    compileOnly("org.apache.spark:spark-catalyst_2.12") {
-      exclude group: 'org.apache.avro', module: 'avro'
-    }
 
     testCompile "org.apache.hadoop:hadoop-hdfs::tests"
     testCompile "org.apache.hadoop:hadoop-common::tests"
diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkFilters.java b/spark/src/main/java/org/apache/iceberg/spark/SparkFilters.java
index c6f8f52..6c43b87 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/SparkFilters.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/SparkFilters.java
@@ -25,7 +25,10 @@ import java.sql.Date;
 import java.sql.Timestamp;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expression.Operation;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
+import org.apache.spark.sql.sources.AlwaysFalse$;
+import org.apache.spark.sql.sources.AlwaysTrue$;
 import org.apache.spark.sql.sources.And;
 import org.apache.spark.sql.sources.EqualNullSafe;
 import org.apache.spark.sql.sources.EqualTo;
@@ -60,6 +63,8 @@ public class SparkFilters {
 
   private static final ImmutableMap<Class<? extends Filter>, Operation> FILTERS = ImmutableMap
       .<Class<? extends Filter>, Operation>builder()
+      .put(AlwaysTrue$.class, Operation.TRUE)
+      .put(AlwaysFalse$.class, Operation.FALSE)
       .put(EqualTo.class, Operation.EQ)
       .put(EqualNullSafe.class, Operation.EQ)
       .put(GreaterThan.class, Operation.GT)
@@ -75,11 +80,25 @@ public class SparkFilters {
       .put(StringStartsWith.class, Operation.STARTS_WITH)
       .build();
 
+  public static Expression convert(Filter[] filters) {
+    Expression expression = Expressions.alwaysTrue();
+    for (Filter filter : filters) {
+      expression = Expressions.and(expression, convert(filter));
+    }
+    return expression;
+  }
+
   public static Expression convert(Filter filter) {
     // avoid using a chain of if instanceof statements by mapping to the expression enum.
     Operation op = FILTERS.get(filter.getClass());
     if (op != null) {
       switch (op) {
+        case TRUE:
+          return Expressions.alwaysTrue();
+
+        case FALSE:
+          return Expressions.alwaysFalse();
+
         case IS_NULL:
           IsNull isNullFilter = (IsNull) filter;
           return isNull(isNullFilter.attribute());
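
The new convert(Filter[]) overload above folds a set of pushed Spark filters into a single Iceberg expression by AND-ing each converted filter onto alwaysTrue(). A minimal usage sketch; the column names and values below are illustrative, not taken from this patch:

    import org.apache.iceberg.expressions.Expression;
    import org.apache.iceberg.spark.SparkFilters;
    import org.apache.spark.sql.sources.EqualTo;
    import org.apache.spark.sql.sources.Filter;
    import org.apache.spark.sql.sources.GreaterThan;

    // Filters as Spark would push them down; "id" and "category" are made-up columns.
    Filter[] pushed = new Filter[] {
        new GreaterThan("id", 100),
        new EqualTo("category", "events")
    };

    // Roughly: and(alwaysTrue(), and(greaterThan("id", 100), equal("category", "events")))
    Expression combined = SparkFilters.convert(pushed);
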
diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java b/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
new file mode 100644
index 0000000..62b83f2
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.UpdateProperties;
+import org.apache.iceberg.UpdateSchema;
+import org.apache.iceberg.transforms.PartitionSpecVisitor;
+import org.apache.iceberg.types.Type;
+import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.spark.sql.connector.expressions.Expression;
+import org.apache.spark.sql.connector.expressions.Expressions;
+import org.apache.spark.sql.connector.expressions.Literal;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+
+public class SparkUtil {
+
+  private SparkUtil() {
+  }
+
+  private static final Joiner DOT = Joiner.on(".");
+
+  /**
+   * Applies a list of Spark table changes to an {@link UpdateProperties} operation.
+   * <p>
+   * All non-property changes in the list are ignored.
+   *
+   * @param pendingUpdate an uncommitted UpdateProperties operation to configure
+   * @param changes a list of Spark table changes
+   * @return the UpdateProperties operation configured with the changes
+   */
+  public static UpdateProperties applyPropertyChanges(UpdateProperties pendingUpdate, List<TableChange> changes) {
+    for (TableChange change : changes) {
+      if (change instanceof TableChange.SetProperty) {
+        TableChange.SetProperty set = (TableChange.SetProperty) change;
+        pendingUpdate.set(set.property(), set.value());
+
+      } else if (change instanceof TableChange.RemoveProperty) {
+        TableChange.RemoveProperty remove = (TableChange.RemoveProperty) change;
+        pendingUpdate.remove(remove.property());
+      }
+    }
+
+    return pendingUpdate;
+  }
+
+  /**
+   * Applies a list of Spark table changes to an {@link UpdateSchema} operation.
+   * <p>
+   * All non-schema changes in the list are ignored.
+   *
+   * @param pendingUpdate an uncommitted UpdateSchema operation to configure
+   * @param changes a list of Spark table changes
+   * @return the UpdateSchema operation configured with the changes
+   */
+  public static UpdateSchema applySchemaChanges(UpdateSchema pendingUpdate, List<TableChange> changes) {
+    for (TableChange change : changes) {
+      if (change instanceof TableChange.AddColumn) {
+        TableChange.AddColumn add = (TableChange.AddColumn) change;
+        Type type = SparkSchemaUtil.convert(add.dataType());
+        pendingUpdate.addColumn(parentName(add.fieldNames()), leafName(add.fieldNames()), type, add.comment());
+
+      } else if (change instanceof TableChange.UpdateColumnType) {
+        TableChange.UpdateColumnType update = (TableChange.UpdateColumnType) change;
+        Type newType = SparkSchemaUtil.convert(update.newDataType());
+        Preconditions.checkArgument(newType.isPrimitiveType(),
+            "Cannot update '%s', not a primitive type: %s", DOT.join(update.fieldNames()), update.newDataType());
+        pendingUpdate.updateColumn(DOT.join(update.fieldNames()), newType.asPrimitiveType());
+
+      } else if (change instanceof TableChange.UpdateColumnComment) {
+        TableChange.UpdateColumnComment update = (TableChange.UpdateColumnComment) change;
+        pendingUpdate.updateColumnDoc(DOT.join(update.fieldNames()), update.newComment());
+
+      } else if (change instanceof TableChange.RenameColumn) {
+        TableChange.RenameColumn rename = (TableChange.RenameColumn) change;
+        pendingUpdate.renameColumn(DOT.join(rename.fieldNames()), rename.newName());
+
+      } else if (change instanceof TableChange.DeleteColumn) {
+        TableChange.DeleteColumn delete = (TableChange.DeleteColumn) change;
+        pendingUpdate.deleteColumn(DOT.join(delete.fieldNames()));
+      }
+    }
+
+    return pendingUpdate;
+  }
+
+  /**
+   * Converts a PartitionSpec to Spark transforms.
+   *
+   * @param spec a PartitionSpec
+   * @return an array of Transforms
+   */
+  public static Transform[] toTransforms(PartitionSpec spec) {
+    List<Transform> transforms = PartitionSpecVisitor.visit(spec.schema(), spec,
+        new PartitionSpecVisitor<Transform>() {
+          @Override
+          public Transform identity(String sourceName, int sourceId) {
+            return Expressions.identity(sourceName);
+          }
+
+          @Override
+          public Transform bucket(String sourceName, int sourceId, int width) {
+            return Expressions.bucket(width, sourceName);
+          }
+
+          @Override
+          public Transform truncate(String sourceName, int sourceId, int width) {
+            return Expressions.apply("truncate", Expressions.column(sourceName), Expressions.literal(width));
+          }
+
+          @Override
+          public Transform year(String sourceName, int sourceId) {
+            return Expressions.years(sourceName);
+          }
+
+          @Override
+          public Transform month(String sourceName, int sourceId) {
+            return Expressions.months(sourceName);
+          }
+
+          @Override
+          public Transform day(String sourceName, int sourceId) {
+            return Expressions.days(sourceName);
+          }
+
+          @Override
+          public Transform hour(String sourceName, int sourceId) {
+            return Expressions.hours(sourceName);
+          }
+        });
+
+    return transforms.toArray(new Transform[0]);
+  }
+
+  /**
+   * Converts Spark transforms into a {@link PartitionSpec}.
+   *
+   * @param schema the table schema
+   * @param partitioning Spark Transforms
+   * @return a PartitionSpec
+   */
+  public static PartitionSpec toPartitionSpec(Schema schema, Transform[] partitioning) {
+    if (partitioning == null || partitioning.length == 0) {
+      return PartitionSpec.unpartitioned();
+    }
+
+    PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
+    for (Transform transform : partitioning) {
+      Preconditions.checkArgument(transform.references().length == 1,
+          "Cannot convert transform with more than one column reference: %s", transform);
+      String colName = DOT.join(transform.references()[0].fieldNames());
+      switch (transform.name()) {
+        case "identity":
+          builder.identity(colName);
+          break;
+        case "bucket":
+          builder.bucket(colName, findWidth(transform));
+          break;
+        case "years":
+          builder.year(colName);
+          break;
+        case "months":
+          builder.month(colName);
+          break;
+        case "days":
+          builder.day(colName);
+          break;
+        case "hours":
+          builder.hour(colName);
+          break;
+        case "truncate":
+          builder.truncate(colName, findWidth(transform));
+          break;
+        default:
+          throw new UnsupportedOperationException("Transform is not supported: " + transform);
+      }
+    }
+
+    return builder.build();
+  }
+
+  @SuppressWarnings("unchecked")
+  private static int findWidth(Transform transform) {
+    for (Expression expr : transform.arguments()) {
+      if (expr instanceof Literal) {
+        if (((Literal) expr).dataType() instanceof IntegerType) {
+          Literal<Integer> lit = (Literal<Integer>) expr;
+          Preconditions.checkArgument(lit.value() > 0,
+              "Unsupported width for transform: %s", transform.describe());
+          return lit.value();
+
+        } else if (((Literal) expr).dataType() instanceof LongType) {
+          Literal<Long> lit = (Literal<Long>) expr;
+          Preconditions.checkArgument(lit.value() > 0 && lit.value() < Integer.MAX_VALUE,
+              "Unsupported width for transform: %s", transform.describe());
+          if (lit.value() > Integer.MAX_VALUE) {
+            throw new IllegalArgumentException();
+          }
+          return lit.value().intValue();
+        }
+      }
+    }
+
+    throw new IllegalArgumentException("Cannot find width for transform: " + transform.describe());
+  }
+
+  private static String leafName(String[] fieldNames) {
+    Preconditions.checkArgument(fieldNames.length > 0, "Invalid field name: at least one name is required");
+    return fieldNames[fieldNames.length - 1];
+  }
+
+  private static String parentName(String[] fieldNames) {
+    if (fieldNames.length > 1) {
+      return DOT.join(Arrays.copyOfRange(fieldNames, 0, fieldNames.length - 1));
+    }
+    return null;
+  }
+}
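
The two conversion helpers above are inverses for the transforms Spark 3 can express. A rough round-trip sketch, assuming a table with a timestamp column "ts" and a long column "id" (both invented for illustration):

    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.spark.SparkUtil;
    import org.apache.iceberg.types.Types;
    import org.apache.spark.sql.connector.expressions.Transform;

    // Assumed schema with a timestamp and a long column.
    Schema schema = new Schema(
        Types.NestedField.required(1, "ts", Types.TimestampType.withZone()),
        Types.NestedField.required(2, "id", Types.LongType.get()));

    PartitionSpec spec = PartitionSpec.builderFor(schema)
        .day("ts")
        .bucket("id", 16)
        .build();

    // Iceberg spec -> Spark transforms: days(ts), bucket(16, id)
    Transform[] transforms = SparkUtil.toTransforms(spec);

    // Spark transforms -> Iceberg spec; bucket/truncate widths are recovered
    // from the literal argument by findWidth above.
    PartitionSpec roundTripped = SparkUtil.toPartitionSpec(schema, transforms);
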
diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkUtils.java b/spark/src/main/java/org/apache/iceberg/spark/SparkUtils.java
deleted file mode 100644
index 64e81cb..0000000
--- a/spark/src/main/java/org/apache/iceberg/spark/SparkUtils.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.spark;
-
-
-import com.google.common.collect.Lists;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.PartitionField;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.connector.expressions.Expressions;
-import org.apache.spark.sql.connector.expressions.Transform;
-
-public final class SparkUtils {
-
-  private static final Pattern HAS_WIDTH = Pattern.compile("(\\w+)\\[(\\d+)\\]");
-
-  private SparkUtils() {}
-
-  public static SparkSession getSparkSession() {
-    return SparkSession.builder().getOrCreate();
-  }
-
-  public static Configuration getBaseConf() {
-    return getSparkSession().sparkContext().hadoopConfiguration();
-  }
-
-
-  public static Transform[] toTransforms(PartitionSpec spec) {
-    List<Transform> transforms = Lists.newArrayList();
-    int numBuckets = 0;
-    List<String> bucketColumns = Lists.newArrayList();
-
-    for (PartitionField f : spec.fields()) {
-      Matcher widthMatcher = HAS_WIDTH.matcher(f.transform().toString());
-      if (widthMatcher.matches()) {
-        String name = widthMatcher.group(1);
-        if (name.equalsIgnoreCase("truncate")) {
-          throw new UnsupportedOperationException("Spark doesn't support truncate transform");
-
-        } else if (name.equalsIgnoreCase("bucket")) {
-          numBuckets = Integer.parseInt(widthMatcher.group(2));
-          bucketColumns.add(spec.schema().findColumnName(f.sourceId()));
-
-        } else if (f.transform().toString().equalsIgnoreCase("identity")) {
-          transforms.add(Expressions.identity(spec.schema().findColumnName(f.sourceId())));
-
-        } else if (f.transform().toString().equalsIgnoreCase("year")) {
-          transforms.add(Expressions.years(spec.schema().findColumnName(f.sourceId())));
-
-        } else if (f.transform().toString().equalsIgnoreCase("month")) {
-          transforms.add(Expressions.months(spec.schema().findColumnName(f.sourceId())));
-
-        } else if (f.transform().toString().equalsIgnoreCase("day")) {
-          transforms.add(Expressions.days(spec.schema().findColumnName(f.sourceId())));
-
-        } else if (f.transform().toString().equalsIgnoreCase("hour")) {
-          transforms.add(Expressions.hours(spec.schema().findColumnName(f.sourceId())));
-
-        } else {
-          throw new UnsupportedOperationException("Spark doesn't support transform " + f.transform());
-        }
-      }
-    }
-
-    if (!bucketColumns.isEmpty()) {
-      transforms.add(Expressions.bucket(numBuckets, bucketColumns.toArray(new String[0])));
-    }
-
-    return transforms.toArray(new Transform[0]);
-  }
-}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergTableProvider.java b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
similarity index 60%
rename from spark/src/main/java/org/apache/iceberg/spark/source/IcebergTableProvider.java
rename to spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
index 9f7e1d9..0b1cb7a 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergTableProvider.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
@@ -21,47 +21,59 @@ package org.apache.iceberg.spark.source;
 
 import com.google.common.base.Preconditions;
 import java.util.Map;
-import java.util.Optional;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.iceberg.hive.HiveCatalogs;
-import org.apache.iceberg.spark.SparkUtils;
-import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.connector.catalog.TableProvider;
 import org.apache.spark.sql.sources.DataSourceRegister;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
-public class IcebergTableProvider implements DataSourceRegister, TableProvider {
+public class IcebergSource implements DataSourceRegister, TableProvider {
   @Override
   public String shortName() {
     return "iceberg";
   }
 
   @Override
-  public Table getTable(CaseInsensitiveStringMap options) {
+  public SparkTable getTable(CaseInsensitiveStringMap options) {
     return getTable(options, null);
   }
 
   @Override
-  public Table getTable(CaseInsensitiveStringMap options, StructType readSchema) {
+  public SparkTable getTable(CaseInsensitiveStringMap options, StructType readSchema) {
     // Get Iceberg table from options
-    Configuration conf = new Configuration(SparkUtils.getBaseConf());
-    org.apache.iceberg.Table tableInIceberg = getTableAndResolveHadoopConfiguration(options, conf);
+    Configuration conf = new Configuration(SparkSession.active().sparkContext().hadoopConfiguration());
+    Table icebergTable = getTableAndResolveHadoopConfiguration(options, conf);
 
     // Build Spark table based on Iceberg table, and return it
-    return new IcebergTable(tableInIceberg, readSchema);
+    return new SparkTable(icebergTable, readSchema);
   }
 
-  protected org.apache.iceberg.Table getTableAndResolveHadoopConfiguration(
+  protected Table findTable(CaseInsensitiveStringMap options, Configuration conf) {
+    Preconditions.checkArgument(options.containsKey("path"), "Cannot open table: path is not set");
+    String path = options.get("path");
+
+    if (path.contains("/")) {
+      HadoopTables tables = new HadoopTables(conf);
+      return tables.load(path);
+    } else {
+      HiveCatalog hiveCatalog = HiveCatalogs.loadCatalog(conf);
+      TableIdentifier tableIdentifier = TableIdentifier.parse(path);
+      return hiveCatalog.loadTable(tableIdentifier);
+    }
+  }
+
+  private Table getTableAndResolveHadoopConfiguration(
       CaseInsensitiveStringMap options, Configuration conf) {
     // Overwrite configurations from the Spark Context with configurations from the options.
     mergeIcebergHadoopConfs(conf, options);
 
-    // Find table (in Iceberg) based on the given path
-    org.apache.iceberg.Table table = findTable(options, conf);
+    Table table = findTable(options, conf);
 
     // Set confs from table properties
     mergeIcebergHadoopConfs(conf, table.properties());
@@ -72,30 +84,10 @@ public class IcebergTableProvider implements DataSourceRegister, TableProvider {
     return table;
   }
 
-  /**
-   * Merge delta options into base conf
-   *
-   * @param baseConf the base conf
-   * @param options  the delta options to merge into base
-   */
-  private void mergeIcebergHadoopConfs(Configuration baseConf, Map<String, String> options) {
+  private static void mergeIcebergHadoopConfs(
+      Configuration baseConf, Map<String, String> options) {
     options.keySet().stream()
-        .filter(key -> key.startsWith("hadoop."))  /* filter all keys staring with "hadoop." */
+        .filter(key -> key.startsWith("hadoop."))
         .forEach(key -> baseConf.set(key.replaceFirst("hadoop.", ""), options.get(key)));
-    /* Modify the key by removing the prefix of "hadoop." and merge into base */
-  }
-
-  protected org.apache.iceberg.Table findTable(CaseInsensitiveStringMap options, Configuration conf) {
-    Optional<String> path = Optional.ofNullable(options.get("path"));
-    Preconditions.checkArgument(path.isPresent(), "Cannot open table: path is not set");
-
-    if (path.get().contains("/")) {  // hadoop table
-      HadoopTables tables = new HadoopTables(conf);
-      return tables.load(path.get());
-    } else {  // hive table
-      HiveCatalog hiveCatalog = HiveCatalogs.loadCatalog(conf);
-      TableIdentifier tableIdentifier = TableIdentifier.parse(path.get());
-      return hiveCatalog.loadTable(tableIdentifier);
-    }
   }
 }
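
findTable() above distinguishes the two load paths purely by whether the path option contains a slash: a slash means a HadoopTables location, otherwise the string is parsed as a Hive catalog TableIdentifier. A hedged sketch of both forms (the location and table name are invented):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    SparkSession spark = SparkSession.builder().getOrCreate();

    // Contains "/" -> loaded via HadoopTables
    Dataset<Row> byLocation = spark.read().format("iceberg")
        .load("hdfs://nn:8020/warehouse/db/events");

    // No "/" -> parsed as a TableIdentifier and loaded from the Hive catalog
    Dataset<Row> byName = spark.read().format("iceberg")
        .load("db.events");
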
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergTable.java b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergTable.java
deleted file mode 100644
index d02645d..0000000
--- a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergTable.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.spark.source;
-
-import com.google.common.collect.ImmutableSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-import org.apache.iceberg.PartitionField;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.spark.SparkSchemaUtil;
-import org.apache.iceberg.spark.SparkUtils;
-import org.apache.iceberg.transforms.UnknownTransform;
-import org.apache.iceberg.types.CheckCompatibility;
-import org.apache.spark.sql.connector.catalog.SupportsRead;
-import org.apache.spark.sql.connector.catalog.SupportsWrite;
-import org.apache.spark.sql.connector.catalog.Table;
-import org.apache.spark.sql.connector.catalog.TableCapability;
-import org.apache.spark.sql.connector.expressions.Transform;
-import org.apache.spark.sql.connector.read.ScanBuilder;
-import org.apache.spark.sql.connector.write.BatchWrite;
-import org.apache.spark.sql.connector.write.SupportsDynamicOverwrite;
-import org.apache.spark.sql.connector.write.SupportsTruncate;
-import org.apache.spark.sql.connector.write.WriteBuilder;
-import org.apache.spark.sql.connector.write.streaming.StreamingWrite;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.sql.util.CaseInsensitiveStringMap;
-
-final class IcebergTable implements Table, SupportsRead, SupportsWrite {
-
-  private static final Set<TableCapability> CAPABILITIES = ImmutableSet.of(
-      TableCapability.BATCH_READ,
-      TableCapability.BATCH_WRITE,
-      TableCapability.MICRO_BATCH_READ,
-      TableCapability.STREAMING_WRITE,
-      TableCapability.TRUNCATE,
-      TableCapability.OVERWRITE_DYNAMIC);
-
-  private final org.apache.iceberg.Table tableInIceberg;
-  private StructType requestSchema;
-
-  IcebergTable(org.apache.iceberg.Table tableInIceberg, StructType requestSchema) {
-    this.tableInIceberg = tableInIceberg;
-
-    if (requestSchema != null) {
-      SparkSchemaUtil.convert(tableInIceberg.schema(), requestSchema);
-      this.requestSchema = requestSchema;
-    }
-  }
-
-  @Override
-  public String name() {
-    return tableInIceberg.name();
-  }
-
-  @Override
-  public StructType schema() {
-    if (requestSchema != null) {
-      return requestSchema;
-    }
-    return SparkSchemaUtil.convert(tableInIceberg.schema());
-  }
-
-  @Override
-  public Transform[] partitioning() {
-    return SparkUtils.toTransforms(tableInIceberg.spec());
-  }
-
-  @Override
-  public Map<String, String> properties() {
-    return tableInIceberg.properties();
-  }
-
-  @Override
-  public Set<TableCapability> capabilities() {
-    return CAPABILITIES;
-  }
-
-  @Override
-  public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
-    return () -> new IcebergBatchScan(tableInIceberg, options, requestSchema);
-  }
-
-  @Override
-  public WriteBuilder newWriteBuilder(CaseInsensitiveStringMap options) {
-    return new IcebergWriteBuilder(tableInIceberg, options);
-  }
-
-  static class IcebergWriteBuilder implements WriteBuilder, SupportsDynamicOverwrite, SupportsTruncate {
-
-    private org.apache.iceberg.Table table;
-    private CaseInsensitiveStringMap writeOptions;
-    private TableCapability writeBehavior = TableCapability.BATCH_WRITE;
-    private String writeQueryId = null;
-    private StructType dsStruct = null;
-
-    IcebergWriteBuilder(org.apache.iceberg.Table table, CaseInsensitiveStringMap options) {
-      this.table = table;
-      this.writeOptions = options;
-    }
-
-    @Override
-    public WriteBuilder withQueryId(String queryId) {
-      this.writeQueryId = queryId;
-      return this;
-    }
-
-    @Override
-    public WriteBuilder withInputDataSchema(StructType schemaInput) {
-      this.dsStruct = schemaInput;
-      return this;
-    }
-
-    @Override
-    public WriteBuilder overwriteDynamicPartitions() {
-      this.writeBehavior = TableCapability.OVERWRITE_DYNAMIC;
-      return this;
-    }
-
-    @Override
-    public WriteBuilder truncate() {
-      this.writeBehavior = TableCapability.TRUNCATE;
-      return this;
-    }
-
-    @Override
-    public BatchWrite buildForBatch() {
-      // TODO. Check queryId and schema before build?
-
-      // Validate
-      Schema dsSchema = SparkSchemaUtil.convert(table.schema(), dsStruct);
-      validateWriteSchema(table.schema(), dsSchema, checkNullability(writeOptions));
-      validatePartitionTransforms(table.spec());
-
-      // Get application id
-      String appId = SparkUtils.getSparkSession().sparkContext().applicationId();
-
-      // Get write-audit-publish id
-      String wapId = SparkUtils.getSparkSession().conf().get("spark.wap.id", null);
-
-      return new IcebergBatchWriter(table, writeOptions, writeBehavior, appId, wapId, dsSchema);
-    }
-
-    @Override
-    public StreamingWrite buildForStreaming() {
-      // TODO. Check queryId and schema before build?
-
-      // Validate
-      Schema dsSchema = SparkSchemaUtil.convert(table.schema(), dsStruct);
-      validateWriteSchema(table.schema(), dsSchema, checkNullability(writeOptions));
-      validatePartitionTransforms(table.spec());
-
-      // Change to streaming write if it is just append
-      if (writeBehavior.equals(TableCapability.BATCH_WRITE)) {
-        writeBehavior = TableCapability.STREAMING_WRITE;
-      }
-
-      // Get application id
-      String appId = SparkUtils.getSparkSession().sparkContext().applicationId();
-      String wapId = SparkUtils.getSparkSession().conf().get("spark.wap.id", null);
-      return new IcebergStreamingWriter(table, writeOptions, writeQueryId, writeBehavior, appId, wapId, table.schema());
-    }
-
-    private void validateWriteSchema(Schema tableSchema, Schema dsSchema, Boolean checkNullability) {
-      List<String> errors;
-      if (checkNullability) {
-        errors = CheckCompatibility.writeCompatibilityErrors(tableSchema, dsSchema);
-      } else {
-        errors = CheckCompatibility.typeCompatibilityErrors(tableSchema, dsSchema);
-      }
-      if (!errors.isEmpty()) {
-        StringBuilder sb = new StringBuilder();
-        sb.append("Cannot write incompatible dataset to table with schema:\n")
-          .append(tableSchema)
-          .append("\nProblems:");
-        for (String error : errors) {
-          sb.append("\n* ").append(error);
-        }
-        throw new IllegalArgumentException(sb.toString());
-      }
-    }
-
-    private void validatePartitionTransforms(PartitionSpec spec) {
-      if (spec.fields().stream().anyMatch(field -> field.transform() instanceof UnknownTransform)) {
-        String unsupported = spec.fields().stream()
-            .map(PartitionField::transform)
-            .filter(transform -> transform instanceof UnknownTransform)
-            .map(org.apache.iceberg.transforms.Transform::toString)
-            .collect(Collectors.joining(", "));
-
-        throw new UnsupportedOperationException(
-          String.format("Cannot write using unsupported transforms: %s", unsupported));
-      }
-    }
-
-    private boolean checkNullability(CaseInsensitiveStringMap options) {
-      boolean sparkCheckNullability = Boolean.parseBoolean(SparkUtils.getSparkSession().conf()
-          .get("spark.sql.iceberg.check-nullability", "true"));
-      boolean dataFrameCheckNullability = options.getBoolean("check-nullability", true);
-      return sparkCheckNullability && dataFrameCheckNullability;
-    }
-  }
-}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergBatchScan.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
similarity index 77%
rename from spark/src/main/java/org/apache/iceberg/spark/source/IcebergBatchScan.java
rename to spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
index b3fef6d..4e04749 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergBatchScan.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
@@ -28,6 +28,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -56,7 +57,6 @@ import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.spark.SparkFilters;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.data.SparkAvroReader;
 import org.apache.iceberg.spark.data.SparkOrcReader;
@@ -76,10 +76,7 @@ import org.apache.spark.sql.connector.read.PartitionReader;
 import org.apache.spark.sql.connector.read.PartitionReaderFactory;
 import org.apache.spark.sql.connector.read.Scan;
 import org.apache.spark.sql.connector.read.Statistics;
-import org.apache.spark.sql.connector.read.SupportsPushDownFilters;
-import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
 import org.apache.spark.sql.connector.read.SupportsReportStatistics;
-import org.apache.spark.sql.sources.Filter;
 import org.apache.spark.sql.types.BinaryType;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Decimal;
@@ -91,14 +88,12 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import org.apache.spark.unsafe.types.UTF8String;
 import scala.collection.JavaConverters;
 
-public class IcebergBatchScan implements Scan,
-    Batch,
-    SupportsPushDownFilters,
-    SupportsPushDownRequiredColumns,
-    SupportsReportStatistics {
-  private static final Filter[] NO_FILTERS = new Filter[0];
+class SparkBatchScan implements Scan, Batch, SupportsReportStatistics {
 
   private final Table table;
+  private final boolean caseSensitive;
+  private final Schema expectedSchema;
+  private final List<Expression> filterExpressions;
   private final Long snapshotId;
   private final Long asOfTimestamp;
   private final Long splitSize;
@@ -106,18 +101,16 @@ public class IcebergBatchScan implements Scan,
   private final Long splitOpenFileCost;
   private final FileIO fileIo;
   private final EncryptionManager encryptionManager;
-  private final boolean caseSensitive;
-  private StructType requestedSchema = null;
-  private List<Expression> filterExpressions = null;
-  private Filter[] pushedFilters = NO_FILTERS;
 
   // lazy variables
-  private Schema schema = null;
-  private StructType type = null; // cached because Spark accesses it multiple times
   private List<CombinedScanTask> tasks = null; // lazy cache of tasks
 
-  public IcebergBatchScan(Table table, Boolean caseSensitive, CaseInsensitiveStringMap options) {
+  SparkBatchScan(Table table, boolean caseSensitive, Schema expectedSchema, List<Expression> filters,
+                 CaseInsensitiveStringMap options) {
     this.table = table;
+    this.caseSensitive = caseSensitive;
+    this.expectedSchema = expectedSchema;
+    this.filterExpressions = filters;
     this.snapshotId = options.containsKey("snapshot-id") ? options.getLong("snapshot-id", 0) : null;
     this.asOfTimestamp = options.containsKey("as-of-timestamp") ? options.getLong("as-of-timestamp", 0) : null;
 
@@ -134,22 +127,53 @@ public class IcebergBatchScan implements Scan,
     this.splitOpenFileCost = options.containsKey("file-open-cost") ? options.getLong("file-open-cost",
         TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT) : null;
 
-    this.schema = table.schema();
     this.fileIo = table.io();
     this.encryptionManager = table.encryption();
-    this.caseSensitive = caseSensitive;
   }
 
-  public IcebergBatchScan(Table table, CaseInsensitiveStringMap options) {
-    this(table, true, options);
+  @Override
+  public Batch toBatch() {
+    return this;
   }
 
-  public IcebergBatchScan(Table table, CaseInsensitiveStringMap options, StructType requestedSchema) {
-    this(table, true, options);
+  @Override
+  public StructType readSchema() {
+    return SparkSchemaUtil.convert(expectedSchema);
+  }
 
-    if (requestedSchema != null) {
-      pruneColumns(requestedSchema);
+  @Override
+  public InputPartition[] planInputPartitions() {
+    String tableSchemaString = SchemaParser.toJson(table.schema());
+    String expectedSchemaString = SchemaParser.toJson(expectedSchema);
+
+    List<CombinedScanTask> scanTasks = tasks();
+    InputPartition[] readTasks = new InputPartition[scanTasks.size()];
+    for (int i = 0; i < scanTasks.size(); i++) {
+      readTasks[i] = new ReadTask(
+          scanTasks.get(i), tableSchemaString, expectedSchemaString, fileIo, encryptionManager, caseSensitive);
     }
+
+    return readTasks;
+  }
+
+  @Override
+  public ReaderFactory createReaderFactory() {
+    return new ReaderFactory();
+  }
+
+  @Override
+  public Statistics estimateStatistics() {
+    long sizeInBytes = 0L;
+    long numRows = 0L;
+
+    for (CombinedScanTask task : tasks()) {
+      for (FileScanTask file : task.files()) {
+        sizeInBytes += file.length();
+        numRows += file.file().recordCount();
+      }
+    }
+
+    return new Stats(sizeInBytes, numRows);
   }
 
   private List<CombinedScanTask> tasks() {
@@ -157,7 +181,7 @@ public class IcebergBatchScan implements Scan,
       TableScan scan = table
           .newScan()
           .caseSensitive(caseSensitive)
-          .project(lazySchema());
+          .project(expectedSchema);
 
       if (snapshotId != null) {
         scan = scan.useSnapshot(snapshotId);
@@ -196,301 +220,96 @@ public class IcebergBatchScan implements Scan,
   }
 
   @Override
-  public Filter[] pushFilters(Filter[] filters) {
-    this.tasks = null; // invalidate cached tasks, if present
-
-    List<Expression> expressions = Lists.newArrayListWithExpectedSize(filters.length);
-    List<Filter> pushed = Lists.newArrayListWithExpectedSize(filters.length);
-
-    for (Filter filter : filters) {
-      Expression expr = SparkFilters.convert(filter);
-      if (expr != null) {
-        expressions.add(expr);
-        pushed.add(filter);
-      }
-    }
-
-    this.filterExpressions = expressions;
-    this.pushedFilters = pushed.toArray(new Filter[0]);
-
-    // invalidate the schema that will be projected
-    this.schema = null;
-    this.type = null;
-
-    // Spark doesn't support residuals per task, so return all filters
-    // to get Spark to handle record-level filtering
-    return filters;
-  }
-
-  @Override
-  public Batch toBatch() {
-    return this;
-  }
-
-  @Override
-  public Filter[] pushedFilters() {
-    return pushedFilters;
-  }
-
-  @Override
-  public void pruneColumns(StructType newRequestedSchema) {
-    this.requestedSchema = newRequestedSchema;
-
-    // invalidate the schema that will be projected
-    this.schema = null;
-    this.type = null;
+  public String toString() {
+    return String.format(
+        "IcebergScan(table=%s, type=%s, filters=%s, caseSensitive=%s)",
+        table, expectedSchema.asStruct(), filterExpressions, caseSensitive);
   }
 
-  @Override
-  public Scan build() {
-    return this;
-  }
-
-  @Override
-  public Statistics estimateStatistics() {
-    long sizeInBytes = 0L;
-    long numRows = 0L;
-
-    for (CombinedScanTask task : tasks()) {
-      for (FileScanTask file : task.files()) {
-        sizeInBytes += file.length();
-        numRows += file.file().recordCount();
+  private static class ReaderFactory implements PartitionReaderFactory {
+    @Override
+    public PartitionReader<InternalRow> createReader(InputPartition inputPartition) {
+      if (inputPartition instanceof ReadTask) {
+        return new TaskDataReader((ReadTask) inputPartition);
+      } else {
+        throw new UnsupportedOperationException("Incorrect input partition type: " + inputPartition);
       }
     }
-
-    return new Stats(sizeInBytes, numRows);
   }
 
-  public static class BatchReadInputPartition implements InputPartition, Serializable {
+  private static class ReadTask implements InputPartition, Serializable {
     private final CombinedScanTask task;
     private final String tableSchemaString;
     private final String expectedSchemaString;
-    private final FileIO fileIo;
+    private final FileIO io;
     private final EncryptionManager encryptionManager;
     private final boolean caseSensitive;
 
     private transient Schema tableSchema = null;
     private transient Schema expectedSchema = null;
 
-    BatchReadInputPartition(
-        CombinedScanTask task,
-        String tableSchemaString,
-        String expectedSchemaString,
-        FileIO fileIo,
-        EncryptionManager encryptionManager,
-        boolean caseSensitive) {
+    ReadTask(CombinedScanTask task, String tableSchemaString, String expectedSchemaString,
+                            FileIO io, EncryptionManager encryptionManager, boolean caseSensitive) {
       this.task = task;
       this.tableSchemaString = tableSchemaString;
       this.expectedSchemaString = expectedSchemaString;
-      this.fileIo = fileIo;
+      this.io = io;
       this.encryptionManager = encryptionManager;
       this.caseSensitive = caseSensitive;
     }
 
-    private Schema lazyTableSchema() {
-      if (tableSchema == null) {
-        this.tableSchema = SchemaParser.fromJson(tableSchemaString);
-      }
-      return tableSchema;
+    public Collection<FileScanTask> files() {
+      return task.files();
     }
 
-    private Schema lazyExpectedSchema() {
-      if (expectedSchema == null) {
-        this.expectedSchema = SchemaParser.fromJson(expectedSchemaString);
-      }
-      return expectedSchema;
-    }
-  }
-
-  @Override
-  public InputPartition[] planInputPartitions() {
-    String tableSchemaString = SchemaParser.toJson(table.schema());
-    String expectedSchemaString = SchemaParser.toJson(lazySchema());
-
-    List<CombinedScanTask> scanTasks = tasks();
-    InputPartition[] readTasks = new InputPartition[scanTasks.size()];
-    for (int i = 0; i < scanTasks.size(); i++) {
-      readTasks[i] = new BatchReadInputPartition(scanTasks.get(i), tableSchemaString, expectedSchemaString, fileIo,
-          encryptionManager, caseSensitive);
-    }
-
-    return readTasks;
-  }
-
-  @Override
-  public IcebergRowReaderFactory createReaderFactory() {
-    return new IcebergRowReaderFactory();
-  }
-
-  private Schema lazySchema() {
-    if (schema == null) {
-      if (requestedSchema != null) {
-        this.schema = SparkSchemaUtil.prune(table.schema(), requestedSchema);
-      } else {
-        this.schema = table.schema();
-      }
+    public FileIO io() {
+      return io;
     }
-    return schema;
-  }
-
-  private StructType lazyType() {
-    if (type == null) {
-      this.type = SparkSchemaUtil.convert(lazySchema());
-    }
-    return type;
-  }
-
-  @Override
-  public StructType readSchema() {
-    return lazyType();
-  }
-
-
-  public static class PartitionRowConverter implements Function<StructLike, InternalRow> {
-    private final DataType[] types;
-    private final int[] positions;
-    private final Class<?>[] javaTypes;
-    private final GenericInternalRow reusedRow;
-
-    PartitionRowConverter(Schema partitionSchema, PartitionSpec spec) {
-      StructType partitionType = SparkSchemaUtil.convert(partitionSchema);
-      StructField[] fields = partitionType.fields();
-
-      this.types = new DataType[fields.length];
-      this.positions = new int[types.length];
-      this.javaTypes = new Class<?>[types.length];
-      this.reusedRow = new GenericInternalRow(types.length);
-
-      List<PartitionField> partitionFields = spec.fields();
-      for (int rowIndex = 0; rowIndex < fields.length; rowIndex += 1) {
-        this.types[rowIndex] = fields[rowIndex].dataType();
 
-        int sourceId = partitionSchema.columns().get(rowIndex).fieldId();
-        for (int specIndex = 0; specIndex < partitionFields.size(); specIndex += 1) {
-          PartitionField field = spec.fields().get(specIndex);
-          if (field.sourceId() == sourceId && "identity".equals(field.transform().toString())) {
-            positions[rowIndex] = specIndex;
-            javaTypes[rowIndex] = spec.javaClasses()[specIndex];
-            break;
-          }
-        }
-      }
+    public EncryptionManager encryptionManager() {
+      return encryptionManager;
     }
 
-    @Override
-    public InternalRow apply(StructLike tuple) {
-      for (int i = 0; i < types.length; i += 1) {
-        Object value = tuple.get(positions[i], javaTypes[i]);
-        if (value != null) {
-          reusedRow.update(i, convert(value, types[i]));
-        } else {
-          reusedRow.setNullAt(i);
-        }
-      }
-
-      return reusedRow;
+    public boolean isCaseSensitive() {
+      return caseSensitive;
     }
 
-    /**
-     * Converts the objects into instances used by Spark's InternalRow.
-     *
-     * @param value a data value
-     * @param type  the Spark data type
-     * @return the value converted to the representation expected by Spark's InternalRow.
-     */
-    private static Object convert(Object value, DataType type) {
-      if (type instanceof StringType) {
-        return UTF8String.fromString(value.toString());
-      } else if (type instanceof BinaryType) {
-        return ByteBuffers.toByteArray((ByteBuffer) value);
-      } else if (type instanceof DecimalType) {
-        return Decimal.fromDecimal(value);
+    private Schema tableSchema() {
+      if (tableSchema == null) {
+        this.tableSchema = SchemaParser.fromJson(tableSchemaString);
       }
-      return value;
+      return tableSchema;
     }
-  }
-
-  public static class StructLikeInternalRow implements StructLike {
-    private final DataType[] types;
-    private InternalRow row = null;
 
-    StructLikeInternalRow(StructType struct) {
-      this.types = new DataType[struct.size()];
-      StructField[] fields = struct.fields();
-      for (int i = 0; i < fields.length; i += 1) {
-        types[i] = fields[i].dataType();
+    private Schema expectedSchema() {
+      if (expectedSchema == null) {
+        this.expectedSchema = SchemaParser.fromJson(expectedSchemaString);
       }
-    }
-
-    public StructLikeInternalRow setRow(InternalRow row) {
-      this.row = row;
-      return this;
-    }
-
-    @Override
-    public int size() {
-      return types.length;
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public <T> T get(int pos, Class<T> javaClass) {
-      return javaClass.cast(row.get(pos, types[pos]));
-    }
-
-    @Override
-    public <T> void set(int pos, T value) {
-      throw new UnsupportedOperationException("Not implemented: set");
-    }
-  }
-
-
-  private static class IcebergRowReaderFactory implements PartitionReaderFactory {
-    IcebergRowReaderFactory() {
-    }
-
-    @Override
-    public PartitionReader<InternalRow> createReader(InputPartition inputPartition) {
-      return new TaskDataReader(inputPartition);
+      return expectedSchema;
     }
   }
 
-  public static class TaskDataReader implements PartitionReader<InternalRow> {
+  private static class TaskDataReader implements PartitionReader<InternalRow> {
     // for some reason, the apply method can't be called from Java without reflection
     private static final DynMethods.UnboundMethod APPLY_PROJECTION = DynMethods.builder("apply")
         .impl(UnsafeProjection.class, InternalRow.class)
         .build();
 
+    private final ReadTask split;
     private final Iterator<FileScanTask> tasks;
-    private final Schema tableSchema;
-    private final Schema expectedSchema;
-    private final FileIO fileIo;
     private final Map<String, InputFile> inputFiles;
-    private final boolean caseSensitive;
 
     private Iterator<InternalRow> currentIterator = null;
     private Closeable currentCloseable = null;
     private InternalRow current = null;
 
-    TaskDataReader(InputPartition inputPartition) {
-      BatchReadInputPartition batchReadInputPartition = (BatchReadInputPartition) inputPartition;
+    TaskDataReader(ReadTask task) {
+      this.split = task;
+      this.tasks = task.files().iterator();
+      this.inputFiles = batchDecrypt(task.files());
 
-      this.fileIo = batchReadInputPartition.fileIo;
-      this.tableSchema = batchReadInputPartition.lazyTableSchema();
-      this.expectedSchema = batchReadInputPartition.lazyExpectedSchema();
-      this.tasks = batchReadInputPartition.task.files().iterator();
-      Iterable<InputFile> decryptedFiles = batchReadInputPartition.encryptionManager.decrypt(
-          Iterables.transform(batchReadInputPartition.task.files(),
-              fileScanTask ->
-                  EncryptedFiles.encryptedInput(
-                      this.fileIo.newInputFile(fileScanTask.file().path().toString()),
-                      fileScanTask.file().keyMetadata())));
-      ImmutableMap.Builder<String, InputFile> inputFileBuilder = ImmutableMap.builder();
-      decryptedFiles.forEach(decrypted -> inputFileBuilder.put(decrypted.location(), decrypted));
-      this.inputFiles = inputFileBuilder.build();
-      // open last because the schemas and fileIo must be set
+      // open last because the split must be set
       this.currentIterator = open(tasks.next());
-      this.caseSensitive = batchReadInputPartition.caseSensitive;
     }
 
     @Override
@@ -526,17 +345,31 @@ public class IcebergBatchScan implements Scan,
       }
     }
 
+    private Map<String, InputFile> batchDecrypt(Collection<FileScanTask> files) {
+      Iterable<InputFile> decryptedFiles = split.encryptionManager().decrypt(
+          Iterables.transform(files,
+              fileScanTask ->
+                  EncryptedFiles.encryptedInput(
+                      split.io().newInputFile(fileScanTask.file().path().toString()),
+                      fileScanTask.file().keyMetadata())));
+
+      ImmutableMap.Builder<String, InputFile> inputFileBuilder = ImmutableMap.builder();
+      decryptedFiles.forEach(decrypted -> inputFileBuilder.put(decrypted.location(), decrypted));
+      return inputFileBuilder.build();
+    }
+
     private Iterator<InternalRow> open(FileScanTask task) {
       DataFile file = task.file();
 
       // schema or rows returned by readers
-      Schema finalSchema = expectedSchema;
+      Schema finalSchema = split.expectedSchema();
       PartitionSpec spec = task.spec();
       Set<Integer> idColumns = spec.identitySourceIds();
 
       // schema needed for the projection and filtering
       StructType sparkType = SparkSchemaUtil.convert(finalSchema);
-      Schema requiredSchema = SparkSchemaUtil.prune(tableSchema, sparkType, task.residual(), caseSensitive);
+      Schema requiredSchema = SparkSchemaUtil.prune(
+          split.tableSchema(), sparkType, task.residual(), split.isCaseSensitive());
       boolean hasJoinedPartitionColumns = !idColumns.isEmpty();
       boolean hasExtraFilterColumns = requiredSchema.columns().size() != finalSchema.columns().size();
 
@@ -647,7 +480,7 @@ public class IcebergBatchScan implements Scan,
           .split(task.start(), task.length())
           .createReaderFunc(fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema))
           .filter(task.residual())
-          .caseSensitive(caseSensitive)
+          .caseSensitive(split.isCaseSensitive())
           .build();
     }
 
@@ -658,17 +491,114 @@ public class IcebergBatchScan implements Scan,
           .schema(readSchema)
           .split(task.start(), task.length())
           .createReaderFunc(SparkOrcReader::new)
-          .caseSensitive(caseSensitive)
+          .caseSensitive(split.isCaseSensitive())
           .build();
     }
 
     private CloseableIterable<InternalRow> newDataIterable(DataTask task, Schema readSchema) {
-      StructInternalRow row = new StructInternalRow(tableSchema.asStruct());
+      StructInternalRow row = new StructInternalRow(split.tableSchema().asStruct());
       CloseableIterable<InternalRow> asSparkRows = CloseableIterable.transform(
           task.asDataTask().rows(), row::setStruct);
       return CloseableIterable.transform(
-          asSparkRows, APPLY_PROJECTION.bind(projection(readSchema, tableSchema))::invoke);
+          asSparkRows, APPLY_PROJECTION.bind(projection(readSchema, split.tableSchema()))::invoke);
+    }
+  }
+
+  private static class PartitionRowConverter implements Function<StructLike, InternalRow> {
+    private final DataType[] types;
+    private final int[] positions;
+    private final Class<?>[] javaTypes;
+    private final GenericInternalRow reusedRow;
+
+    PartitionRowConverter(Schema partitionSchema, PartitionSpec spec) {
+      StructType partitionType = SparkSchemaUtil.convert(partitionSchema);
+      StructField[] fields = partitionType.fields();
+
+      this.types = new DataType[fields.length];
+      this.positions = new int[types.length];
+      this.javaTypes = new Class<?>[types.length];
+      this.reusedRow = new GenericInternalRow(types.length);
+
+      List<PartitionField> partitionFields = spec.fields();
+      for (int rowIndex = 0; rowIndex < fields.length; rowIndex += 1) {
+        this.types[rowIndex] = fields[rowIndex].dataType();
+
+        int sourceId = partitionSchema.columns().get(rowIndex).fieldId();
+        for (int specIndex = 0; specIndex < partitionFields.size(); specIndex += 1) {
+          PartitionField field = spec.fields().get(specIndex);
+          if (field.sourceId() == sourceId && "identity".equals(field.transform().toString())) {
+            positions[rowIndex] = specIndex;
+            javaTypes[rowIndex] = spec.javaClasses()[specIndex];
+            break;
+          }
+        }
+      }
+    }
+
+    @Override
+    public InternalRow apply(StructLike tuple) {
+      for (int i = 0; i < types.length; i += 1) {
+        Object value = tuple.get(positions[i], javaTypes[i]);
+        if (value != null) {
+          reusedRow.update(i, convert(value, types[i]));
+        } else {
+          reusedRow.setNullAt(i);
+        }
+      }
+
+      return reusedRow;
+    }
+
+    /**
+     * Converts the objects into instances used by Spark's InternalRow.
+     *
+     * @param value a data value
+     * @param type  the Spark data type
+     * @return the value converted to the representation expected by Spark's InternalRow.
+     */
+    private static Object convert(Object value, DataType type) {
+      if (type instanceof StringType) {
+        return UTF8String.fromString(value.toString());
+      } else if (type instanceof BinaryType) {
+        return ByteBuffers.toByteArray((ByteBuffer) value);
+      } else if (type instanceof DecimalType) {
+        return Decimal.fromDecimal(value);
+      }
+      return value;
     }
   }
 
+  private static class StructLikeInternalRow implements StructLike {
+    private final DataType[] types;
+    private InternalRow row = null;
+
+    StructLikeInternalRow(StructType struct) {
+      this.types = new DataType[struct.size()];
+      StructField[] fields = struct.fields();
+      for (int i = 0; i < fields.length; i += 1) {
+        types[i] = fields[i].dataType();
+      }
+    }
+
+    public StructLikeInternalRow setRow(InternalRow row) {
+      this.row = row;
+      return this;
+    }
+
+    @Override
+    public int size() {
+      return types.length;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T> T get(int pos, Class<T> javaClass) {
+      return javaClass.cast(row.get(pos, types[pos]));
+    }
+
+    @Override
+    public <T> void set(int pos, T value) {
+      throw new UnsupportedOperationException("Not implemented: set");
+    }
+  }
 }
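
SparkBatchScan still reads its snapshot selection from the data source options ("snapshot-id" and "as-of-timestamp", as parsed in the constructor above). A sketch of how a caller might exercise them, reusing the SparkSession setup from the earlier sketch; the snapshot id, timestamp, and table name are illustrative:

    // Read a specific snapshot (id is made up).
    Dataset<Row> atSnapshot = spark.read().format("iceberg")
        .option("snapshot-id", 8924563892971245L)
        .load("db.events");

    // Or read the table as of a wall-clock time, in milliseconds since the epoch.
    Dataset<Row> asOf = spark.read().format("iceberg")
        .option("as-of-timestamp", 1575936000000L)
        .load("db.events");
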
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergBatchWriter.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchWrite.java
similarity index 90%
rename from spark/src/main/java/org/apache/iceberg/spark/source/IcebergBatchWriter.java
rename to spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchWrite.java
index dfdf6b1..b0c1ded 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergBatchWriter.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchWrite.java
@@ -48,7 +48,7 @@ import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.exceptions.RuntimeIOException;
-import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.LocationProvider;
@@ -59,11 +59,11 @@ import org.apache.iceberg.spark.data.SparkParquetWriters;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.Tasks;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.connector.catalog.TableCapability;
 import org.apache.spark.sql.connector.write.BatchWrite;
 import org.apache.spark.sql.connector.write.DataWriter;
 import org.apache.spark.sql.connector.write.DataWriterFactory;
 import org.apache.spark.sql.connector.write.WriterCommitMessage;
+import org.apache.spark.sql.connector.write.streaming.StreamingDataWriterFactory;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -81,31 +81,30 @@ import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
 import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
 import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
 
-class IcebergBatchWriter implements BatchWrite {
-  private static final Logger LOG = LoggerFactory.getLogger(IcebergBatchWriter.class);
+class SparkBatchWrite implements BatchWrite {
+  private static final Logger LOG = LoggerFactory.getLogger(SparkBatchWrite.class);
 
   private final Table table;
   private final FileFormat format;
   private final FileIO fileIo;
   private final EncryptionManager encryptionManager;
-  private final TableCapability writeBehavior;
+  private final boolean overwriteDynamic;
+  private final boolean overwriteByFilter;
+  private final Expression overwriteExpr;
   private final String applicationId;
   private final String wapId;
   private final long targetFileSize;
   private final Schema dsSchema;
 
-  IcebergBatchWriter(
-      Table table,
-      CaseInsensitiveStringMap options,
-      TableCapability writeBehavior,
-      String applicationId,
-      String wapId,
-      Schema dsSchema) {
+  SparkBatchWrite(Table table, CaseInsensitiveStringMap options, boolean overwriteDynamic, boolean overwriteByFilter,
+                  Expression overwriteExpr, String applicationId, String wapId, Schema dsSchema) {
     this.table = table;
     this.format = getFileFormat(table.properties(), options);
     this.fileIo = table.io();
     this.encryptionManager = table.encryption();
-    this.writeBehavior = writeBehavior;
+    this.overwriteDynamic = overwriteDynamic;
+    this.overwriteByFilter = overwriteByFilter;
+    this.overwriteExpr = overwriteExpr;
     this.applicationId = applicationId;
     this.wapId = wapId;
     this.dsSchema = dsSchema;
@@ -128,7 +127,7 @@ class IcebergBatchWriter implements BatchWrite {
   }
 
   @Override
-  public DataWriterFactory createBatchWriterFactory() {
+  public WriterFactory createBatchWriterFactory() {
     return new WriterFactory(
         table.spec(), format, table.locationProvider(), table.properties(), fileIo, encryptionManager, targetFileSize,
         dsSchema);
@@ -136,14 +135,12 @@ class IcebergBatchWriter implements BatchWrite {
 
   @Override
   public void commit(WriterCommitMessage[] messages) {
-    if (writeBehavior.equals(TableCapability.OVERWRITE_DYNAMIC)) {
+    if (overwriteDynamic) {
       replacePartitions(messages);
-    } else if (writeBehavior.equals(TableCapability.BATCH_WRITE)) {
-      append(messages);
-    } else if (writeBehavior.equals(TableCapability.TRUNCATE)) {
+    } else if (overwriteByFilter) {
       overwrite(messages);
     } else {
-      throw new IllegalArgumentException("Iceberg doen't support write behavior " + writeBehavior + " for now");
+      append(messages);
     }
   }
 
@@ -192,7 +189,7 @@ class IcebergBatchWriter implements BatchWrite {
 
   private void overwrite(WriterCommitMessage[] messages) {
     OverwriteFiles overwriteFiles = table.newOverwrite();
-    overwriteFiles.overwriteByRowFilter(Expressions.alwaysTrue());
+    overwriteFiles.overwriteByRowFilter(overwriteExpr);
 
     int numFiles = 0;
     for (DataFile file : files(messages)) {
@@ -200,7 +197,7 @@ class IcebergBatchWriter implements BatchWrite {
       overwriteFiles.addFile(file);
     }
 
-    commitOperation(overwriteFiles, numFiles, "overwrite by filter or truncate");
+    commitOperation(overwriteFiles, numFiles, "overwrite by filter");
   }
 
   @Override
@@ -266,7 +263,7 @@ class IcebergBatchWriter implements BatchWrite {
     }
   }
 
-  protected static class WriterFactory implements DataWriterFactory {
+  private static class WriterFactory implements DataWriterFactory, StreamingDataWriterFactory {
     private final PartitionSpec spec;
     private final FileFormat format;
     private final LocationProvider locations;
@@ -276,15 +273,9 @@ class IcebergBatchWriter implements BatchWrite {
     private final long targetFileSize;
     private final Schema dsSchema;
 
-    WriterFactory(
-        PartitionSpec spec,
-        FileFormat format,
-        LocationProvider locations,
-        Map<String, String> properties,
-        FileIO fileIo,
-        EncryptionManager encryptionManager,
-        long targetFileSize,
-        Schema dsSchema) {
+    WriterFactory(PartitionSpec spec, FileFormat format, LocationProvider locations,
+                  Map<String, String> properties, FileIO fileIo, EncryptionManager encryptionManager,
+                  long targetFileSize, Schema dsSchema) {
       this.spec = spec;
       this.format = format;
       this.locations = locations;
@@ -295,20 +286,13 @@ class IcebergBatchWriter implements BatchWrite {
       this.dsSchema = dsSchema;
     }
 
-    public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long epochId) {
-      OutputFileFactory fileFactory = new OutputFileFactory(partitionId, taskId, epochId);
-      AppenderFactory<InternalRow> appenderFactory = new SparkAppenderFactory();
-      if (spec.fields().isEmpty()) {
-        return new UnpartitionedWriter(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize);
-      } else {
-        return new PartitionedWriter(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize);
-      }
+    public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
+      return createWriter(partitionId, taskId, 0);
     }
 
-    public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
-      OutputFileFactory fileFactory = new OutputFileFactory(partitionId, taskId, 0);
+    public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long epochId) {
+      OutputFileFactory fileFactory = new OutputFileFactory(partitionId, taskId, epochId);
       AppenderFactory<InternalRow> appenderFactory = new SparkAppenderFactory();
-
       if (spec.fields().isEmpty()) {
         return new UnpartitionedWriter(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize);
       } else {
@@ -406,13 +390,8 @@ class IcebergBatchWriter implements BatchWrite {
     private EncryptedOutputFile currentFile = null;
     private long currentRows = 0;
 
-    BaseWriter(
-        PartitionSpec spec,
-        FileFormat format,
-        AppenderFactory<InternalRow> appenderFactory,
-        WriterFactory.OutputFileFactory fileFactory,
-        FileIO fileIo,
-        long targetFileSize) {
+    BaseWriter(PartitionSpec spec, FileFormat format, AppenderFactory<InternalRow> appenderFactory,
+               WriterFactory.OutputFileFactory fileFactory, FileIO fileIo, long targetFileSize) {
       this.spec = spec;
       this.format = format;
       this.appenderFactory = appenderFactory;
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
new file mode 100644
index 0000000..cb932ff
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.spark.SparkFilters;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.connector.read.SupportsPushDownFilters;
+import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
+import org.apache.spark.sql.sources.Filter;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class SparkScanBuilder implements ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns {
+  private static final Filter[] NO_FILTERS = new Filter[0];
+
+  private final SparkSession spark;
+  private final Table table;
+  private final CaseInsensitiveStringMap options;
+
+  private Schema schema;
+  private boolean caseSensitive;
+  private List<Expression> filterExpressions = null;
+  private Filter[] pushedFilters = NO_FILTERS;
+
+  SparkScanBuilder(SparkSession spark, Table table, CaseInsensitiveStringMap options) {
+    this.spark = spark;
+    this.table = table;
+    this.options = options;
+    this.schema = table.schema();
+    this.caseSensitive = Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive"));
+  }
+
+  public SparkScanBuilder caseSensitive(boolean isCaseSensitive) {
+    this.caseSensitive = isCaseSensitive;
+    return this;
+  }
+
+  @Override
+  public Filter[] pushFilters(Filter[] filters) {
+    List<Expression> expressions = Lists.newArrayListWithExpectedSize(filters.length);
+    List<Filter> pushed = Lists.newArrayListWithExpectedSize(filters.length);
+
+    for (Filter filter : filters) {
+      Expression expr = SparkFilters.convert(filter);
+      if (expr != null) {
+        expressions.add(expr);
+        pushed.add(filter);
+      }
+    }
+
+    this.filterExpressions = expressions;
+    this.pushedFilters = pushed.toArray(new Filter[0]);
+
+    // Spark doesn't support residuals per task, so return all filters
+    // to get Spark to handle record-level filtering
+    return filters;
+  }
+
+  @Override
+  public Filter[] pushedFilters() {
+    return pushedFilters;
+  }
+
+  @Override
+  public void pruneColumns(StructType requestedSchema) {
+    this.schema = SparkSchemaUtil.prune(table.schema(), requestedSchema);
+  }
+
+  public StructType readSchema() {
+    return SparkSchemaUtil.convert(schema);
+  }
+
+  @Override
+  public Scan build() {
+    return new SparkBatchScan(table, caseSensitive, schema, filterExpressions, options);
+  }
+}
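
(For context: the updated TestFilteredScan below drives this builder directly. A
minimal sketch of that flow follows; the helper class, table location, and filter
are illustrative, and it assumes the same package because the constructor is
package-private.)

package org.apache.iceberg.spark.source;

import com.google.common.collect.ImmutableMap;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.read.Batch;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.sources.EqualTo;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

class SparkScanBuilderSketch {
  static InputPartition[] planTasks(SparkSession spark, String location) {
    CaseInsensitiveStringMap options =
        new CaseInsensitiveStringMap(ImmutableMap.of("path", location));
    Table table = new HadoopTables().load(options.get("path"));

    SparkScanBuilder builder = new SparkScanBuilder(spark, table, options)
        .caseSensitive(false);                                     // overrides spark.sql.caseSensitive
    builder.pushFilters(new Filter[] { EqualTo.apply("id", 1) });  // convertible filters become Iceberg expressions
    Batch scan = builder.build().toBatch();                        // Scan -> Batch, as in the tests
    return scan.planInputPartitions();
  }
}
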
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergStreamingWriter.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkStreamingWrite.java
similarity index 64%
rename from spark/src/main/java/org/apache/iceberg/spark/source/IcebergStreamingWriter.java
rename to spark/src/main/java/org/apache/iceberg/spark/source/SparkStreamingWrite.java
index e1d0dfb..5781534 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergStreamingWriter.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkStreamingWrite.java
@@ -35,7 +35,6 @@ import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.connector.catalog.TableCapability;
 import org.apache.spark.sql.connector.write.DataWriter;
 import org.apache.spark.sql.connector.write.WriterCommitMessage;
 import org.apache.spark.sql.connector.write.streaming.StreamingDataWriterFactory;
@@ -44,58 +43,31 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
-import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
+public class SparkStreamingWrite extends SparkBatchWrite implements StreamingWrite {
 
-public class IcebergStreamingWriter extends IcebergBatchWriter implements StreamingWrite {
-
-  private static final Logger LOG = LoggerFactory.getLogger(IcebergStreamingWriter.class);
+  private static final Logger LOG = LoggerFactory.getLogger(SparkStreamingWrite.class);
   private static final String QUERY_ID_PROPERTY = "spark.sql.streaming.queryId";
   private static final String EPOCH_ID_PROPERTY = "spark.sql.streaming.epochId";
 
+  private final boolean truncateBatches;
   private final String queryId;
-  private final TableCapability writeBehavior;
-  private final Table table;
-  private final long targetFileSize;
-  private final FileFormat format;
-  private final Schema dsSchema;
 
-  IcebergStreamingWriter(Table table, CaseInsensitiveStringMap options, String queryId, TableCapability writeBehavior,
-      String applicationId, String wapId, Schema dsSchema) {
-    super(table, options, writeBehavior, applicationId, wapId, dsSchema);
+  SparkStreamingWrite(Table table, CaseInsensitiveStringMap options, boolean truncateBatches, String queryId,
+                      String applicationId, String wapId, Schema dsSchema) {
+    super(table, options, false, truncateBatches, Expressions.alwaysTrue(), applicationId, wapId, dsSchema);
+    this.truncateBatches = truncateBatches;
     this.queryId = queryId;
-    this.writeBehavior = writeBehavior;
-    this.table = table;
-    this.format = getFileFormat(table.properties(), options);
-    long tableTargetFileSize = PropertyUtil.propertyAsLong(
-            table.properties(), WRITE_TARGET_FILE_SIZE_BYTES, WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
-    this.targetFileSize = options.getLong("target-file-size-bytes", tableTargetFileSize);
-    this.dsSchema = dsSchema;
   }
 
   @Override
   public StreamingDataWriterFactory createStreamingWriterFactory() {
-    return new StreamingWriterFactory(table.spec(), format, table.locationProvider(),
-            table.properties(), table.io(), table.encryption(), targetFileSize, dsSchema);
-  }
-
-  private static class StreamingWriterFactory extends WriterFactory implements StreamingDataWriterFactory {
-
-    StreamingWriterFactory(PartitionSpec spec, FileFormat format, LocationProvider locations,
-                  Map<String, String> properties, FileIO fileIo, EncryptionManager encryptionManager,
-                  long targetFileSize, Schema dsSchema) {
-      super(spec, format, locations, properties, fileIo, encryptionManager, targetFileSize, dsSchema);
-    }
-
-    @Override
-    public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long epochId) {
-      return  super.createWriter(partitionId, taskId, epochId);
-    }
+    // the writer factory works for both batch and streaming
+    return createBatchWriterFactory();
   }
 
   @Override
   public void commit(long epochId, WriterCommitMessage[] messages) {
-    LOG.info("Committing epoch {} for query {} in {} mode", epochId, queryId, writeBehavior);
+    LOG.info("Committing epoch {} for query {} in {} mode", epochId, queryId, truncateBatches ? "complete" : "append");
 
     table().refresh();
     Long lastCommittedEpochId = getLastCommittedEpochId();
@@ -104,7 +76,7 @@ public class IcebergStreamingWriter extends IcebergBatchWriter implements Stream
       return;
     }
 
-    if (writeBehavior.equals(TableCapability.TRUNCATE)) {
+    if (truncateBatches) {
       OverwriteFiles overwriteFiles = table().newOverwrite();
       overwriteFiles.overwriteByRowFilter(Expressions.alwaysTrue());
       int numFiles = 0;
@@ -113,7 +85,7 @@ public class IcebergStreamingWriter extends IcebergBatchWriter implements Stream
         numFiles++;
       }
       commit(overwriteFiles, epochId, numFiles, "streaming complete overwrite");
-    } else if (writeBehavior.equals(TableCapability.STREAMING_WRITE)) {
+    } else {
       AppendFiles append = table().newFastAppend();
       int numFiles = 0;
       for (DataFile file : files(messages)) {
@@ -121,8 +93,6 @@ public class IcebergStreamingWriter extends IcebergBatchWriter implements Stream
         numFiles++;
       }
       commit(append, epochId, numFiles, "streaming append");
-    } else {
-      throw new IllegalArgumentException("Iceberg doen't support write behavior " + writeBehavior + " for now");
     }
   }
 
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
new file mode 100644
index 0000000..f85aac2
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.catalog.SupportsRead;
+import org.apache.spark.sql.connector.catalog.SupportsWrite;
+import org.apache.spark.sql.connector.catalog.TableCapability;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.connector.write.WriteBuilder;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+class SparkTable implements org.apache.spark.sql.connector.catalog.Table, SupportsRead, SupportsWrite {
+
+  private static final Set<TableCapability> CAPABILITIES = ImmutableSet.of(
+      TableCapability.BATCH_READ,
+      TableCapability.BATCH_WRITE,
+      TableCapability.STREAMING_WRITE,
+      TableCapability.OVERWRITE_BY_FILTER,
+      TableCapability.OVERWRITE_DYNAMIC);
+
+  private final Table icebergTable;
+  private final StructType requestedSchema;
+  private StructType lazyTableSchema = null;
+  private SparkSession lazySpark = null;
+
+  SparkTable(Table icebergTable, StructType requestedSchema) {
+    this.icebergTable = icebergTable;
+    this.requestedSchema = requestedSchema;
+
+    if (requestedSchema != null) {
+      // convert the requested schema to throw an exception if any requested fields are unknown
+      SparkSchemaUtil.convert(icebergTable.schema(), requestedSchema);
+    }
+  }
+
+  private SparkSession sparkSession() {
+    if (lazySpark == null) {
+      this.lazySpark = SparkSession.active();
+    }
+
+    return lazySpark;
+  }
+
+  @Override
+  public String name() {
+    return icebergTable.toString();
+  }
+
+  @Override
+  public StructType schema() {
+    if (lazyTableSchema == null) {
+      if (requestedSchema != null) {
+        this.lazyTableSchema = SparkSchemaUtil.convert(SparkSchemaUtil.prune(icebergTable.schema(), requestedSchema));
+      } else {
+        this.lazyTableSchema = SparkSchemaUtil.convert(icebergTable.schema());
+      }
+    }
+
+    return lazyTableSchema;
+  }
+
+  @Override
+  public Transform[] partitioning() {
+    return SparkUtil.toTransforms(icebergTable.spec());
+  }
+
+  @Override
+  public Map<String, String> properties() {
+    return icebergTable.properties();
+  }
+
+  @Override
+  public Set<TableCapability> capabilities() {
+    return CAPABILITIES;
+  }
+
+  @Override
+  public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
+    SparkScanBuilder scanBuilder = new SparkScanBuilder(sparkSession(), icebergTable, options);
+    if (requestedSchema != null) {
+      scanBuilder.pruneColumns(requestedSchema);
+    }
+
+    return scanBuilder;
+  }
+
+  @Override
+  public WriteBuilder newWriteBuilder(CaseInsensitiveStringMap options) {
+    return new SparkWriteBuilder(sparkSession(), icebergTable, options);
+  }
+
+}
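
(For context: Spark's DataSourceV2 planner is the intended caller of the two
builder methods above; a direct invocation would look like the sketch below,
assuming `icebergTable` is an already-loaded org.apache.iceberg.Table and the
caller is in the same package, since the constructor is package-private.)

SparkTable sparkTable = new SparkTable(icebergTable, null /* no requested schema */);
ScanBuilder scanBuilder = sparkTable.newScanBuilder(CaseInsensitiveStringMap.empty());
WriteBuilder writeBuilder = sparkTable.newWriteBuilder(CaseInsensitiveStringMap.empty());
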
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
new file mode 100644
index 0000000..3f1b48b
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Locale;
+import java.util.stream.Collectors;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.spark.SparkFilters;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.transforms.UnknownTransform;
+import org.apache.iceberg.types.CheckCompatibility;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.write.BatchWrite;
+import org.apache.spark.sql.connector.write.SupportsDynamicOverwrite;
+import org.apache.spark.sql.connector.write.SupportsOverwrite;
+import org.apache.spark.sql.connector.write.WriteBuilder;
+import org.apache.spark.sql.connector.write.streaming.StreamingWrite;
+import org.apache.spark.sql.sources.Filter;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+class SparkWriteBuilder implements WriteBuilder, SupportsDynamicOverwrite, SupportsOverwrite {
+
+  private final SparkSession spark;
+  private final Table table;
+  private final CaseInsensitiveStringMap options;
+  private final String overwriteMode;
+  private boolean overwriteDynamic = false;
+  private boolean overwriteByFilter = false;
+  private Expression overwriteExpr = null;
+  private String writeQueryId = null;
+  private StructType writeSchema = null;
+
+  SparkWriteBuilder(SparkSession spark, Table table, CaseInsensitiveStringMap options) {
+    this.spark = spark;
+    this.table = table;
+    this.options = options;
+    this.overwriteMode = options.containsKey("overwrite-mode") ?
+        options.get("overwrite-mode").toLowerCase(Locale.ROOT) : null;
+  }
+
+  @Override
+  public WriteBuilder withQueryId(String queryId) {
+    this.writeQueryId = queryId;
+    return this;
+  }
+
+  @Override
+  public WriteBuilder withInputDataSchema(StructType schemaInput) {
+    this.writeSchema = schemaInput;
+    return this;
+  }
+
+  @Override
+  public WriteBuilder overwriteDynamicPartitions() {
+    Preconditions.checkState(!overwriteByFilter, "Cannot overwrite dynamically and by filter: %s", overwriteExpr);
+    this.overwriteDynamic = true;
+    return this;
+  }
+
+  @Override
+  public WriteBuilder overwrite(Filter[] filters) {
+    this.overwriteExpr = SparkFilters.convert(filters);
+    if (overwriteExpr == Expressions.alwaysTrue() && "dynamic".equals(overwriteMode)) {
+      // use the write option to override truncating the table. use dynamic overwrite instead.
+      this.overwriteDynamic = true;
+    } else {
+      Preconditions.checkState(!overwriteDynamic, "Cannot overwrite dynamically and by filter: %s", overwriteExpr);
+      this.overwriteByFilter = true;
+    }
+    return this;
+  }
+
+  @Override
+  public BatchWrite buildForBatch() {
+    // Validate
+    Schema dsSchema = SparkSchemaUtil.convert(table.schema(), writeSchema);
+    validateWriteSchema(table.schema(), dsSchema, checkNullability(options));
+    validatePartitionTransforms(table.spec());
+
+    // Get application id
+    String appId = spark.sparkContext().applicationId();
+
+    // Get write-audit-publish id
+    String wapId = spark.conf().get("spark.wap.id", null);
+
+    return new SparkBatchWrite(table,
+        options, overwriteDynamic, overwriteByFilter, overwriteExpr, appId, wapId, dsSchema);
+  }
+
+  @Override
+  public StreamingWrite buildForStreaming() {
+    // Validate
+    Schema dsSchema = SparkSchemaUtil.convert(table.schema(), writeSchema);
+    validateWriteSchema(table.schema(), dsSchema, checkNullability(options));
+    validatePartitionTransforms(table.spec());
+
+    // Change to streaming write if it is just append
+    Preconditions.checkState(!overwriteDynamic,
+        "Unsupported streaming operation: dynamic partition overwrite");
+    Preconditions.checkState(!overwriteByFilter || overwriteExpr == Expressions.alwaysTrue(),
+        "Unsupported streaming operation: overwrite by filter: %s", overwriteExpr);
+
+    // Get application id
+    String appId = spark.sparkContext().applicationId();
+    String wapId = spark.conf().get("spark.wap.id", null);
+
+    return new SparkStreamingWrite(table, options, overwriteByFilter, writeQueryId, appId, wapId, dsSchema);
+  }
+
+  private void validateWriteSchema(Schema tableSchema, Schema dsSchema, Boolean checkNullability) {
+    List<String> errors;
+    if (checkNullability) {
+      errors = CheckCompatibility.writeCompatibilityErrors(tableSchema, dsSchema);
+    } else {
+      errors = CheckCompatibility.typeCompatibilityErrors(tableSchema, dsSchema);
+    }
+    if (!errors.isEmpty()) {
+      StringBuilder sb = new StringBuilder();
+      sb.append("Cannot write incompatible dataset to table with schema:\n")
+        .append(tableSchema)
+        .append("\nProblems:");
+      for (String error : errors) {
+        sb.append("\n* ").append(error);
+      }
+      throw new IllegalArgumentException(sb.toString());
+    }
+  }
+
+  private void validatePartitionTransforms(PartitionSpec spec) {
+    if (spec.fields().stream().anyMatch(field -> field.transform() instanceof UnknownTransform)) {
+      String unsupported = spec.fields().stream()
+          .map(PartitionField::transform)
+          .filter(transform -> transform instanceof UnknownTransform)
+          .map(org.apache.iceberg.transforms.Transform::toString)
+          .collect(Collectors.joining(", "));
+
+      throw new UnsupportedOperationException(
+        String.format("Cannot write using unsupported transforms: %s", unsupported));
+    }
+  }
+
+  private boolean checkNullability(CaseInsensitiveStringMap options) {
+    boolean sparkCheckNullability = Boolean.parseBoolean(spark.conf()
+        .get("spark.sql.iceberg.check-nullability", "true"));
+    boolean dataFrameCheckNullability = options.getBoolean("check-nullability", true);
+    return sparkCheckNullability && dataFrameCheckNullability;
+  }
+}
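
(For context: the "overwrite-mode" option handled above is what the updated
TestParquetWrite exercises; a minimal DataFrame-level sketch, assuming `df` is a
Dataset<Row> and `location` is the table path.)

df.select("id", "data").write()
    .format("iceberg")
    .mode("overwrite")                    // Spark calls overwrite(filters) with an always-true filter
    .option("overwrite-mode", "dynamic")  // switches truncate-style overwrite to dynamic partition overwrite
    .save(location);
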
diff --git a/spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index 04e5f32..01a6c4e 100644
--- a/spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -17,4 +17,4 @@
 # under the License.
 #
 
-org.apache.iceberg.spark.source.IcebergTableProvider
+org.apache.iceberg.spark.source.IcebergSource
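
(For context: with the service entry pointing at IcebergSource, the short name
still resolves for path-based reads and writes; paths below are illustrative.)

Dataset<Row> df = spark.read().format("iceberg").load("/tmp/warehouse/db/table");
df.write().format("iceberg").mode("append").save("/tmp/warehouse/db/other_table");
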
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
index 9ff583d..85d2627 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
@@ -53,8 +53,8 @@ import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -221,9 +221,10 @@ public class TestDataFrameWrites extends AvroDataTest {
     return spark.internalCreateDataFrame(JavaRDD.toRDD(rdd), convert(schema), false);
   }
 
-  // This fails due to SPARK-28730
-  @Ignore
+  @Test
   public void testNullableWithWriteOption() throws IOException {
+    Assume.assumeTrue("Spark 3.0 rejects writing nulls to a required column", spark.version().startsWith("2"));
+
     File location = new File(temp.newFolder("parquet"), "test");
     String sourcePath = String.format("%s/nullable_poc/sourceFolder/", location.toString());
     String targetPath = String.format("%s/nullable_poc/targetFolder/", location.toString());
@@ -266,9 +267,10 @@ public class TestDataFrameWrites extends AvroDataTest {
 
   }
 
-  // This fails due to SPARK-28730
-  @Ignore
+  @Test
   public void testNullableWithSparkSqlOption() throws IOException {
+    Assume.assumeTrue("Spark 3.0 rejects writing nulls to a required column", spark.version().startsWith("2"));
+
     File location = new File(temp.newFolder("parquet"), "test");
     String sourcePath = String.format("%s/nullable_poc/sourceFolder/", location.toString());
     String targetPath = String.format("%s/nullable_poc/targetFolder/", location.toString());
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
index 991e01d..ead9a27 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
@@ -51,8 +51,10 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.api.java.UDF1;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.connector.read.Batch;
 import org.apache.spark.sql.connector.read.InputPartition;
 import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.ScanBuilder;
 import org.apache.spark.sql.connector.read.SupportsPushDownFilters;
 import org.apache.spark.sql.sources.And;
 import org.apache.spark.sql.sources.EqualTo;
@@ -212,10 +214,11 @@ public class TestFilteredScan {
     CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(ImmutableMap.of(
         "path", unpartitioned.toString())
     );
-    IcebergBatchScan scan = new IcebergBatchScan(TABLES.load(options.get("path")), options);
+    SparkScanBuilder builder = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);
 
     for (int i = 0; i < 10; i += 1) {
-      pushFilters(scan, EqualTo.apply("id", i));
+      pushFilters(builder, EqualTo.apply("id", i));
+      Batch scan = builder.build().toBatch();
 
       InputPartition[] partitions = scan.planInputPartitions();
       Assert.assertEquals("Should only create one task for a small file", 1, partitions.length);
@@ -239,9 +242,11 @@ public class TestFilteredScan {
     try {
 
       for (int i = 0; i < 10; i += 1) {
-        IcebergBatchScan scan = new IcebergBatchScan(TABLES.load(options.get("path")), false, options);
+        SparkScanBuilder builder = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options)
+            .caseSensitive(false);
 
-        pushFilters(scan, EqualTo.apply("ID", i)); // note lower(ID) == lower(id), so there must be a match
+        pushFilters(builder, EqualTo.apply("ID", i)); // note lower(ID) == lower(id), so there must be a match
+        Batch scan = builder.build().toBatch();
 
         InputPartition[] tasks = scan.planInputPartitions();
         Assert.assertEquals("Should only create one task for a small file", 1, tasks.length);
@@ -262,9 +267,10 @@ public class TestFilteredScan {
         "path", unpartitioned.toString())
     );
 
-    IcebergBatchScan scan = new IcebergBatchScan(TABLES.load(options.get("path")), options);
+    SparkScanBuilder builder = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);
 
-    pushFilters(scan, LessThan.apply("ts", "2017-12-22T00:00:00+00:00"));
+    pushFilters(builder, LessThan.apply("ts", "2017-12-22T00:00:00+00:00"));
+    Batch scan = builder.build().toBatch();
 
     InputPartition[] tasks = scan.planInputPartitions();
     Assert.assertEquals("Should only create one task for a small file", 1, tasks.length);
@@ -278,14 +284,15 @@ public class TestFilteredScan {
     Table table = buildPartitionedTable("bucketed_by_id", BUCKET_BY_ID, "bucket4", "id");
     CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(ImmutableMap.of("path", table.location()));
 
-    IcebergBatchScan unfiltered = new IcebergBatchScan(table, options);
+    Batch unfiltered = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options).build().toBatch();
     Assert.assertEquals("Unfiltered table should created 4 read tasks",
         4, unfiltered.planInputPartitions().length);
 
     for (int i = 0; i < 10; i += 1) {
-      IcebergBatchScan scan = new IcebergBatchScan(table, options);
+      SparkScanBuilder builder = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);
 
-      pushFilters(scan, EqualTo.apply("id", i));
+      pushFilters(builder, EqualTo.apply("id", i));
+      Batch scan = builder.build().toBatch();
 
       InputPartition[] tasks = scan.planInputPartitions();
 
@@ -302,15 +309,16 @@ public class TestFilteredScan {
   public void testDayPartitionedTimestampFilters() {
     Table table = buildPartitionedTable("partitioned_by_day", PARTITION_BY_DAY, "ts_day", "ts");
     CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(ImmutableMap.of("path", table.location()));
-    IcebergBatchScan unfiltered = new IcebergBatchScan(table, options);
+    Batch unfiltered = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options).build().toBatch();
 
     Assert.assertEquals("Unfiltered table should created 2 read tasks",
         2, unfiltered.planInputPartitions().length);
 
     {
-      IcebergBatchScan scan = new IcebergBatchScan(table, options);
+      SparkScanBuilder builder = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);
 
-      pushFilters(scan, LessThan.apply("ts", "2017-12-22T00:00:00+00:00"));
+      pushFilters(builder, LessThan.apply("ts", "2017-12-22T00:00:00+00:00"));
+      Batch scan = builder.build().toBatch();
 
       InputPartition[] tasks = scan.planInputPartitions();
       Assert.assertEquals("Should create one task for 2017-12-21", 1, tasks.length);
@@ -320,11 +328,12 @@ public class TestFilteredScan {
     }
 
     {
-      IcebergBatchScan scan = new IcebergBatchScan(table, options);
+      SparkScanBuilder builder = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);
 
-      pushFilters(scan, And.apply(
+      pushFilters(builder, And.apply(
           GreaterThan.apply("ts", "2017-12-22T06:00:00+00:00"),
           LessThan.apply("ts", "2017-12-22T08:00:00+00:00")));
+      Batch scan = builder.build().toBatch();
 
       InputPartition[] tasks = scan.planInputPartitions();
       Assert.assertEquals("Should create one task for 2017-12-22", 1, tasks.length);
@@ -341,29 +350,31 @@ public class TestFilteredScan {
     Table table = buildPartitionedTable("partitioned_by_hour", PARTITION_BY_HOUR, "ts_hour", "ts");
 
     CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(ImmutableMap.of("path", table.location()));
-    IcebergBatchScan unfiltered = new IcebergBatchScan(table, options);
+    Batch unfiltered = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options).build().toBatch();
 
     Assert.assertEquals("Unfiltered table should created 9 read tasks",
         9, unfiltered.planInputPartitions().length);
 
     {
-      IcebergBatchScan scan = new IcebergBatchScan(table, options);
+      SparkScanBuilder builder = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);
 
-      pushFilters(scan, LessThan.apply("ts", "2017-12-22T00:00:00+00:00"));
+      pushFilters(builder, LessThan.apply("ts", "2017-12-22T00:00:00+00:00"));
+      Batch scan = builder.build().toBatch();
 
       InputPartition[] tasks = scan.planInputPartitions();
       Assert.assertEquals("Should create 4 tasks for 2017-12-21: 15, 17, 21, 22", 4, tasks.length);
 
       assertEqualsSafe(SCHEMA.asStruct(), expected(8, 9, 7, 6, 5),
-          read(table.location().toString(), "ts < cast('2017-12-22 00:00:00+00:00' as timestamp)"));
+          read(table.location(), "ts < cast('2017-12-22 00:00:00+00:00' as timestamp)"));
     }
 
     {
-      IcebergBatchScan scan = new IcebergBatchScan(table, options);
+      SparkScanBuilder builder = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);
 
-      pushFilters(scan, And.apply(
+      pushFilters(builder, And.apply(
           GreaterThan.apply("ts", "2017-12-22T06:00:00+00:00"),
           LessThan.apply("ts", "2017-12-22T08:00:00+00:00")));
+      Batch scan = builder.build().toBatch();
 
       InputPartition[] tasks = scan.planInputPartitions();
       Assert.assertEquals("Should create 2 tasks for 2017-12-22: 6, 7", 2, tasks.length);
@@ -412,9 +423,10 @@ public class TestFilteredScan {
     Table table = buildPartitionedTable("partitioned_by_data", PARTITION_BY_DATA, "data_ident", "data");
     CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(ImmutableMap.of("path", table.location()));
 
-    IcebergBatchScan scan = new IcebergBatchScan(table, options);
+    SparkScanBuilder builder = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);
 
-    pushFilters(scan, new StringStartsWith("data", "junc"));
+    pushFilters(builder, new StringStartsWith("data", "junc"));
+    Batch scan = builder.build().toBatch();
 
     Assert.assertEquals(1, scan.planInputPartitions().length);
   }
@@ -427,8 +439,10 @@ public class TestFilteredScan {
         "path", table.location())
     );
 
-    IcebergBatchScan scan = new IcebergBatchScan(table, options);
-    pushFilters(scan, new StringStartsWith("data", "junc"));
+    SparkScanBuilder builder = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);
+
+    pushFilters(builder, new StringStartsWith("data", "junc"));
+    Batch scan = builder.build().toBatch();
 
     Assert.assertEquals(1, scan.planInputPartitions().length);
   }
@@ -487,7 +501,7 @@ public class TestFilteredScan {
     return expected;
   }
 
-  private void pushFilters(Scan scan, Filter... filters) {
+  private void pushFilters(ScanBuilder scan, Filter... filters) {
     Assert.assertTrue(scan instanceof SupportsPushDownFilters);
     SupportsPushDownFilters filterable = (SupportsPushDownFilters) scan;
     filterable.pushFilters(filters);
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSource.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSource.java
index fdf5daa..dd820fa 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSource.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSource.java
@@ -20,16 +20,17 @@
 package org.apache.iceberg.spark.source;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.Table;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
-public class TestIcebergSource extends IcebergTableProvider {
+public class TestIcebergSource extends IcebergSource {
   @Override
   public String shortName() {
     return "iceberg-test";
   }
 
   @Override
-  public org.apache.iceberg.Table findTable(CaseInsensitiveStringMap options, Configuration conf) {
+  protected Table findTable(CaseInsensitiveStringMap options, Configuration conf) {
     return TestTables.load(options.get("iceberg.table.name"));
   }
 }
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java
index da936ed..31ce025 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java
@@ -39,8 +39,8 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -164,8 +164,7 @@ public class TestParquetWrite {
     Assert.assertEquals("Result rows should match", expected, actual);
   }
 
-  // ignore due to spark default use static
-  @Ignore
+  @Test
   public void testOverwrite() throws IOException {
     File parent = temp.newFolder("parquet");
     File location = new File(parent, "test");
@@ -197,7 +196,7 @@ public class TestParquetWrite {
 
     // overwrite with 2*id to replace record 2, append 4 and 6
     df.withColumn("id", df.col("id").multiply(2)).select("id", "data").write()
-        .option("partitionOverwriteMode", "dynamic")
+        .option("overwrite-mode", "dynamic")
         .format("iceberg")
         .mode("overwrite")
         .save(location.toString());
@@ -342,9 +341,12 @@ public class TestParquetWrite {
     Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
   }
 
-  // This fails due to SPARK-28730
-  @Ignore
+  @Test
   public void testWriteProjection() throws IOException {
+    Assume.assumeTrue(
+        "Not supported in Spark 3.0; analysis requires all columns are present",
+        spark.version().startsWith("2"));
+
     File parent = temp.newFolder("parquet");
     File location = new File(parent, "test");
 
@@ -376,9 +378,12 @@ public class TestParquetWrite {
     Assert.assertEquals("Result rows should match", expected, actual);
   }
 
-  // This fails due to SPARK-28730
-  @Ignore
+  @Test
   public void testWriteProjectionWithMiddle() throws IOException {
+    Assume.assumeTrue(
+        "Not supported in Spark 3.0; analysis requires all columns are present",
+        spark.version().startsWith("2"));
+
     File parent = temp.newFolder("parquet");
     File location = new File(parent, "test");
 
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
index 9550d00..b32ce7f 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
@@ -41,7 +41,6 @@ import org.apache.spark.sql.streaming.StreamingQueryException;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -200,8 +199,7 @@ public class TestStructuredStreaming {
     }
   }
 
-  // This fails due to SPARK-28730
-  @Ignore
+  @Test
   public void testStreamingWriteCompleteModeWithProjection() throws IOException {
     File parent = temp.newFolder("parquet");
     File location = new File(parent, "test-table");