You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2022/10/18 23:16:40 UTC

[iceberg] branch master updated: Spark 3.2: Add SparkChangelogTable (#6013)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new c8ce0c9551 Spark 3.2: Add SparkChangelogTable (#6013)
c8ce0c9551 is described below

commit c8ce0c955141a93162d9bfcbbf38d9c975d6ed1b
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Tue Oct 18 16:16:34 2022 -0700

    Spark 3.2: Add SparkChangelogTable (#6013)
---
 .../spark/extensions/TestChangelogBatchReads.java  | 248 +++++++++++++++++++++
 .../org/apache/iceberg/spark/SparkCatalog.java     |  73 +++---
 .../iceberg/spark/source/ChangelogRowReader.java   |   3 +-
 .../apache/iceberg/spark/source/SparkBatch.java    |  11 +-
 .../{SparkBatch.java => SparkChangelogBatch.java}  | 115 +++++-----
 .../iceberg/spark/source/SparkChangelogScan.java   | 163 ++++++++++++++
 .../iceberg/spark/source/SparkChangelogTable.java  | 102 +++++++++
 .../iceberg/spark/source/SparkInputPartition.java  |  84 +++++++
 .../spark/source/SparkMicroBatchStream.java        |  12 +-
 .../org/apache/iceberg/spark/source/SparkScan.java | 122 ++++------
 .../iceberg/spark/source/SparkScanBuilder.java     |  37 ++-
 .../actions/TestRemoveOrphanFilesAction3.java      |   6 +-
 .../iceberg/spark/source/TestPathIdentifier.java   |   5 +-
 13 files changed, 798 insertions(+), 183 deletions(-)

diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogBatchReads.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogBatchReads.java
new file mode 100644
index 0000000000..530abe67c9
--- /dev/null
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogBatchReads.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
+import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED;
+import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Row;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runners.Parameterized.Parameters;
+
+public class TestChangelogBatchReads extends SparkExtensionsTestBase {
+
+  @Parameters(name = "formatVersion = {0}, catalogName = {1}, implementation = {2}, config = {3}")
+  public static Object[][] parameters() { // one Object[] per parameterized run
+    return new Object[][] {
+      {
+        1, // format version 1 is paired with the SPARK catalog config
+        SparkCatalogConfig.SPARK.catalogName(),
+        SparkCatalogConfig.SPARK.implementation(),
+        SparkCatalogConfig.SPARK.properties()
+      },
+      {
+        2, // format version 2 is paired with the HIVE catalog config
+        SparkCatalogConfig.HIVE.catalogName(),
+        SparkCatalogConfig.HIVE.implementation(),
+        SparkCatalogConfig.HIVE.properties()
+      }
+    };
+  }
+
+  private final int formatVersion; // substituted into TBLPROPERTIES by each test
+
+  public TestChangelogBatchReads(
+      int formatVersion, String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config); // catalog setup is delegated to the base class
+    this.formatVersion = formatVersion;
+  }
+
+  @After
+  public void removeTables() { // runs after each test: drops the table the test created
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testDataFilters() { // filter on a data column (id) applied to the changelog
+    sql(
+        "CREATE TABLE %s (id INT, data STRING) "
+            + "USING iceberg "
+            + "PARTITIONED BY (data) "
+            + "TBLPROPERTIES ( "
+            + " '%s' = '%d' "
+            + ")",
+        tableName, FORMAT_VERSION, formatVersion);
+
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+    sql("INSERT INTO %s VALUES (3, 'c')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Snapshot snap3 = table.currentSnapshot(); // current snapshot after the three inserts
+
+    sql("DELETE FROM %s WHERE id = 3", tableName);
+
+    table.refresh(); // pick up the snapshot committed by the DELETE
+
+    Snapshot snap4 = table.currentSnapshot();
+
+    assertEquals(
+        "Should have expected rows", // row: id, data, change type, ordinal, snapshot id
+        ImmutableList.of(
+            row(3, "c", "INSERT", 2, snap3.snapshotId()),
+            row(3, "c", "DELETE", 3, snap4.snapshotId())),
+        sql("SELECT * FROM %s.changes WHERE id = 3 ORDER BY _change_ordinal, id", tableName));
+  }
+
+  @Test
+  public void testOverwrites() { // INSERT OVERWRITE should yield a DELETE and an INSERT
+    sql(
+        "CREATE TABLE %s (id INT, data STRING) "
+            + "USING iceberg "
+            + "PARTITIONED BY (data) "
+            + "TBLPROPERTIES ( "
+            + " '%s' = '%d' "
+            + ")",
+        tableName, FORMAT_VERSION, formatVersion);
+
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Snapshot snap2 = table.currentSnapshot(); // passed as the start of the changelog range
+
+    sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
+
+    table.refresh();
+
+    Snapshot snap3 = table.currentSnapshot(); // current snapshot after the INSERT OVERWRITE
+
+    assertEquals(
+        "Rows should match", // both expected rows carry ordinal 0 and snap3's id
+        ImmutableList.of(
+            row(2, "b", "DELETE", 0, snap3.snapshotId()),
+            row(-2, "b", "INSERT", 0, snap3.snapshotId())),
+        changelogRecords(snap2, snap3)); // read bounded by start/end snapshot id options
+  }
+
+  @Test
+  public void testMetadataDeletes() { // table is partitioned by the column used in DELETE
+    sql(
+        "CREATE TABLE %s (id INT, data STRING) "
+            + "USING iceberg "
+            + "PARTITIONED BY (data) "
+            + "TBLPROPERTIES ( "
+            + " '%s' = '%d' "
+            + ")",
+        tableName, FORMAT_VERSION, formatVersion);
+
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Snapshot snap2 = table.currentSnapshot(); // passed as the start of the changelog range
+
+    sql("DELETE FROM %s WHERE data = 'a'", tableName); // predicate on the partition column
+
+    table.refresh();
+
+    Snapshot snap3 = table.currentSnapshot(); // its operation is checked to be DELETE below
+    Assert.assertEquals("Operation must match", DataOperations.DELETE, snap3.operation());
+
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(row(1, "a", "DELETE", 0, snap3.snapshotId())),
+        changelogRecords(snap2, snap3));
+  }
+
+  @Test
+  public void testExistingEntriesInNewDataManifestsAreIgnored() { // manifest merging enabled
+    sql(
+        "CREATE TABLE %s (id INT, data STRING) "
+            + "USING iceberg "
+            + "PARTITIONED BY (data) "
+            + "TBLPROPERTIES ( "
+            + " '%s' = '%d', " // FORMAT_VERSION = formatVersion
+            + " '%s' = '1', " // MANIFEST_MIN_MERGE_COUNT
+            + " '%s' = 'true' " // MANIFEST_MERGE_ENABLED
+            + ")",
+        tableName, FORMAT_VERSION, formatVersion, MANIFEST_MIN_MERGE_COUNT, MANIFEST_MERGE_ENABLED);
+
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Snapshot snap1 = table.currentSnapshot(); // passed as the start of the changelog range
+
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+
+    table.refresh();
+
+    Snapshot snap2 = table.currentSnapshot(); // asserted below to have exactly one data manifest
+    Assert.assertEquals("Manifest number must match", 1, snap2.dataManifests(table.io()).size());
+
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(row(2, "b", "INSERT", 0, snap2.snapshotId())), // (1, 'a') is absent
+        changelogRecords(snap1, snap2));
+  }
+
+  @Test
+  public void testManifestRewritesAreIgnored() { // rewrite_manifests must add no rows
+    sql(
+        "CREATE TABLE %s (id INT, data STRING) "
+            + "USING iceberg "
+            + "PARTITIONED BY (data) "
+            + "TBLPROPERTIES ( "
+            + " '%s' = '%d' "
+            + ")",
+        tableName, FORMAT_VERSION, formatVersion);
+
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+
+    sql("CALL %s.system.rewrite_manifests('%s')", catalogName, tableIdent); // adds a snapshot
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertEquals("Num snapshots must match", 3, Iterables.size(table.snapshots()));
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(row(1, "INSERT"), row(2, "INSERT")), // the two inserts only
+        sql("SELECT id, _change_type FROM %s.changes ORDER BY id", tableName));
+  }
+
+  private List<Object[]> changelogRecords(Snapshot startSnapshot, Snapshot endSnapshot) {
+    DataFrameReader reader = spark.read(); // a bound is set only when its snapshot is non-null
+
+    if (startSnapshot != null) {
+      reader = reader.option(SparkReadOptions.START_SNAPSHOT_ID, startSnapshot.snapshotId());
+    }
+
+    if (endSnapshot != null) {
+      reader = reader.option(SparkReadOptions.END_SNAPSHOT_ID, endSnapshot.snapshotId());
+    }
+
+    return rowsToJava(collect(reader)); // compared against row(...) lists by the tests
+  }
+
+  private List<Row> collect(DataFrameReader reader) { // reads the changelog of tableName, sorted
+    return reader
+        .table(tableName + "." + SparkChangelogTable.TABLE_NAME) // <table>.<changelog name>
+        .orderBy("_change_ordinal", "_commit_snapshot_id", "_change_type", "id")
+        .collectAsList();
+  }
+}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index 835a395f6e..b3a514dd21 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -35,7 +35,6 @@ import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.Table;
 import org.apache.iceberg.Transaction;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
@@ -54,6 +53,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.actions.SparkActions;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
 import org.apache.iceberg.spark.source.SparkTable;
 import org.apache.iceberg.spark.source.StagedSparkTable;
 import org.apache.iceberg.util.Pair;
@@ -67,6 +67,7 @@ import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.NamespaceChange;
 import org.apache.spark.sql.connector.catalog.StagedTable;
+import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.connector.catalog.TableChange;
 import org.apache.spark.sql.connector.catalog.TableChange.ColumnChange;
@@ -137,23 +138,22 @@ public class SparkCatalog extends BaseCatalog {
   }
 
   @Override
-  public SparkTable loadTable(Identifier ident) throws NoSuchTableException {
+  public Table loadTable(Identifier ident) throws NoSuchTableException {
     try {
-      Pair<Table, Long> icebergTable = load(ident);
-      return new SparkTable(icebergTable.first(), icebergTable.second(), !cacheEnabled);
+      return load(ident);
     } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
       throw new NoSuchTableException(ident);
     }
   }
 
   @Override
-  public SparkTable createTable(
+  public Table createTable(
       Identifier ident, StructType schema, Transform[] transforms, Map<String, String> properties)
       throws TableAlreadyExistsException {
     Schema icebergSchema = SparkSchemaUtil.convert(schema, useTimestampsWithoutZone);
     try {
       Catalog.TableBuilder builder = newBuilder(ident, icebergSchema);
-      Table icebergTable =
+      org.apache.iceberg.Table icebergTable =
           builder
               .withPartitionSpec(Spark3Util.toPartitionSpec(icebergSchema, transforms))
               .withLocation(properties.get("location"))
@@ -218,8 +218,7 @@ public class SparkCatalog extends BaseCatalog {
   }
 
   @Override
-  public SparkTable alterTable(Identifier ident, TableChange... changes)
-      throws NoSuchTableException {
+  public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException {
     SetProperty setLocation = null;
     SetProperty setSnapshotId = null;
     SetProperty pickSnapshotId = null;
@@ -252,7 +251,7 @@ public class SparkCatalog extends BaseCatalog {
     }
 
     try {
-      Table table = load(ident).first();
+      org.apache.iceberg.Table table = icebergCatalog.loadTable(buildIdentifier(ident));
       commitChanges(
           table, setLocation, setSnapshotId, pickSnapshotId, propertyChanges, schemaChanges);
       return new SparkTable(table, true /* refreshEagerly */);
@@ -269,7 +268,7 @@ public class SparkCatalog extends BaseCatalog {
   @Override
   public boolean purgeTable(Identifier ident) {
     try {
-      Table table = load(ident).first();
+      org.apache.iceberg.Table table = icebergCatalog.loadTable(buildIdentifier(ident));
       ValidationException.check(
           PropertyUtil.propertyAsBoolean(table.properties(), GC_ENABLED, GC_ENABLED_DEFAULT),
           "Cannot purge table: GC is disabled (deleting files may corrupt other tables)");
@@ -493,7 +492,7 @@ public class SparkCatalog extends BaseCatalog {
   }
 
   private static void commitChanges(
-      Table table,
+      org.apache.iceberg.Table table,
       SetProperty setLocation,
       SetProperty setSnapshotId,
       SetProperty pickSnapshotId,
@@ -546,23 +545,24 @@ public class SparkCatalog extends BaseCatalog {
     }
   }
 
-  private Pair<Table, Long> load(Identifier ident) {
+  private Table load(Identifier ident) {
     if (isPathIdentifier(ident)) {
       return loadFromPathIdentifier((PathIdentifier) ident);
     }
 
     try {
-      return Pair.of(icebergCatalog.loadTable(buildIdentifier(ident)), null);
+      org.apache.iceberg.Table table = icebergCatalog.loadTable(buildIdentifier(ident));
+      return new SparkTable(table, !cacheEnabled);
 
     } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
       if (ident.namespace().length == 0) {
         throw e;
       }
 
-      // if the original load didn't work, the identifier may be extended and include a snapshot
-      // selector
+      // if the original load didn't work, try using the namespace as an identifier because
+      // the original identifier may include a snapshot selector or may point to the changelog
       TableIdentifier namespaceAsIdent = buildIdentifier(namespaceToIdentifier(ident.namespace()));
-      Table table;
+      org.apache.iceberg.Table table;
       try {
         table = icebergCatalog.loadTable(namespaceAsIdent);
       } catch (Exception ignored) {
@@ -572,19 +572,27 @@ public class SparkCatalog extends BaseCatalog {
       }
 
       // loading the namespace as a table worked, check the name to see if it is a valid selector
+      // or if the name points to the changelog
+
+      if (ident.name().equalsIgnoreCase(SparkChangelogTable.TABLE_NAME)) {
+        return new SparkChangelogTable(table, !cacheEnabled);
+      }
+
       Matcher at = AT_TIMESTAMP.matcher(ident.name());
       if (at.matches()) {
         long asOfTimestamp = Long.parseLong(at.group(1));
-        return Pair.of(table, SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp));
+        long snapshotId = SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp);
+        return new SparkTable(table, snapshotId, !cacheEnabled);
       }
 
       Matcher id = SNAPSHOT_ID.matcher(ident.name());
       if (id.matches()) {
         long snapshotId = Long.parseLong(id.group(1));
-        return Pair.of(table, snapshotId);
+        return new SparkTable(table, snapshotId, !cacheEnabled);
       }
 
-      // the name wasn't a valid snapshot selector. throw the original exception
+      // the name wasn't a valid snapshot selector and did not point to the changelog
+      // throw the original exception
       throw e;
     }
   }
@@ -600,13 +608,21 @@ public class SparkCatalog extends BaseCatalog {
     }
   }
 
-  private Pair<Table, Long> loadFromPathIdentifier(PathIdentifier ident) {
+  @SuppressWarnings("CyclomaticComplexity")
+  private Table loadFromPathIdentifier(PathIdentifier ident) {
     Pair<String, List<String>> parsed = parseLocationString(ident.location());
 
     String metadataTableName = null;
     Long asOfTimestamp = null;
     Long snapshotId = null;
+    boolean isChangelog = false;
+
     for (String meta : parsed.second()) {
+      if (meta.equalsIgnoreCase(SparkChangelogTable.TABLE_NAME)) {
+        isChangelog = true;
+        continue;
+      }
+
       if (MetadataTableType.from(meta) != null) {
         metadataTableName = meta;
         continue;
@@ -629,15 +645,22 @@ public class SparkCatalog extends BaseCatalog {
         "Cannot specify both snapshot-id and as-of-timestamp: %s",
         ident.location());
 
-    Table table =
+    Preconditions.checkArgument(
+        !isChangelog || (snapshotId == null && asOfTimestamp == null),
+        "Cannot specify snapshot-id and as-of-timestamp for changelogs");
+
+    org.apache.iceberg.Table table =
         tables.load(parsed.first() + (metadataTableName != null ? "#" + metadataTableName : ""));
 
-    if (snapshotId != null) {
-      return Pair.of(table, snapshotId);
+    if (isChangelog) {
+      return new SparkChangelogTable(table, !cacheEnabled);
+
     } else if (asOfTimestamp != null) {
-      return Pair.of(table, SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp));
+      long snapshotIdAsOfTime = SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp);
+      return new SparkTable(table, snapshotIdAsOfTime, !cacheEnabled);
+
     } else {
-      return Pair.of(table, null);
+      return new SparkTable(table, snapshotId, !cacheEnabled);
     }
   }
 
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
index 20f0893bcc..a1111eda12 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.stream.Stream;
 import org.apache.iceberg.AddedRowsScanTask;
 import org.apache.iceberg.ChangelogScanTask;
+import org.apache.iceberg.ChangelogUtil;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.ContentScanTask;
 import org.apache.iceberg.DataFile;
@@ -48,7 +49,7 @@ class ChangelogRowReader extends BaseRowReader<ChangelogScanTask> {
       ScanTaskGroup<ChangelogScanTask> taskGroup,
       Schema expectedSchema,
       boolean caseSensitive) {
-    super(table, taskGroup, expectedSchema, caseSensitive);
+    super(table, taskGroup, ChangelogUtil.dropChangelogMetadata(expectedSchema), caseSensitive);
   }
 
   @Override
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
index 6d85047943..19a1dce3c9 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
@@ -25,7 +25,6 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.spark.SparkReadConf;
-import org.apache.iceberg.spark.source.SparkScan.ReadTask;
 import org.apache.iceberg.spark.source.SparkScan.ReaderFactory;
 import org.apache.iceberg.util.TableScanUtil;
 import org.apache.iceberg.util.Tasks;
@@ -62,22 +61,22 @@ abstract class SparkBatch implements Batch {
         sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
     String expectedSchemaString = SchemaParser.toJson(expectedSchema);
 
-    InputPartition[] readTasks = new InputPartition[tasks().size()];
+    InputPartition[] partitions = new InputPartition[tasks().size()];
 
-    Tasks.range(readTasks.length)
+    Tasks.range(partitions.length)
         .stopOnFailure()
         .executeWith(localityEnabled ? ThreadPools.getWorkerPool() : null)
         .run(
             index ->
-                readTasks[index] =
-                    new ReadTask(
+                partitions[index] =
+                    new SparkInputPartition(
                         tasks().get(index),
                         tableBroadcast,
                         expectedSchemaString,
                         caseSensitive,
                         localityEnabled));
 
-    return readTasks;
+    return partitions;
   }
 
   protected abstract List<CombinedScanTask> tasks();
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogBatch.java
similarity index 50%
copy from spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
copy to spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogBatch.java
index 6d85047943..38e25e9989 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogBatch.java
@@ -19,115 +19,120 @@
 package org.apache.iceberg.spark.source;
 
 import java.util.List;
-import org.apache.iceberg.CombinedScanTask;
-import org.apache.iceberg.FileFormat;
+import java.util.Objects;
+import org.apache.iceberg.ChangelogScanTask;
+import org.apache.iceberg.ScanTaskGroup;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.spark.SparkReadConf;
-import org.apache.iceberg.spark.source.SparkScan.ReadTask;
-import org.apache.iceberg.spark.source.SparkScan.ReaderFactory;
-import org.apache.iceberg.util.TableScanUtil;
 import org.apache.iceberg.util.Tasks;
 import org.apache.iceberg.util.ThreadPools;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.read.Batch;
 import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReader;
 import org.apache.spark.sql.connector.read.PartitionReaderFactory;
 
-abstract class SparkBatch implements Batch {
+class SparkChangelogBatch implements Batch {
 
   private final JavaSparkContext sparkContext;
   private final Table table;
-  private final SparkReadConf readConf;
+  private final List<ScanTaskGroup<ChangelogScanTask>> taskGroups;
   private final Schema expectedSchema;
   private final boolean caseSensitive;
   private final boolean localityEnabled;
-
-  SparkBatch(SparkSession spark, Table table, SparkReadConf readConf, Schema expectedSchema) {
+  private final int scanHashCode;
+
+  SparkChangelogBatch(
+      SparkSession spark,
+      Table table,
+      SparkReadConf readConf,
+      List<ScanTaskGroup<ChangelogScanTask>> taskGroups,
+      Schema expectedSchema,
+      int scanHashCode) {
     this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
     this.table = table;
-    this.readConf = readConf;
+    this.taskGroups = taskGroups;
     this.expectedSchema = expectedSchema;
     this.caseSensitive = readConf.caseSensitive();
     this.localityEnabled = readConf.localityEnabled();
+    this.scanHashCode = scanHashCode;
   }
 
   @Override
   public InputPartition[] planInputPartitions() {
-    // broadcast the table metadata as input partitions will be sent to executors
-    Broadcast<Table> tableBroadcast =
-        sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
+    Table serializableTable = SerializableTableWithSize.copyOf(table);
+    Broadcast<Table> tableBroadcast = sparkContext.broadcast(serializableTable);
     String expectedSchemaString = SchemaParser.toJson(expectedSchema);
 
-    InputPartition[] readTasks = new InputPartition[tasks().size()];
+    InputPartition[] partitions = new InputPartition[taskGroups.size()];
 
-    Tasks.range(readTasks.length)
+    Tasks.range(partitions.length)
         .stopOnFailure()
         .executeWith(localityEnabled ? ThreadPools.getWorkerPool() : null)
         .run(
             index ->
-                readTasks[index] =
-                    new ReadTask(
-                        tasks().get(index),
+                partitions[index] =
+                    new SparkInputPartition(
+                        taskGroups.get(index),
                         tableBroadcast,
                         expectedSchemaString,
                         caseSensitive,
                         localityEnabled));
 
-    return readTasks;
-  }
-
-  protected abstract List<CombinedScanTask> tasks();
-
-  protected JavaSparkContext sparkContext() {
-    return sparkContext;
+    return partitions;
   }
 
   @Override
   public PartitionReaderFactory createReaderFactory() {
-    return new ReaderFactory(batchSize());
+    return new ReaderFactory();
   }
 
-  private int batchSize() {
-    if (parquetOnly() && parquetBatchReadsEnabled()) {
-      return readConf.parquetBatchSize();
-    } else if (orcOnly() && orcBatchReadsEnabled()) {
-      return readConf.orcBatchSize();
-    } else {
-      return 0;
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
     }
-  }
 
-  private boolean parquetOnly() {
-    return tasks().stream()
-        .allMatch(task -> !task.isDataTask() && onlyFileFormat(task, FileFormat.PARQUET));
-  }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
 
-  private boolean parquetBatchReadsEnabled() {
-    return readConf.parquetVectorizationEnabled()
-        && // vectorization enabled
-        expectedSchema.columns().size() > 0
-        && // at least one column is projected
-        expectedSchema.columns().stream()
-            .allMatch(c -> c.type().isPrimitiveType()); // only primitives
+    SparkChangelogBatch that = (SparkChangelogBatch) o;
+    return table.name().equals(that.table.name()) && scanHashCode == that.scanHashCode;
   }
 
-  private boolean orcOnly() {
-    return tasks().stream()
-        .allMatch(task -> !task.isDataTask() && onlyFileFormat(task, FileFormat.ORC));
+  @Override
+  public int hashCode() {
+    return Objects.hash(table.name(), scanHashCode);
   }
 
-  private boolean orcBatchReadsEnabled() {
-    return readConf.orcVectorizationEnabled()
-        && // vectorization enabled
-        tasks().stream().noneMatch(TableScanUtil::hasDeletes); // no delete files
+  private static class ReaderFactory implements PartitionReaderFactory {
+    @Override
+    public PartitionReader<InternalRow> createReader(InputPartition partition) {
+      Preconditions.checkArgument(
+          partition instanceof SparkInputPartition,
+          "Unknown input partition type: %s",
+          partition.getClass().getName());
+
+      return new RowReader((SparkInputPartition) partition);
+    }
   }
 
-  private boolean onlyFileFormat(CombinedScanTask task, FileFormat fileFormat) {
-    return task.files().stream()
-        .allMatch(fileScanTask -> fileScanTask.file().format().equals(fileFormat));
+  private static class RowReader extends ChangelogRowReader
+      implements PartitionReader<InternalRow> {
+
+    RowReader(SparkInputPartition partition) {
+      super(
+          partition.table(),
+          partition.taskGroup(),
+          partition.expectedSchema(),
+          partition.isCaseSensitive());
+    }
   }
 }
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
new file mode 100644
index 0000000000..3f7927c6d6
--- /dev/null
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ChangelogScanTask;
+import org.apache.iceberg.IncrementalChangelogScan;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.read.Batch;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.Statistics;
+import org.apache.spark.sql.connector.read.SupportsReportStatistics;
+import org.apache.spark.sql.types.StructType;
+
+class SparkChangelogScan implements Scan, SupportsReportStatistics { // Spark Scan backed by an Iceberg IncrementalChangelogScan
+
+  private final SparkSession spark;
+  private final Table table;
+  private final IncrementalChangelogScan scan;
+  private final SparkReadConf readConf;
+  private final Schema expectedSchema;
+  private final List<Expression> filters; // never null; see constructor
+  private final Long startSnapshotId; // boxed; compared null-safely in equals()
+  private final Long endSnapshotId; // boxed; compared null-safely in equals()
+  private final boolean readTimestampWithoutZone;
+
+  // lazy variables
+  private List<ScanTaskGroup<ChangelogScanTask>> taskGroups = null; // filled by taskGroups()
+  private StructType expectedSparkType = null; // filled by readSchema()
+
+  SparkChangelogScan(
+      SparkSession spark,
+      Table table,
+      IncrementalChangelogScan scan,
+      SparkReadConf readConf,
+      Schema expectedSchema,
+      List<Expression> filters) {
+
+    SparkSchemaUtil.validateMetadataColumnReferences(table.schema(), expectedSchema);
+
+    this.spark = spark;
+    this.table = table;
+    this.scan = scan;
+    this.readConf = readConf;
+    this.expectedSchema = expectedSchema;
+    this.filters = filters != null ? filters : Collections.emptyList(); // null is treated as "no filters"
+    this.startSnapshotId = readConf.startSnapshotId();
+    this.endSnapshotId = readConf.endSnapshotId();
+    this.readTimestampWithoutZone = readConf.handleTimestampWithoutZone();
+  }
+
+  @Override
+  public Statistics estimateStatistics() { // row count is summed over all planned task groups
+    long rowsCount = taskGroups().stream().mapToLong(ScanTaskGroup::estimatedRowsCount).sum();
+    long sizeInBytes = SparkSchemaUtil.estimateSize(readSchema(), rowsCount);
+    return new Stats(sizeInBytes, rowsCount);
+  }
+
+  @Override
+  public StructType readSchema() { // converted once and cached in expectedSparkType
+    if (expectedSparkType == null) {
+      Preconditions.checkArgument( // timestamp-without-zone columns are rejected unless readConf enables handling
+          readTimestampWithoutZone || !SparkUtil.hasTimestampWithoutZone(expectedSchema),
+          SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR);
+
+      this.expectedSparkType = SparkSchemaUtil.convert(expectedSchema);
+    }
+
+    return expectedSparkType;
+  }
+
+  @Override
+  public Batch toBatch() { // triggers task planning and passes this scan's hashCode() to the batch
+    return new SparkChangelogBatch(
+        spark, table, readConf, taskGroups(), expectedSchema, hashCode());
+  }
+
+  private List<ScanTaskGroup<ChangelogScanTask>> taskGroups() { // plans on first call, then returns the cached list
+    if (taskGroups == null) {
+      try (CloseableIterable<ScanTaskGroup<ChangelogScanTask>> groups = scan.planTasks()) { // closed after copying
+        this.taskGroups = Lists.newArrayList(groups);
+      } catch (IOException e) {
+        throw new UncheckedIOException("Failed to close changelog scan: " + scan, e);
+      }
+    }
+
+    return taskGroups;
+  }
+
+  @Override
+  public String description() {
+    return String.format(
+        "%s [fromSnapshotId=%d, toSnapshotId=%d, filters=%s]",
+        table, startSnapshotId, endSnapshotId, filtersAsString());
+  }
+
+  @Override
+  public String toString() {
+    return String.format(
+        "IcebergChangelogScan(table=%s, type=%s, fromSnapshotId=%d, toSnapshotId=%d, filters=%s)",
+        table, expectedSchema.asStruct(), startSnapshotId, endSnapshotId, filtersAsString());
+  }
+
+  private String filtersAsString() { // comma-separated Spark3Util descriptions of the filters
+    return filters.stream().map(Spark3Util::describe).collect(Collectors.joining(", "));
+  }
+
+  @Override
+  public boolean equals(Object o) { // equality: table name, Spark read schema, filter strings, snapshot bounds
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    SparkChangelogScan that = (SparkChangelogScan) o;
+    return table.name().equals(that.table.name())
+        && readSchema().equals(that.readSchema()) // compare Spark schemas to ignore field IDs
+        && filters.toString().equals(that.filters.toString()) // filters are compared by their string form
+        && Objects.equals(startSnapshotId, that.startSnapshotId)
+        && Objects.equals(endSnapshotId, that.endSnapshotId);
+  }
+
+  @Override
+  public int hashCode() { // hashes the same five components that equals() compares
+    return Objects.hash(
+        table.name(), readSchema(), filters.toString(), startSnapshotId, endSnapshotId);
+  }
+}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogTable.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogTable.java
new file mode 100644
index 0000000000..645a583e5d
--- /dev/null
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogTable.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Set;
+import org.apache.iceberg.ChangelogUtil;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.catalog.SupportsRead;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCapability;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class SparkChangelogTable implements Table, SupportsRead { // Spark table over an Iceberg table's changelog
+
+  public static final String TABLE_NAME = "changes"; // suffix appended to the Iceberg table name in name()
+
+  private static final Set<TableCapability> CAPABILITIES =
+      ImmutableSet.of(TableCapability.BATCH_READ); // batch reads are the only declared capability
+
+  private final org.apache.iceberg.Table icebergTable;
+  private final boolean refreshEagerly;
+
+  private SparkSession lazySpark = null; // set on first spark() call
+  private StructType lazyTableSparkType = null; // set on first schema() call
+  private Schema lazyChangelogSchema = null; // set on first changelogSchema() call
+
+  public SparkChangelogTable(org.apache.iceberg.Table icebergTable, boolean refreshEagerly) {
+    this.icebergTable = icebergTable;
+    this.refreshEagerly = refreshEagerly;
+  }
+
+  @Override
+  public String name() { // "<iceberg table name>.changes"
+    return icebergTable.name() + "." + TABLE_NAME;
+  }
+
+  @Override
+  public StructType schema() { // Spark form of the changelog schema, converted once and cached
+    if (lazyTableSparkType == null) {
+      this.lazyTableSparkType = SparkSchemaUtil.convert(changelogSchema());
+    }
+
+    return lazyTableSparkType;
+  }
+
+  @Override
+  public Set<TableCapability> capabilities() {
+    return CAPABILITIES;
+  }
+
+  @Override
+  public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
+    if (refreshEagerly) { // calls icebergTable.refresh() before each scan builder when enabled
+      icebergTable.refresh();
+    }
+
+    return new SparkScanBuilder(spark(), icebergTable, changelogSchema(), options) { // anonymous subclass
+      @Override
+      public Scan build() { // build() is redirected to the changelog-specific scan
+        return buildChangelogScan();
+      }
+    };
+  }
+
+  private Schema changelogSchema() { // derived from the table schema via ChangelogUtil, then cached
+    if (lazyChangelogSchema == null) {
+      this.lazyChangelogSchema = ChangelogUtil.changelogSchema(icebergTable.schema());
+    }
+
+    return lazyChangelogSchema;
+  }
+
+  private SparkSession spark() { // resolved via SparkSession.active() on first use, then cached
+    if (lazySpark == null) {
+      this.lazySpark = SparkSession.active();
+    }
+
+    return lazySpark;
+  }
+}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java
new file mode 100644
index 0000000000..7786ee0eaf
--- /dev/null
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.io.Serializable;
+import org.apache.iceberg.ScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.hadoop.Util;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.connector.read.InputPartition;
+
+class SparkInputPartition implements InputPartition, Serializable { // one scan task group plus what a reader needs
+  private final ScanTaskGroup<?> taskGroup;
+  private final Broadcast<Table> tableBroadcast;
+  private final String expectedSchemaString; // schema as JSON; parsed lazily by expectedSchema()
+  private final boolean caseSensitive;
+
+  private transient Schema expectedSchema = null; // transient; rebuilt from expectedSchemaString on demand
+  private transient String[] preferredLocations = null; // transient; assigned only in the constructor
+
+  SparkInputPartition(
+      ScanTaskGroup<?> taskGroup,
+      Broadcast<Table> tableBroadcast,
+      String expectedSchemaString,
+      boolean caseSensitive,
+      boolean localityPreferred) {
+    this.taskGroup = taskGroup;
+    this.tableBroadcast = tableBroadcast;
+    this.expectedSchemaString = expectedSchemaString;
+    this.caseSensitive = caseSensitive;
+    if (localityPreferred) { // block locations are looked up only when locality is requested
+      Table table = tableBroadcast.value();
+      this.preferredLocations = Util.blockLocations(table.io(), taskGroup);
+    } else {
+      this.preferredLocations = HadoopInputFile.NO_LOCATION_PREFERENCE;
+    }
+  }
+
+  @Override
+  public String[] preferredLocations() { // returns the value computed in the constructor
+    return preferredLocations;
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T extends ScanTask> ScanTaskGroup<T> taskGroup() { // unchecked cast: the caller picks T
+    return (ScanTaskGroup<T>) taskGroup;
+  }
+
+  public Table table() { // the broadcast table value
+    return tableBroadcast.value();
+  }
+
+  public boolean isCaseSensitive() {
+    return caseSensitive;
+  }
+
+  public Schema expectedSchema() { // parsed from JSON on first call, then cached
+    if (expectedSchema == null) {
+      this.expectedSchema = SchemaParser.fromJson(expectedSchemaString);
+    }
+
+    return expectedSchema;
+  }
+}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index d3c299aa8b..972988b6b2 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -47,7 +47,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkReadConf;
 import org.apache.iceberg.spark.SparkReadOptions;
-import org.apache.iceberg.spark.source.SparkScan.ReadTask;
 import org.apache.iceberg.spark.source.SparkScan.ReaderFactory;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.SnapshotUtil;
@@ -152,22 +151,23 @@ public class SparkMicroBatchStream implements MicroBatchStream {
     List<CombinedScanTask> combinedScanTasks =
         Lists.newArrayList(
             TableScanUtil.planTasks(splitTasks, splitSize, splitLookback, splitOpenFileCost));
-    InputPartition[] readTasks = new InputPartition[combinedScanTasks.size()];
 
-    Tasks.range(readTasks.length)
+    InputPartition[] partitions = new InputPartition[combinedScanTasks.size()];
+
+    Tasks.range(partitions.length)
         .stopOnFailure()
         .executeWith(localityPreferred ? ThreadPools.getWorkerPool() : null)
         .run(
             index ->
-                readTasks[index] =
-                    new ReadTask(
+                partitions[index] =
+                    new SparkInputPartition(
                         combinedScanTasks.get(index),
                         tableBroadcast,
                         expectedSchema,
                         caseSensitive,
                         localityPreferred));
 
-    return readTasks;
+    return partitions;
   }
 
   @Override
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
index 8dc2b47d27..162333e602 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
@@ -18,8 +18,6 @@
  */
 package org.apache.iceberg.spark.source;
 
-import java.io.Serializable;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -27,13 +25,10 @@ import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.ScanTaskGroup;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.hadoop.HadoopInputFile;
-import org.apache.iceberg.hadoop.Util;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkReadConf;
@@ -44,7 +39,6 @@ import org.apache.iceberg.spark.source.metrics.NumSplits;
 import org.apache.iceberg.spark.source.metrics.TaskNumDeletes;
 import org.apache.iceberg.spark.source.metrics.TaskNumSplits;
 import org.apache.iceberg.util.PropertyUtil;
-import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.metric.CustomMetric;
@@ -178,20 +172,22 @@ abstract class SparkScan extends SparkBatch implements Scan, SupportsReportStati
 
     @Override
     public PartitionReader<InternalRow> createReader(InputPartition partition) {
-      if (partition instanceof ReadTask) {
-        return new RowReader((ReadTask) partition);
-      } else {
-        throw new UnsupportedOperationException("Incorrect input partition type: " + partition);
-      }
+      Preconditions.checkArgument(
+          partition instanceof SparkInputPartition,
+          "Unknown input partition type: %s",
+          partition.getClass().getName());
+
+      return new RowReader((SparkInputPartition) partition);
     }
 
     @Override
     public PartitionReader<ColumnarBatch> createColumnarReader(InputPartition partition) {
-      if (partition instanceof ReadTask) {
-        return new BatchReader((ReadTask) partition, batchSize);
-      } else {
-        throw new UnsupportedOperationException("Incorrect input partition type: " + partition);
-      }
+      Preconditions.checkArgument(
+          partition instanceof SparkInputPartition,
+          "Unknown input partition type: %s",
+          partition.getClass().getName());
+
+      return new BatchReader((SparkInputPartition) partition, batchSize);
     }
 
     @Override
@@ -201,13 +197,19 @@ abstract class SparkScan extends SparkBatch implements Scan, SupportsReportStati
   }
 
   private static class RowReader extends RowDataReader implements PartitionReader<InternalRow> {
-    private long numSplits;
+    private static final Logger LOG = LoggerFactory.getLogger(RowReader.class);
+
+    private final long numSplits;
+
+    RowReader(SparkInputPartition partition) {
+      super(
+          partition.taskGroup(),
+          partition.table(),
+          partition.expectedSchema(),
+          partition.isCaseSensitive());
 
-    RowReader(ReadTask task) {
-      super(task.task, task.table(), task.expectedSchema(), task.isCaseSensitive());
-      numSplits = task.task.files().size();
-      LOG.debug(
-          "Reading {} file split(s) for table {} using RowReader", numSplits, task.table().name());
+      numSplits = partition.taskGroup().tasks().size();
+      LOG.debug("Reading {} file split(s) for table {}", numSplits, partition.table().name());
     }
 
     @Override
@@ -220,15 +222,21 @@ abstract class SparkScan extends SparkBatch implements Scan, SupportsReportStati
 
   private static class BatchReader extends BatchDataReader
       implements PartitionReader<ColumnarBatch> {
-    private long numSplits;
-
-    BatchReader(ReadTask task, int batchSize) {
-      super(task.task, task.table(), task.expectedSchema(), task.isCaseSensitive(), batchSize);
-      numSplits = task.task.files().size();
-      LOG.debug(
-          "Reading {} file split(s) for table {} using BatchReader",
-          numSplits,
-          task.table().name());
+
+    private static final Logger LOG = LoggerFactory.getLogger(BatchReader.class);
+
+    private final long numSplits;
+
+    BatchReader(SparkInputPartition partition, int batchSize) {
+      super(
+          partition.taskGroup(),
+          partition.table(),
+          partition.expectedSchema(),
+          partition.isCaseSensitive(),
+          batchSize);
+
+      numSplits = partition.taskGroup().tasks().size();
+      LOG.debug("Reading {} file split(s) for table {}", numSplits, partition.table().name());
     }
 
     @Override
@@ -238,56 +246,4 @@ abstract class SparkScan extends SparkBatch implements Scan, SupportsReportStati
       };
     }
   }
-
-  static class ReadTask implements InputPartition, Serializable {
-    private final CombinedScanTask task;
-    private final Broadcast<Table> tableBroadcast;
-    private final String expectedSchemaString;
-    private final boolean caseSensitive;
-
-    private transient Schema expectedSchema = null;
-    private transient String[] preferredLocations = null;
-
-    ReadTask(
-        CombinedScanTask task,
-        Broadcast<Table> tableBroadcast,
-        String expectedSchemaString,
-        boolean caseSensitive,
-        boolean localityPreferred) {
-      this.task = task;
-      this.tableBroadcast = tableBroadcast;
-      this.expectedSchemaString = expectedSchemaString;
-      this.caseSensitive = caseSensitive;
-      if (localityPreferred) {
-        Table table = tableBroadcast.value();
-        this.preferredLocations = Util.blockLocations(table.io(), task);
-      } else {
-        this.preferredLocations = HadoopInputFile.NO_LOCATION_PREFERENCE;
-      }
-    }
-
-    @Override
-    public String[] preferredLocations() {
-      return preferredLocations;
-    }
-
-    public Collection<FileScanTask> files() {
-      return task.files();
-    }
-
-    public Table table() {
-      return tableBroadcast.value();
-    }
-
-    public boolean isCaseSensitive() {
-      return caseSensitive;
-    }
-
-    private Schema expectedSchema() {
-      if (expectedSchema == null) {
-        this.expectedSchema = SchemaParser.fromJson(expectedSchemaString);
-      }
-      return expectedSchema;
-    }
-  }
 }
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 21c34ed6f6..b291a8e267 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.spark.source;
 import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import org.apache.iceberg.IncrementalChangelogScan;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
@@ -238,6 +239,38 @@ public class SparkScanBuilder
     return new SparkBatchQueryScan(spark, table, scan, readConf, expectedSchema, filterExpressions);
   }
 
+  public Scan buildChangelogScan() { // builds a SparkChangelogScan from the configured read options
+    Preconditions.checkArgument( // snapshot-id and as-of-timestamp options are rejected for changelog reads
+        readConf.snapshotId() == null && readConf.asOfTimestamp() == null,
+        "Cannot set either %s or %s for changelogs",
+        SparkReadOptions.SNAPSHOT_ID,
+        SparkReadOptions.AS_OF_TIMESTAMP);
+
+    Long startSnapshotId = readConf.startSnapshotId(); // optional; applied below only when non-null
+    Long endSnapshotId = readConf.endSnapshotId(); // optional; applied below only when non-null
+
+    Schema expectedSchema = schemaWithMetadataColumns();
+
+    IncrementalChangelogScan scan =
+        table
+            .newIncrementalChangelogScan()
+            .caseSensitive(caseSensitive)
+            .filter(filterExpression())
+            .project(expectedSchema);
+
+    if (startSnapshotId != null) {
+      scan = scan.fromSnapshotExclusive(startSnapshotId); // start bound is applied as exclusive
+    }
+
+    if (endSnapshotId != null) {
+      scan = scan.toSnapshot(endSnapshotId);
+    }
+
+    scan = configureSplitPlanning(scan); // applies the split-planning read options to the scan
+
+    return new SparkChangelogScan(spark, table, scan, readConf, expectedSchema, filterExpressions);
+  }
+
   public Scan buildMergeOnReadScan() {
     Preconditions.checkArgument(
         readConf.snapshotId() == null && readConf.asOfTimestamp() == null,
@@ -306,8 +339,8 @@ public class SparkScanBuilder
         spark, table, scan, snapshot, readConf, expectedSchema, filterExpressions);
   }
 
-  private TableScan configureSplitPlanning(TableScan scan) {
-    TableScan configuredScan = scan;
+  private <T extends org.apache.iceberg.Scan<T, ?, ?>> T configureSplitPlanning(T scan) {
+    T configuredScan = scan;
 
     Long splitSize = readConf.splitSizeOption();
     if (splitSize != null) {
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java
index e3699eaede..0abfd79d5d 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java
@@ -46,7 +46,7 @@ public class TestRemoveOrphanFilesAction3 extends TestRemoveOrphanFilesAction {
     Map<String, String> options = Maps.newHashMap();
     Transform[] transforms = {};
     cat.createTable(id, SparkSchemaUtil.convert(SCHEMA), transforms, options);
-    SparkTable table = cat.loadTable(id);
+    SparkTable table = (SparkTable) cat.loadTable(id);
 
     spark.sql("INSERT INTO mycat.default.table VALUES (1,1,1)");
 
@@ -76,7 +76,7 @@ public class TestRemoveOrphanFilesAction3 extends TestRemoveOrphanFilesAction {
     Map<String, String> options = Maps.newHashMap();
     Transform[] transforms = {};
     cat.createTable(id, SparkSchemaUtil.convert(SCHEMA), transforms, options);
-    SparkTable table = cat.loadTable(id);
+    SparkTable table = (SparkTable) cat.loadTable(id);
 
     spark.sql("INSERT INTO hadoop.default.table VALUES (1,1,1)");
 
@@ -106,7 +106,7 @@ public class TestRemoveOrphanFilesAction3 extends TestRemoveOrphanFilesAction {
     Map<String, String> options = Maps.newHashMap();
     Transform[] transforms = {};
     cat.createTable(id, SparkSchemaUtil.convert(SCHEMA), transforms, options);
-    SparkTable table = cat.loadTable(id);
+    SparkTable table = (SparkTable) cat.loadTable(id);
 
     spark.sql("INSERT INTO hive.default.table VALUES (1,1,1)");
 
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestPathIdentifier.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestPathIdentifier.java
index f58451296c..5baf607123 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestPathIdentifier.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestPathIdentifier.java
@@ -71,8 +71,9 @@ public class TestPathIdentifier extends SparkTestBase {
   @Test
   public void testPathIdentifier() throws TableAlreadyExistsException, NoSuchTableException {
     SparkTable table =
-        sparkCatalog.createTable(
-            identifier, SparkSchemaUtil.convert(SCHEMA), new Transform[0], ImmutableMap.of());
+        (SparkTable)
+            sparkCatalog.createTable(
+                identifier, SparkSchemaUtil.convert(SCHEMA), new Transform[0], ImmutableMap.of());
 
     Assert.assertEquals(table.table().location(), tableLocation.getAbsolutePath());
     Assertions.assertThat(table.table()).isInstanceOf(BaseTable.class);