You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/05/27 02:51:53 UTC

[iceberg] branch master updated: Spark, Flink: Pass FileIO into Snapshot methods that read metadata, backports (#4877)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new f412ce5f9 Spark, Flink: Pass FileIO into Snapshot methods that read metadata, backports (#4877)
f412ce5f9 is described below

commit f412ce5f910c2cbcaf76529ff1194b302fa7acb6
Author: Kyle Bendickson <kj...@gmail.com>
AuthorDate: Thu May 26 19:51:49 2022 -0700

    Spark, Flink: Pass FileIO into Snapshot methods that read metadata, backports (#4877)
---
 .../apache/iceberg/flink/TestChangeLogTable.java   |  2 +-
 .../iceberg/flink/sink/TestFlinkIcebergSinkV2.java |  2 +-
 .../apache/iceberg/flink/TestChangeLogTable.java   |  2 +-
 .../iceberg/flink/sink/TestFlinkIcebergSinkV2.java |  2 +-
 .../extensions/TestRewriteManifestsProcedure.java  | 16 +++++-----
 .../actions/BaseRewriteManifestsSparkAction.java   |  2 +-
 .../spark/source/SparkMicroBatchStream.java        |  3 +-
 .../spark/actions/TestExpireSnapshotsAction.java   | 36 +++++++++++-----------
 .../spark/actions/TestRewriteDataFilesAction.java  | 11 ++++---
 .../spark/actions/TestRewriteManifestsAction.java  | 23 +++++++-------
 .../iceberg/spark/source/TestDataFrameWrites.java  |  2 +-
 .../spark/source/TestDataSourceOptions.java        |  8 ++---
 .../spark/source/TestIcebergSourceTablesBase.java  | 33 +++++++++++---------
 .../iceberg/spark/source/TestSparkDataFile.java    |  2 +-
 .../iceberg/spark/source/TestSparkDataWrite.java   |  6 ++--
 .../apache/iceberg/spark/sql/TestRefreshTable.java |  2 +-
 .../extensions/TestRewriteManifestsProcedure.java  | 16 +++++-----
 .../actions/BaseRewriteManifestsSparkAction.java   |  2 +-
 .../spark/source/SparkMicroBatchStream.java        |  3 +-
 .../spark/actions/TestExpireSnapshotsAction.java   | 36 +++++++++++-----------
 .../spark/actions/TestRewriteDataFilesAction.java  | 11 ++++---
 .../spark/actions/TestRewriteManifestsAction.java  | 23 +++++++-------
 .../iceberg/spark/source/TestDataFrameWrites.java  |  2 +-
 .../spark/source/TestDataSourceOptions.java        |  8 ++---
 .../spark/source/TestIcebergSourceTablesBase.java  | 33 +++++++++++---------
 .../iceberg/spark/source/TestSparkDataFile.java    |  2 +-
 .../iceberg/spark/source/TestSparkDataWrite.java   |  6 ++--
 .../apache/iceberg/spark/sql/TestRefreshTable.java |  2 +-
 28 files changed, 154 insertions(+), 142 deletions(-)

diff --git a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
index 68b706e2d..5c04c8551 100644
--- a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
+++ b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
@@ -296,7 +296,7 @@ public class TestChangeLogTable extends ChangeLogTableTestBase {
   private List<Snapshot> findValidSnapshots(Table table) {
     List<Snapshot> validSnapshots = Lists.newArrayList();
     for (Snapshot snapshot : table.snapshots()) {
-      if (snapshot.allManifests().stream().anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
+      if (snapshot.allManifests(table.io()).stream().anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
         validSnapshots.add(snapshot);
       }
     }
diff --git a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
index 23169d1a5..97506b90b 100644
--- a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++ b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
@@ -147,7 +147,7 @@ public class TestFlinkIcebergSinkV2 extends TableTestBase {
   private List<Snapshot> findValidSnapshots(Table table) {
     List<Snapshot> validSnapshots = Lists.newArrayList();
     for (Snapshot snapshot : table.snapshots()) {
-      if (snapshot.allManifests().stream().anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
+      if (snapshot.allManifests(table.io()).stream().anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
         validSnapshots.add(snapshot);
       }
     }
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
index 68b706e2d..5c04c8551 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java
@@ -296,7 +296,7 @@ public class TestChangeLogTable extends ChangeLogTableTestBase {
   private List<Snapshot> findValidSnapshots(Table table) {
     List<Snapshot> validSnapshots = Lists.newArrayList();
     for (Snapshot snapshot : table.snapshots()) {
-      if (snapshot.allManifests().stream().anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
+      if (snapshot.allManifests(table.io()).stream().anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
         validSnapshots.add(snapshot);
       }
     }
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
index 23169d1a5..97506b90b 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
@@ -147,7 +147,7 @@ public class TestFlinkIcebergSinkV2 extends TableTestBase {
   private List<Snapshot> findValidSnapshots(Table table) {
     List<Snapshot> validSnapshots = Lists.newArrayList();
     for (Snapshot snapshot : table.snapshots()) {
-      if (snapshot.allManifests().stream().anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
+      if (snapshot.allManifests(table.io()).stream().anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
         validSnapshots.add(snapshot);
       }
     }
diff --git a/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java b/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
index b04f17693..dcf0a2d91 100644
--- a/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
+++ b/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
@@ -60,7 +60,7 @@ public class TestRewriteManifestsProcedure extends SparkExtensionsTestBase {
 
     Table table = validationCatalog.loadTable(tableIdent);
 
-    Assert.assertEquals("Must have 1 manifest", 1, table.currentSnapshot().allManifests().size());
+    Assert.assertEquals("Must have 1 manifest", 1, table.currentSnapshot().allManifests(table.io()).size());
 
     sql("ALTER TABLE %s SET TBLPROPERTIES ('commit.manifest.target-size-bytes' '1')", tableName);
 
@@ -72,7 +72,7 @@ public class TestRewriteManifestsProcedure extends SparkExtensionsTestBase {
 
     table.refresh();
 
-    Assert.assertEquals("Must have 4 manifests", 4, table.currentSnapshot().allManifests().size());
+    Assert.assertEquals("Must have 4 manifests", 4, table.currentSnapshot().allManifests(table.io()).size());
   }
 
   @Test
@@ -88,7 +88,7 @@ public class TestRewriteManifestsProcedure extends SparkExtensionsTestBase {
 
     Table table = validationCatalog.loadTable(tableIdent);
 
-    Assert.assertEquals("Must have 4 manifest", 4, table.currentSnapshot().allManifests().size());
+    Assert.assertEquals("Must have 4 manifest", 4, table.currentSnapshot().allManifests(table.io()).size());
 
     List<Object[]> output = sql(
         "CALL %s.system.rewrite_manifests(table => '%s')", catalogName, tableIdent);
@@ -98,7 +98,7 @@ public class TestRewriteManifestsProcedure extends SparkExtensionsTestBase {
 
     table.refresh();
 
-    Assert.assertEquals("Must have 1 manifests", 1, table.currentSnapshot().allManifests().size());
+    Assert.assertEquals("Must have 1 manifests", 1, table.currentSnapshot().allManifests(table.io()).size());
   }
 
   @Test
@@ -110,7 +110,7 @@ public class TestRewriteManifestsProcedure extends SparkExtensionsTestBase {
 
     Table table = validationCatalog.loadTable(tableIdent);
 
-    Assert.assertEquals("Must have 2 manifest", 2, table.currentSnapshot().allManifests().size());
+    Assert.assertEquals("Must have 2 manifest", 2, table.currentSnapshot().allManifests(table.io()).size());
 
     List<Object[]> output = sql(
         "CALL %s.system.rewrite_manifests(use_caching => false, table => '%s')", catalogName, tableIdent);
@@ -120,7 +120,7 @@ public class TestRewriteManifestsProcedure extends SparkExtensionsTestBase {
 
     table.refresh();
 
-    Assert.assertEquals("Must have 1 manifests", 1, table.currentSnapshot().allManifests().size());
+    Assert.assertEquals("Must have 1 manifests", 1, table.currentSnapshot().allManifests(table.io()).size());
   }
 
   @Test
@@ -132,7 +132,7 @@ public class TestRewriteManifestsProcedure extends SparkExtensionsTestBase {
 
     Table table = validationCatalog.loadTable(tableIdent);
 
-    Assert.assertEquals("Must have 2 manifest", 2, table.currentSnapshot().allManifests().size());
+    Assert.assertEquals("Must have 2 manifest", 2, table.currentSnapshot().allManifests(table.io()).size());
 
     List<Object[]> output = sql(
         "CALL %s.system.rewrite_manifests(usE_cAcHiNg => false, tAbLe => '%s')", catalogName, tableIdent);
@@ -142,7 +142,7 @@ public class TestRewriteManifestsProcedure extends SparkExtensionsTestBase {
 
     table.refresh();
 
-    Assert.assertEquals("Must have 1 manifests", 1, table.currentSnapshot().allManifests().size());
+    Assert.assertEquals("Must have 1 manifests", 1, table.currentSnapshot().allManifests(table.io()).size());
   }
 
   @Test
diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
index c446d42ca..b1769f428 100644
--- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
@@ -261,7 +261,7 @@ public class BaseRewriteManifestsSparkAction
       return ImmutableList.of();
     }
 
-    return currentSnapshot.dataManifests().stream()
+    return currentSnapshot.dataManifests(fileIO).stream()
         .filter(manifest -> manifest.partitionSpecId() == spec.specId() && predicate.test(manifest))
         .collect(Collectors.toList());
   }
diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index dd68022c6..d72928b6b 100644
--- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -111,7 +111,8 @@ public class SparkMicroBatchStream implements MicroBatchStream {
     }
 
     Snapshot latestSnapshot = table.currentSnapshot();
-    return new StreamingOffset(latestSnapshot.snapshotId(), Iterables.size(latestSnapshot.addedFiles()), false);
+    return new StreamingOffset(
+        latestSnapshot.snapshotId(), Iterables.size(latestSnapshot.addedFiles(table.io())), false);
   }
 
   @Override
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
index c4445e954..9b8c9d250 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
@@ -570,13 +570,13 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
     expectedDeletes.add(snapshotA.manifestListLocation());
 
     // Files should be deleted of dangling staged snapshot
-    snapshotB.addedFiles().forEach(i -> {
+    snapshotB.addedFiles(table.io()).forEach(i -> {
       expectedDeletes.add(i.path().toString());
     });
 
     // ManifestList should be deleted too
     expectedDeletes.add(snapshotB.manifestListLocation());
-    snapshotB.dataManifests().forEach(file -> {
+    snapshotB.dataManifests(table.io()).forEach(file -> {
       // Only the manifest of B should be deleted.
       if (file.snapshotId() == snapshotB.snapshotId()) {
         expectedDeletes.add(file.path());
@@ -645,7 +645,7 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
 
     // Make sure no dataFiles are deleted for the B, C, D snapshot
     Lists.newArrayList(snapshotB, snapshotC, snapshotD).forEach(i -> {
-      i.addedFiles().forEach(item -> {
+      i.addedFiles(table.io()).forEach(item -> {
         Assert.assertFalse(deletedFiles.contains(item.path().toString()));
       });
     });
@@ -700,7 +700,7 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
 
     // Make sure no dataFiles are deleted for the staged snapshot
     Lists.newArrayList(snapshotB).forEach(i -> {
-      i.addedFiles().forEach(item -> {
+      i.addedFiles(table.io()).forEach(item -> {
         Assert.assertFalse(deletedFiles.contains(item.path().toString()));
       });
     });
@@ -714,7 +714,7 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
 
     // Make sure no dataFiles are deleted for the staged and cherry-pick
     Lists.newArrayList(snapshotB, snapshotD).forEach(i -> {
-      i.addedFiles().forEach(item -> {
+      i.addedFiles(table.io()).forEach(item -> {
         Assert.assertFalse(deletedFiles.contains(item.path().toString()));
       });
     });
@@ -762,7 +762,7 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
 
     Snapshot firstSnapshot = table.currentSnapshot();
     Assert.assertEquals("Should create one manifest",
-        1, firstSnapshot.allManifests().size());
+        1, firstSnapshot.allManifests(table.io()).size());
 
     rightAfterSnapshot();
 
@@ -772,7 +772,7 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
 
     Snapshot secondSnapshot = table.currentSnapshot();
     Assert.assertEquals("Should create replace manifest with a rewritten manifest",
-        1, secondSnapshot.allManifests().size());
+        1, secondSnapshot.allManifests(table.io()).size());
 
     table.newAppend()
         .appendFile(FILE_B)
@@ -798,9 +798,9 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
     Assert.assertEquals("Should remove expired manifest lists and deleted data file",
         Sets.newHashSet(
             firstSnapshot.manifestListLocation(), // snapshot expired
-            firstSnapshot.allManifests().get(0).path(), // manifest was rewritten for delete
+            firstSnapshot.allManifests(table.io()).get(0).path(), // manifest was rewritten for delete
             secondSnapshot.manifestListLocation(), // snapshot expired
-            secondSnapshot.allManifests().get(0).path(), // manifest contained only deletes, was dropped
+            secondSnapshot.allManifests(table.io()).get(0).path(), // manifest contained only deletes, was dropped
             FILE_A.path()), // deleted
         deletedFiles);
 
@@ -821,7 +821,7 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
 
     Snapshot firstSnapshot = table.currentSnapshot();
     Assert.assertEquals("Should create one manifest",
-        1, firstSnapshot.allManifests().size());
+        1, firstSnapshot.allManifests(table.io()).size());
 
     rightAfterSnapshot();
 
@@ -831,7 +831,7 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
 
     Snapshot secondSnapshot = table.currentSnapshot();
     Assert.assertEquals("Should replace manifest with a rewritten manifest",
-        1, secondSnapshot.allManifests().size());
+        1, secondSnapshot.allManifests(table.io()).size());
 
     table.newFastAppend() // do not merge to keep the last snapshot's manifest valid
         .appendFile(FILE_C)
@@ -857,7 +857,7 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
     Assert.assertEquals("Should remove expired manifest lists and deleted data file",
         Sets.newHashSet(
             firstSnapshot.manifestListLocation(), // snapshot expired
-            firstSnapshot.allManifests().get(0).path(), // manifest was rewritten for delete
+            firstSnapshot.allManifests(table.io()).get(0).path(), // manifest was rewritten for delete
             secondSnapshot.manifestListLocation(), // snapshot expired
             FILE_A.path()), // deleted
         deletedFiles);
@@ -879,7 +879,7 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
 
     Snapshot firstSnapshot = table.currentSnapshot();
     Assert.assertEquals("Should create one manifest",
-        1, firstSnapshot.allManifests().size());
+        1, firstSnapshot.allManifests(table.io()).size());
 
     rightAfterSnapshot();
 
@@ -888,8 +888,8 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
         .commit();
 
     Snapshot secondSnapshot = table.currentSnapshot();
-    Set<ManifestFile> secondSnapshotManifests = Sets.newHashSet(secondSnapshot.allManifests());
-    secondSnapshotManifests.removeAll(firstSnapshot.allManifests());
+    Set<ManifestFile> secondSnapshotManifests = Sets.newHashSet(secondSnapshot.allManifests(table.io()));
+    secondSnapshotManifests.removeAll(firstSnapshot.allManifests(table.io()));
     Assert.assertEquals("Should add one new manifest for append", 1, secondSnapshotManifests.size());
 
     table.manageSnapshots()
@@ -928,7 +928,7 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
 
     Snapshot firstSnapshot = table.currentSnapshot();
     Assert.assertEquals("Should create one manifest",
-        1, firstSnapshot.allManifests().size());
+        1, firstSnapshot.allManifests(table.io()).size());
 
     rightAfterSnapshot();
 
@@ -937,8 +937,8 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
         .commit();
 
     Snapshot secondSnapshot = table.currentSnapshot();
-    Set<ManifestFile> secondSnapshotManifests = Sets.newHashSet(secondSnapshot.allManifests());
-    secondSnapshotManifests.removeAll(firstSnapshot.allManifests());
+    Set<ManifestFile> secondSnapshotManifests = Sets.newHashSet(secondSnapshot.allManifests(table.io()));
+    secondSnapshotManifests.removeAll(firstSnapshot.allManifests(table.io()));
     Assert.assertEquals("Should add one new manifest for append", 1, secondSnapshotManifests.size());
 
     table.manageSnapshots()
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 3f465fe72..9d4602765 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -269,14 +269,14 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
     Assert.assertEquals(
         "Data manifest should not have existing data file",
         0,
-        (long) table.currentSnapshot().dataManifests().get(0).existingFilesCount());
+        (long) table.currentSnapshot().dataManifests(table.io()).get(0).existingFilesCount());
     Assert.assertEquals("Data manifest should have 1 delete data file",
         1L,
-        (long) table.currentSnapshot().dataManifests().get(0).deletedFilesCount());
+        (long) table.currentSnapshot().dataManifests(table.io()).get(0).deletedFilesCount());
     Assert.assertEquals(
         "Delete manifest added row count should equal total count",
         total,
-        (long) table.currentSnapshot().deleteManifests().get(0).addedRowsCount());
+        (long) table.currentSnapshot().deleteManifests(table.io()).get(0).addedRowsCount());
   }
 
   @Test
@@ -962,7 +962,8 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
             .execute();
 
     Assert.assertEquals("Should have 1 fileGroups", result.rewriteResults().size(), 1);
-    Assert.assertTrue("Should have written 40+ files", Iterables.size(table.currentSnapshot().addedFiles()) >= 40);
+    Assert.assertTrue("Should have written 40+ files",
+        Iterables.size(table.currentSnapshot().addedFiles(table.io())) >= 40);
 
     table.refresh();
 
@@ -1223,7 +1224,7 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
     NestedField field = table.schema().caseInsensitiveFindField(column);
     Class<T> javaClass = (Class<T>) field.type().typeId().javaClass();
 
-    Map<StructLike, List<DataFile>> filesByPartition = Streams.stream(table.currentSnapshot().addedFiles())
+    Map<StructLike, List<DataFile>> filesByPartition = Streams.stream(table.currentSnapshot().addedFiles(table.io()))
         .collect(Collectors.groupingBy(DataFile::partition));
 
     Stream<Pair<Pair<T, T>, Pair<T, T>>> overlaps =
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index 40adb7d4c..f30251e74 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -129,7 +129,7 @@ public class TestRewriteManifestsAction extends SparkTestBase {
 
     table.refresh();
 
-    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size());
 
     SparkActions actions = SparkActions.get();
@@ -143,7 +143,7 @@ public class TestRewriteManifestsAction extends SparkTestBase {
 
     table.refresh();
 
-    List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> newManifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 1 manifests after rewrite", 1, newManifests.size());
 
     Assert.assertEquals(4, (long) newManifests.get(0).existingFilesCount());
@@ -183,7 +183,7 @@ public class TestRewriteManifestsAction extends SparkTestBase {
 
     table.refresh();
 
-    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size());
 
     SparkActions actions = SparkActions.get();
@@ -207,7 +207,7 @@ public class TestRewriteManifestsAction extends SparkTestBase {
     table.refresh();
 
     // table should reflect the changes, since the commit was successful
-    List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> newManifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 1 manifests after rewrite", 1, newManifests.size());
 
     Assert.assertEquals(4, (long) newManifests.get(0).existingFilesCount());
@@ -262,7 +262,7 @@ public class TestRewriteManifestsAction extends SparkTestBase {
 
     table.refresh();
 
-    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 4 manifests before rewrite", 4, manifests.size());
 
     SparkActions actions = SparkActions.get();
@@ -284,7 +284,7 @@ public class TestRewriteManifestsAction extends SparkTestBase {
 
     table.refresh();
 
-    List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> newManifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 2 manifests after rewrite", 2, newManifests.size());
 
     Assert.assertEquals(4, (long) newManifests.get(0).existingFilesCount());
@@ -347,7 +347,8 @@ public class TestRewriteManifestsAction extends SparkTestBase {
           .stagingLocation(temp.newFolder().toString())
           .execute();
 
-      Assert.assertEquals("Action should rewrite all manifests", snapshot.allManifests(), result.rewrittenManifests());
+      Assert.assertEquals("Action should rewrite all manifests",
+          snapshot.allManifests(table.io()), result.rewrittenManifests());
       Assert.assertEquals("Action should add 1 manifest", 1, Iterables.size(result.addedManifests()));
 
     } finally {
@@ -375,7 +376,7 @@ public class TestRewriteManifestsAction extends SparkTestBase {
 
     table.refresh();
 
-    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 1 manifests before rewrite", 1, manifests.size());
 
     // set the target manifest size to a small value to force splitting records into multiple files
@@ -395,7 +396,7 @@ public class TestRewriteManifestsAction extends SparkTestBase {
 
     table.refresh();
 
-    List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> newManifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 2 manifests after rewrite", 2, newManifests.size());
 
     Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
@@ -430,7 +431,7 @@ public class TestRewriteManifestsAction extends SparkTestBase {
 
     table.refresh();
 
-    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size());
 
     SparkActions actions = SparkActions.get();
@@ -447,7 +448,7 @@ public class TestRewriteManifestsAction extends SparkTestBase {
 
     table.refresh();
 
-    List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> newManifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 2 manifests after rewrite", 2, newManifests.size());
 
     Assert.assertFalse("First manifest must be rewritten", newManifests.contains(manifests.get(0)));
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
index 139136282..f6d292f89 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
@@ -181,7 +181,7 @@ public class TestDataFrameWrites extends AvroDataTest {
     }
     Assert.assertEquals("Both iterators should be exhausted", expectedIter.hasNext(), actualIter.hasNext());
 
-    table.currentSnapshot().addedFiles().forEach(dataFile ->
+    table.currentSnapshot().addedFiles(table.io()).forEach(dataFile ->
         Assert.assertTrue(
             String.format(
                 "File should have the parent directory %s, but has: %s.",
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 7655b4b82..55605288b 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -203,7 +203,7 @@ public class TestDataSourceOptions {
         .mode("append")
         .save(tableLocation);
 
-    List<DataFile> files = Lists.newArrayList(icebergTable.currentSnapshot().addedFiles());
+    List<DataFile> files = Lists.newArrayList(icebergTable.currentSnapshot().addedFiles(icebergTable.io()));
     Assert.assertEquals("Should have written 1 file", 1, files.size());
 
     long fileSize = files.get(0).fileSizeInBytes();
@@ -327,7 +327,7 @@ public class TestDataSourceOptions {
         .mode("append")
         .save(tableLocation);
 
-    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
 
     Assert.assertEquals("Must be 2 manifests", 2, manifests.size());
 
@@ -356,7 +356,7 @@ public class TestDataSourceOptions {
     HadoopTables tables = new HadoopTables(CONF);
     PartitionSpec spec = PartitionSpec.unpartitioned();
     Map<String, String> options = Maps.newHashMap();
-    tables.create(SCHEMA, spec, options, tableLocation);
+    Table icebergTable = tables.create(SCHEMA, spec, options, tableLocation);
 
     List<SimpleRecord> expectedRecords = Lists.newArrayList(
         new SimpleRecord(1, "a"),
@@ -371,7 +371,7 @@ public class TestDataSourceOptions {
     int splitSize = (int) TableProperties.METADATA_SPLIT_SIZE_DEFAULT; // 32MB split size
 
     int expectedSplits = ((int) tables.load(tableLocation + "#entries")
-        .currentSnapshot().allManifests().get(0).length() + splitSize - 1) / splitSize;
+        .currentSnapshot().allManifests(icebergTable.io()).get(0).length() + splitSize - 1) / splitSize;
 
     Dataset<Row> metadataDf = spark.read()
         .format("iceberg")
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index ea6aaae53..d6f65af4d 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -148,9 +148,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
 
     Snapshot snapshot = table.currentSnapshot();
 
-    Assert.assertEquals("Should only contain one manifest", 1, snapshot.allManifests().size());
+    Assert.assertEquals("Should only contain one manifest", 1, snapshot.allManifests(table.io()).size());
 
-    InputFile manifest = table.io().newInputFile(snapshot.allManifests().get(0).path());
+    InputFile manifest = table.io().newInputFile(snapshot.allManifests(table.io()).get(0).path());
     List<GenericData.Record> expected = Lists.newArrayList();
     try (CloseableIterable<GenericData.Record> rows = Avro.read(manifest).project(entriesTable.schema()).build()) {
       // each row must inherit snapshot_id and sequence_number
@@ -206,7 +206,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
         .save(loadLocation(tableIdentifier));
 
     table.refresh();
-    DataFile file = table.currentSnapshot().addedFiles().iterator().next();
+    DataFile file = table.currentSnapshot().addedFiles(table.io()).iterator().next();
 
     List<Object[]> singleActual = rowsToJava(spark.read()
         .format("iceberg")
@@ -233,7 +233,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
         .save(loadLocation(tableIdentifier));
 
     table.refresh();
-    DataFile file = table.currentSnapshot().addedFiles().iterator().next();
+    DataFile file = table.currentSnapshot().addedFiles(table.io()).iterator().next();
 
     List<Object[]> multiActual = rowsToJava(spark.read()
         .format("iceberg")
@@ -261,7 +261,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
         .save(loadLocation(tableIdentifier));
 
     table.refresh();
-    DataFile file = table.currentSnapshot().addedFiles().iterator().next();
+    DataFile file = table.currentSnapshot().addedFiles(table.io()).iterator().next();
 
     List<Object[]> multiActual = rowsToJava(spark.read()
         .format("iceberg")
@@ -308,7 +308,8 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
         .collectAsList();
 
     List<GenericData.Record> expected = Lists.newArrayList();
-    for (ManifestFile manifest : Iterables.concat(Iterables.transform(table.snapshots(), Snapshot::allManifests))) {
+    for (ManifestFile manifest : Iterables.concat(
+        Iterables.transform(table.snapshots(), s -> s.allManifests(table.io())))) {
       InputFile in = table.io().newInputFile(manifest.path());
       try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
         // each row must inherit snapshot_id and sequence_number
@@ -384,7 +385,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
         .collectAsList();
 
     List<GenericData.Record> expected = Lists.newArrayList();
-    for (ManifestFile manifest : table.currentSnapshot().dataManifests()) {
+    for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) {
       InputFile in = table.io().newInputFile(manifest.path());
       try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
         for (GenericData.Record record : rows) {
@@ -441,7 +442,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
           .collectAsList();
 
       List<GenericData.Record> expected = Lists.newArrayList();
-      for (ManifestFile manifest : table.currentSnapshot().dataManifests()) {
+      for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) {
         InputFile in = table.io().newInputFile(manifest.path());
         try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
           for (GenericData.Record record : rows) {
@@ -530,7 +531,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
         .save(loadLocation(tableIdentifier));
 
     table.refresh();
-    DataFile toDelete = Iterables.getOnlyElement(table.currentSnapshot().addedFiles());
+    DataFile toDelete = Iterables.getOnlyElement(table.currentSnapshot().addedFiles(table.io()));
 
     // add a second file
     df2.select("id", "data").write()
@@ -547,7 +548,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
         .collectAsList();
 
     List<GenericData.Record> expected = Lists.newArrayList();
-    for (ManifestFile manifest : table.currentSnapshot().dataManifests()) {
+    for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) {
       InputFile in = table.io().newInputFile(manifest.path());
       try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
         for (GenericData.Record record : rows) {
@@ -644,7 +645,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
     actual.sort(Comparator.comparing(o -> o.getString(1)));
 
     List<GenericData.Record> expected = Lists.newArrayList();
-    for (ManifestFile manifest : Iterables.concat(Iterables.transform(table.snapshots(), Snapshot::dataManifests))) {
+    Iterable<ManifestFile> dataManifests = Iterables.concat(Iterables.transform(table.snapshots(),
+        snapshot -> snapshot.dataManifests(table.io())));
+    for (ManifestFile manifest : dataManifests) {
       InputFile in = table.io().newInputFile(manifest.path());
       try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
         for (GenericData.Record record : rows) {
@@ -897,7 +900,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
         manifestTable.schema(), "manifests"));
     GenericRecordBuilder summaryBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(
         manifestTable.schema().findType("partition_summaries.element").asStructType(), "partition_summary"));
-    List<GenericData.Record> expected = Lists.transform(table.currentSnapshot().allManifests(), manifest ->
+    List<GenericData.Record> expected = Lists.transform(table.currentSnapshot().allManifests(table.io()), manifest ->
         builder
             .set("content", manifest.content().id())
             .set("path", manifest.path())
@@ -967,7 +970,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
     GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert(projectedSchema.asStruct()));
     GenericRecordBuilder summaryBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(
         projectedSchema.findType("partition_summaries.element").asStructType(), "partition_summary"));
-    List<GenericData.Record> expected = Lists.transform(table.currentSnapshot().allManifests(), manifest ->
+    List<GenericData.Record> expected = Lists.transform(table.currentSnapshot().allManifests(table.io()), manifest ->
         builder.set("partition_spec_id", manifest.partitionSpecId())
             .set("path", manifest.path())
             .set("partition_summaries", Lists.transform(manifest.partitions(), partition ->
@@ -999,11 +1002,11 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
         .mode("append")
         .save(loadLocation(tableIdentifier));
 
-    manifests.addAll(table.currentSnapshot().allManifests());
+    manifests.addAll(table.currentSnapshot().allManifests(table.io()));
 
     table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
 
-    manifests.addAll(table.currentSnapshot().allManifests());
+    manifests.addAll(table.currentSnapshot().allManifests(table.io()));
 
     List<Row> actual = spark.read()
         .format("iceberg")
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
index f6ca5f7de..cd1404766 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
@@ -156,7 +156,7 @@ public class TestSparkDataFile {
 
     table.refresh();
 
-    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 1 manifest", 1, manifests.size());
 
     List<DataFile> dataFiles = Lists.newArrayList();
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
index 8bf57ba65..5b158c518 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
@@ -133,7 +133,7 @@ public class TestSparkDataWrite {
     List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
     Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
     Assert.assertEquals("Result rows should match", expected, actual);
-    for (ManifestFile manifest : table.currentSnapshot().allManifests()) {
+    for (ManifestFile manifest : table.currentSnapshot().allManifests(table.io())) {
       for (DataFile file : ManifestFiles.read(manifest, table.io())) {
         // TODO: avro not support split
         if (!format.equals(FileFormat.AVRO)) {
@@ -372,7 +372,7 @@ public class TestSparkDataWrite {
     Assert.assertEquals("Result rows should match", expected, actual);
 
     List<DataFile> files = Lists.newArrayList();
-    for (ManifestFile manifest : table.currentSnapshot().allManifests()) {
+    for (ManifestFile manifest : table.currentSnapshot().allManifests(table.io())) {
       for (DataFile file : ManifestFiles.read(manifest, table.io())) {
         files.add(file);
       }
@@ -590,7 +590,7 @@ public class TestSparkDataWrite {
     Assert.assertEquals("Result rows should match", expected, actual);
 
     List<DataFile> files = Lists.newArrayList();
-    for (ManifestFile manifest : table.currentSnapshot().allManifests()) {
+    for (ManifestFile manifest : table.currentSnapshot().allManifests(table.io())) {
       for (DataFile file : ManifestFiles.read(manifest, table.io())) {
         files.add(file);
       }
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestRefreshTable.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestRefreshTable.java
index 1cbb4fef0..510d7c40e 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestRefreshTable.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestRefreshTable.java
@@ -66,7 +66,7 @@ public class TestRefreshTable extends SparkCatalogTestBase {
 
     // Modify table outside of spark, it should be cached so Spark should see the same value after mutation
     Table table = validationCatalog.loadTable(tableIdent);
-    DataFile file = table.currentSnapshot().addedFiles().iterator().next();
+    DataFile file = table.currentSnapshot().addedFiles(table.io()).iterator().next();
     table.newDelete().deleteFile(file).commit();
 
     List<Object[]> cachedActual = sql("SELECT * FROM %s", tableName);
diff --git a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
index b04f17693..dcf0a2d91 100644
--- a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
+++ b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java
@@ -60,7 +60,7 @@ public class TestRewriteManifestsProcedure extends SparkExtensionsTestBase {
 
     Table table = validationCatalog.loadTable(tableIdent);
 
-    Assert.assertEquals("Must have 1 manifest", 1, table.currentSnapshot().allManifests().size());
+    Assert.assertEquals("Must have 1 manifest", 1, table.currentSnapshot().allManifests(table.io()).size());
 
     sql("ALTER TABLE %s SET TBLPROPERTIES ('commit.manifest.target-size-bytes' '1')", tableName);
 
@@ -72,7 +72,7 @@ public class TestRewriteManifestsProcedure extends SparkExtensionsTestBase {
 
     table.refresh();
 
-    Assert.assertEquals("Must have 4 manifests", 4, table.currentSnapshot().allManifests().size());
+    Assert.assertEquals("Must have 4 manifests", 4, table.currentSnapshot().allManifests(table.io()).size());
   }
 
   @Test
@@ -88,7 +88,7 @@ public class TestRewriteManifestsProcedure extends SparkExtensionsTestBase {
 
     Table table = validationCatalog.loadTable(tableIdent);
 
-    Assert.assertEquals("Must have 4 manifest", 4, table.currentSnapshot().allManifests().size());
+    Assert.assertEquals("Must have 4 manifest", 4, table.currentSnapshot().allManifests(table.io()).size());
 
     List<Object[]> output = sql(
         "CALL %s.system.rewrite_manifests(table => '%s')", catalogName, tableIdent);
@@ -98,7 +98,7 @@ public class TestRewriteManifestsProcedure extends SparkExtensionsTestBase {
 
     table.refresh();
 
-    Assert.assertEquals("Must have 1 manifests", 1, table.currentSnapshot().allManifests().size());
+    Assert.assertEquals("Must have 1 manifests", 1, table.currentSnapshot().allManifests(table.io()).size());
   }
 
   @Test
@@ -110,7 +110,7 @@ public class TestRewriteManifestsProcedure extends SparkExtensionsTestBase {
 
     Table table = validationCatalog.loadTable(tableIdent);
 
-    Assert.assertEquals("Must have 2 manifest", 2, table.currentSnapshot().allManifests().size());
+    Assert.assertEquals("Must have 2 manifest", 2, table.currentSnapshot().allManifests(table.io()).size());
 
     List<Object[]> output = sql(
         "CALL %s.system.rewrite_manifests(use_caching => false, table => '%s')", catalogName, tableIdent);
@@ -120,7 +120,7 @@ public class TestRewriteManifestsProcedure extends SparkExtensionsTestBase {
 
     table.refresh();
 
-    Assert.assertEquals("Must have 1 manifests", 1, table.currentSnapshot().allManifests().size());
+    Assert.assertEquals("Must have 1 manifests", 1, table.currentSnapshot().allManifests(table.io()).size());
   }
 
   @Test
@@ -132,7 +132,7 @@ public class TestRewriteManifestsProcedure extends SparkExtensionsTestBase {
 
     Table table = validationCatalog.loadTable(tableIdent);
 
-    Assert.assertEquals("Must have 2 manifest", 2, table.currentSnapshot().allManifests().size());
+    Assert.assertEquals("Must have 2 manifest", 2, table.currentSnapshot().allManifests(table.io()).size());
 
     List<Object[]> output = sql(
         "CALL %s.system.rewrite_manifests(usE_cAcHiNg => false, tAbLe => '%s')", catalogName, tableIdent);
@@ -142,7 +142,7 @@ public class TestRewriteManifestsProcedure extends SparkExtensionsTestBase {
 
     table.refresh();
 
-    Assert.assertEquals("Must have 1 manifests", 1, table.currentSnapshot().allManifests().size());
+    Assert.assertEquals("Must have 1 manifests", 1, table.currentSnapshot().allManifests(table.io()).size());
   }
 
   @Test
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
index c446d42ca..b1769f428 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
@@ -261,7 +261,7 @@ public class BaseRewriteManifestsSparkAction
       return ImmutableList.of();
     }
 
-    return currentSnapshot.dataManifests().stream()
+    return currentSnapshot.dataManifests(fileIO).stream()
         .filter(manifest -> manifest.partitionSpecId() == spec.specId() && predicate.test(manifest))
         .collect(Collectors.toList());
   }
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index dd68022c6..d72928b6b 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -111,7 +111,8 @@ public class SparkMicroBatchStream implements MicroBatchStream {
     }
 
     Snapshot latestSnapshot = table.currentSnapshot();
-    return new StreamingOffset(latestSnapshot.snapshotId(), Iterables.size(latestSnapshot.addedFiles()), false);
+    return new StreamingOffset(
+        latestSnapshot.snapshotId(), Iterables.size(latestSnapshot.addedFiles(table.io())), false);
   }
 
   @Override
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
index c4445e954..9b8c9d250 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
@@ -570,13 +570,13 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
     expectedDeletes.add(snapshotA.manifestListLocation());
 
     // Files should be deleted of dangling staged snapshot
-    snapshotB.addedFiles().forEach(i -> {
+    snapshotB.addedFiles(table.io()).forEach(i -> {
       expectedDeletes.add(i.path().toString());
     });
 
     // ManifestList should be deleted too
     expectedDeletes.add(snapshotB.manifestListLocation());
-    snapshotB.dataManifests().forEach(file -> {
+    snapshotB.dataManifests(table.io()).forEach(file -> {
       // Only the manifest of B should be deleted.
       if (file.snapshotId() == snapshotB.snapshotId()) {
         expectedDeletes.add(file.path());
@@ -645,7 +645,7 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
 
     // Make sure no dataFiles are deleted for the B, C, D snapshot
     Lists.newArrayList(snapshotB, snapshotC, snapshotD).forEach(i -> {
-      i.addedFiles().forEach(item -> {
+      i.addedFiles(table.io()).forEach(item -> {
         Assert.assertFalse(deletedFiles.contains(item.path().toString()));
       });
     });
@@ -700,7 +700,7 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
 
     // Make sure no dataFiles are deleted for the staged snapshot
     Lists.newArrayList(snapshotB).forEach(i -> {
-      i.addedFiles().forEach(item -> {
+      i.addedFiles(table.io()).forEach(item -> {
         Assert.assertFalse(deletedFiles.contains(item.path().toString()));
       });
     });
@@ -714,7 +714,7 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
 
     // Make sure no dataFiles are deleted for the staged and cherry-pick
     Lists.newArrayList(snapshotB, snapshotD).forEach(i -> {
-      i.addedFiles().forEach(item -> {
+      i.addedFiles(table.io()).forEach(item -> {
         Assert.assertFalse(deletedFiles.contains(item.path().toString()));
       });
     });
@@ -762,7 +762,7 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
 
     Snapshot firstSnapshot = table.currentSnapshot();
     Assert.assertEquals("Should create one manifest",
-        1, firstSnapshot.allManifests().size());
+        1, firstSnapshot.allManifests(table.io()).size());
 
     rightAfterSnapshot();
 
@@ -772,7 +772,7 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
 
     Snapshot secondSnapshot = table.currentSnapshot();
     Assert.assertEquals("Should create replace manifest with a rewritten manifest",
-        1, secondSnapshot.allManifests().size());
+        1, secondSnapshot.allManifests(table.io()).size());
 
     table.newAppend()
         .appendFile(FILE_B)
@@ -798,9 +798,9 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
     Assert.assertEquals("Should remove expired manifest lists and deleted data file",
         Sets.newHashSet(
             firstSnapshot.manifestListLocation(), // snapshot expired
-            firstSnapshot.allManifests().get(0).path(), // manifest was rewritten for delete
+            firstSnapshot.allManifests(table.io()).get(0).path(), // manifest was rewritten for delete
             secondSnapshot.manifestListLocation(), // snapshot expired
-            secondSnapshot.allManifests().get(0).path(), // manifest contained only deletes, was dropped
+            secondSnapshot.allManifests(table.io()).get(0).path(), // manifest contained only deletes, was dropped
             FILE_A.path()), // deleted
         deletedFiles);
 
@@ -821,7 +821,7 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
 
     Snapshot firstSnapshot = table.currentSnapshot();
     Assert.assertEquals("Should create one manifest",
-        1, firstSnapshot.allManifests().size());
+        1, firstSnapshot.allManifests(table.io()).size());
 
     rightAfterSnapshot();
 
@@ -831,7 +831,7 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
 
     Snapshot secondSnapshot = table.currentSnapshot();
     Assert.assertEquals("Should replace manifest with a rewritten manifest",
-        1, secondSnapshot.allManifests().size());
+        1, secondSnapshot.allManifests(table.io()).size());
 
     table.newFastAppend() // do not merge to keep the last snapshot's manifest valid
         .appendFile(FILE_C)
@@ -857,7 +857,7 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
     Assert.assertEquals("Should remove expired manifest lists and deleted data file",
         Sets.newHashSet(
             firstSnapshot.manifestListLocation(), // snapshot expired
-            firstSnapshot.allManifests().get(0).path(), // manifest was rewritten for delete
+            firstSnapshot.allManifests(table.io()).get(0).path(), // manifest was rewritten for delete
             secondSnapshot.manifestListLocation(), // snapshot expired
             FILE_A.path()), // deleted
         deletedFiles);
@@ -879,7 +879,7 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
 
     Snapshot firstSnapshot = table.currentSnapshot();
     Assert.assertEquals("Should create one manifest",
-        1, firstSnapshot.allManifests().size());
+        1, firstSnapshot.allManifests(table.io()).size());
 
     rightAfterSnapshot();
 
@@ -888,8 +888,8 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
         .commit();
 
     Snapshot secondSnapshot = table.currentSnapshot();
-    Set<ManifestFile> secondSnapshotManifests = Sets.newHashSet(secondSnapshot.allManifests());
-    secondSnapshotManifests.removeAll(firstSnapshot.allManifests());
+    Set<ManifestFile> secondSnapshotManifests = Sets.newHashSet(secondSnapshot.allManifests(table.io()));
+    secondSnapshotManifests.removeAll(firstSnapshot.allManifests(table.io()));
     Assert.assertEquals("Should add one new manifest for append", 1, secondSnapshotManifests.size());
 
     table.manageSnapshots()
@@ -928,7 +928,7 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
 
     Snapshot firstSnapshot = table.currentSnapshot();
     Assert.assertEquals("Should create one manifest",
-        1, firstSnapshot.allManifests().size());
+        1, firstSnapshot.allManifests(table.io()).size());
 
     rightAfterSnapshot();
 
@@ -937,8 +937,8 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
         .commit();
 
     Snapshot secondSnapshot = table.currentSnapshot();
-    Set<ManifestFile> secondSnapshotManifests = Sets.newHashSet(secondSnapshot.allManifests());
-    secondSnapshotManifests.removeAll(firstSnapshot.allManifests());
+    Set<ManifestFile> secondSnapshotManifests = Sets.newHashSet(secondSnapshot.allManifests(table.io()));
+    secondSnapshotManifests.removeAll(firstSnapshot.allManifests(table.io()));
     Assert.assertEquals("Should add one new manifest for append", 1, secondSnapshotManifests.size());
 
     table.manageSnapshots()
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 3f465fe72..9d4602765 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -269,14 +269,14 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
     Assert.assertEquals(
         "Data manifest should not have existing data file",
         0,
-        (long) table.currentSnapshot().dataManifests().get(0).existingFilesCount());
+        (long) table.currentSnapshot().dataManifests(table.io()).get(0).existingFilesCount());
     Assert.assertEquals("Data manifest should have 1 delete data file",
         1L,
-        (long) table.currentSnapshot().dataManifests().get(0).deletedFilesCount());
+        (long) table.currentSnapshot().dataManifests(table.io()).get(0).deletedFilesCount());
     Assert.assertEquals(
         "Delete manifest added row count should equal total count",
         total,
-        (long) table.currentSnapshot().deleteManifests().get(0).addedRowsCount());
+        (long) table.currentSnapshot().deleteManifests(table.io()).get(0).addedRowsCount());
   }
 
   @Test
@@ -962,7 +962,8 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
             .execute();
 
     Assert.assertEquals("Should have 1 fileGroups", result.rewriteResults().size(), 1);
-    Assert.assertTrue("Should have written 40+ files", Iterables.size(table.currentSnapshot().addedFiles()) >= 40);
+    Assert.assertTrue("Should have written 40+ files",
+        Iterables.size(table.currentSnapshot().addedFiles(table.io())) >= 40);
 
     table.refresh();
 
@@ -1223,7 +1224,7 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
     NestedField field = table.schema().caseInsensitiveFindField(column);
     Class<T> javaClass = (Class<T>) field.type().typeId().javaClass();
 
-    Map<StructLike, List<DataFile>> filesByPartition = Streams.stream(table.currentSnapshot().addedFiles())
+    Map<StructLike, List<DataFile>> filesByPartition = Streams.stream(table.currentSnapshot().addedFiles(table.io()))
         .collect(Collectors.groupingBy(DataFile::partition));
 
     Stream<Pair<Pair<T, T>, Pair<T, T>>> overlaps =
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index 40adb7d4c..f30251e74 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -129,7 +129,7 @@ public class TestRewriteManifestsAction extends SparkTestBase {
 
     table.refresh();
 
-    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size());
 
     SparkActions actions = SparkActions.get();
@@ -143,7 +143,7 @@ public class TestRewriteManifestsAction extends SparkTestBase {
 
     table.refresh();
 
-    List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> newManifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 1 manifests after rewrite", 1, newManifests.size());
 
     Assert.assertEquals(4, (long) newManifests.get(0).existingFilesCount());
@@ -183,7 +183,7 @@ public class TestRewriteManifestsAction extends SparkTestBase {
 
     table.refresh();
 
-    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size());
 
     SparkActions actions = SparkActions.get();
@@ -207,7 +207,7 @@ public class TestRewriteManifestsAction extends SparkTestBase {
     table.refresh();
 
     // table should reflect the changes, since the commit was successful
-    List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> newManifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 1 manifests after rewrite", 1, newManifests.size());
 
     Assert.assertEquals(4, (long) newManifests.get(0).existingFilesCount());
@@ -262,7 +262,7 @@ public class TestRewriteManifestsAction extends SparkTestBase {
 
     table.refresh();
 
-    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 4 manifests before rewrite", 4, manifests.size());
 
     SparkActions actions = SparkActions.get();
@@ -284,7 +284,7 @@ public class TestRewriteManifestsAction extends SparkTestBase {
 
     table.refresh();
 
-    List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> newManifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 2 manifests after rewrite", 2, newManifests.size());
 
     Assert.assertEquals(4, (long) newManifests.get(0).existingFilesCount());
@@ -347,7 +347,8 @@ public class TestRewriteManifestsAction extends SparkTestBase {
           .stagingLocation(temp.newFolder().toString())
           .execute();
 
-      Assert.assertEquals("Action should rewrite all manifests", snapshot.allManifests(), result.rewrittenManifests());
+      Assert.assertEquals("Action should rewrite all manifests",
+          snapshot.allManifests(table.io()), result.rewrittenManifests());
       Assert.assertEquals("Action should add 1 manifest", 1, Iterables.size(result.addedManifests()));
 
     } finally {
@@ -375,7 +376,7 @@ public class TestRewriteManifestsAction extends SparkTestBase {
 
     table.refresh();
 
-    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 1 manifests before rewrite", 1, manifests.size());
 
     // set the target manifest size to a small value to force splitting records into multiple files
@@ -395,7 +396,7 @@ public class TestRewriteManifestsAction extends SparkTestBase {
 
     table.refresh();
 
-    List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> newManifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 2 manifests after rewrite", 2, newManifests.size());
 
     Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
@@ -430,7 +431,7 @@ public class TestRewriteManifestsAction extends SparkTestBase {
 
     table.refresh();
 
-    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size());
 
     SparkActions actions = SparkActions.get();
@@ -447,7 +448,7 @@ public class TestRewriteManifestsAction extends SparkTestBase {
 
     table.refresh();
 
-    List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> newManifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 2 manifests after rewrite", 2, newManifests.size());
 
     Assert.assertFalse("First manifest must be rewritten", newManifests.contains(manifests.get(0)));
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
index 139136282..f6d292f89 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
@@ -181,7 +181,7 @@ public class TestDataFrameWrites extends AvroDataTest {
     }
     Assert.assertEquals("Both iterators should be exhausted", expectedIter.hasNext(), actualIter.hasNext());
 
-    table.currentSnapshot().addedFiles().forEach(dataFile ->
+    table.currentSnapshot().addedFiles(table.io()).forEach(dataFile ->
         Assert.assertTrue(
             String.format(
                 "File should have the parent directory %s, but has: %s.",
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 7655b4b82..55605288b 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -203,7 +203,7 @@ public class TestDataSourceOptions {
         .mode("append")
         .save(tableLocation);
 
-    List<DataFile> files = Lists.newArrayList(icebergTable.currentSnapshot().addedFiles());
+    List<DataFile> files = Lists.newArrayList(icebergTable.currentSnapshot().addedFiles(icebergTable.io()));
     Assert.assertEquals("Should have written 1 file", 1, files.size());
 
     long fileSize = files.get(0).fileSizeInBytes();
@@ -327,7 +327,7 @@ public class TestDataSourceOptions {
         .mode("append")
         .save(tableLocation);
 
-    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
 
     Assert.assertEquals("Must be 2 manifests", 2, manifests.size());
 
@@ -356,7 +356,7 @@ public class TestDataSourceOptions {
     HadoopTables tables = new HadoopTables(CONF);
     PartitionSpec spec = PartitionSpec.unpartitioned();
     Map<String, String> options = Maps.newHashMap();
-    tables.create(SCHEMA, spec, options, tableLocation);
+    Table icebergTable = tables.create(SCHEMA, spec, options, tableLocation);
 
     List<SimpleRecord> expectedRecords = Lists.newArrayList(
         new SimpleRecord(1, "a"),
@@ -371,7 +371,7 @@ public class TestDataSourceOptions {
     int splitSize = (int) TableProperties.METADATA_SPLIT_SIZE_DEFAULT; // 32MB split size
 
     int expectedSplits = ((int) tables.load(tableLocation + "#entries")
-        .currentSnapshot().allManifests().get(0).length() + splitSize - 1) / splitSize;
+        .currentSnapshot().allManifests(icebergTable.io()).get(0).length() + splitSize - 1) / splitSize;
 
     Dataset<Row> metadataDf = spark.read()
         .format("iceberg")
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index ea6aaae53..d6f65af4d 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -148,9 +148,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
 
     Snapshot snapshot = table.currentSnapshot();
 
-    Assert.assertEquals("Should only contain one manifest", 1, snapshot.allManifests().size());
+    Assert.assertEquals("Should only contain one manifest", 1, snapshot.allManifests(table.io()).size());
 
-    InputFile manifest = table.io().newInputFile(snapshot.allManifests().get(0).path());
+    InputFile manifest = table.io().newInputFile(snapshot.allManifests(table.io()).get(0).path());
     List<GenericData.Record> expected = Lists.newArrayList();
     try (CloseableIterable<GenericData.Record> rows = Avro.read(manifest).project(entriesTable.schema()).build()) {
       // each row must inherit snapshot_id and sequence_number
@@ -206,7 +206,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
         .save(loadLocation(tableIdentifier));
 
     table.refresh();
-    DataFile file = table.currentSnapshot().addedFiles().iterator().next();
+    DataFile file = table.currentSnapshot().addedFiles(table.io()).iterator().next();
 
     List<Object[]> singleActual = rowsToJava(spark.read()
         .format("iceberg")
@@ -233,7 +233,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
         .save(loadLocation(tableIdentifier));
 
     table.refresh();
-    DataFile file = table.currentSnapshot().addedFiles().iterator().next();
+    DataFile file = table.currentSnapshot().addedFiles(table.io()).iterator().next();
 
     List<Object[]> multiActual = rowsToJava(spark.read()
         .format("iceberg")
@@ -261,7 +261,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
         .save(loadLocation(tableIdentifier));
 
     table.refresh();
-    DataFile file = table.currentSnapshot().addedFiles().iterator().next();
+    DataFile file = table.currentSnapshot().addedFiles(table.io()).iterator().next();
 
     List<Object[]> multiActual = rowsToJava(spark.read()
         .format("iceberg")
@@ -308,7 +308,8 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
         .collectAsList();
 
     List<GenericData.Record> expected = Lists.newArrayList();
-    for (ManifestFile manifest : Iterables.concat(Iterables.transform(table.snapshots(), Snapshot::allManifests))) {
+    for (ManifestFile manifest : Iterables.concat(
+        Iterables.transform(table.snapshots(), s -> s.allManifests(table.io())))) {
       InputFile in = table.io().newInputFile(manifest.path());
       try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
         // each row must inherit snapshot_id and sequence_number
@@ -384,7 +385,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
         .collectAsList();
 
     List<GenericData.Record> expected = Lists.newArrayList();
-    for (ManifestFile manifest : table.currentSnapshot().dataManifests()) {
+    for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) {
       InputFile in = table.io().newInputFile(manifest.path());
       try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
         for (GenericData.Record record : rows) {
@@ -441,7 +442,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
           .collectAsList();
 
       List<GenericData.Record> expected = Lists.newArrayList();
-      for (ManifestFile manifest : table.currentSnapshot().dataManifests()) {
+      for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) {
         InputFile in = table.io().newInputFile(manifest.path());
         try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
           for (GenericData.Record record : rows) {
@@ -530,7 +531,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
         .save(loadLocation(tableIdentifier));
 
     table.refresh();
-    DataFile toDelete = Iterables.getOnlyElement(table.currentSnapshot().addedFiles());
+    DataFile toDelete = Iterables.getOnlyElement(table.currentSnapshot().addedFiles(table.io()));
 
     // add a second file
     df2.select("id", "data").write()
@@ -547,7 +548,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
         .collectAsList();
 
     List<GenericData.Record> expected = Lists.newArrayList();
-    for (ManifestFile manifest : table.currentSnapshot().dataManifests()) {
+    for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) {
       InputFile in = table.io().newInputFile(manifest.path());
       try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
         for (GenericData.Record record : rows) {
@@ -644,7 +645,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
     actual.sort(Comparator.comparing(o -> o.getString(1)));
 
     List<GenericData.Record> expected = Lists.newArrayList();
-    for (ManifestFile manifest : Iterables.concat(Iterables.transform(table.snapshots(), Snapshot::dataManifests))) {
+    Iterable<ManifestFile> dataManifests = Iterables.concat(Iterables.transform(table.snapshots(),
+        snapshot -> snapshot.dataManifests(table.io())));
+    for (ManifestFile manifest : dataManifests) {
       InputFile in = table.io().newInputFile(manifest.path());
       try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
         for (GenericData.Record record : rows) {
@@ -897,7 +900,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
         manifestTable.schema(), "manifests"));
     GenericRecordBuilder summaryBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(
         manifestTable.schema().findType("partition_summaries.element").asStructType(), "partition_summary"));
-    List<GenericData.Record> expected = Lists.transform(table.currentSnapshot().allManifests(), manifest ->
+    List<GenericData.Record> expected = Lists.transform(table.currentSnapshot().allManifests(table.io()), manifest ->
         builder
             .set("content", manifest.content().id())
             .set("path", manifest.path())
@@ -967,7 +970,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
     GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert(projectedSchema.asStruct()));
     GenericRecordBuilder summaryBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(
         projectedSchema.findType("partition_summaries.element").asStructType(), "partition_summary"));
-    List<GenericData.Record> expected = Lists.transform(table.currentSnapshot().allManifests(), manifest ->
+    List<GenericData.Record> expected = Lists.transform(table.currentSnapshot().allManifests(table.io()), manifest ->
         builder.set("partition_spec_id", manifest.partitionSpecId())
             .set("path", manifest.path())
             .set("partition_summaries", Lists.transform(manifest.partitions(), partition ->
@@ -999,11 +1002,11 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
         .mode("append")
         .save(loadLocation(tableIdentifier));
 
-    manifests.addAll(table.currentSnapshot().allManifests());
+    manifests.addAll(table.currentSnapshot().allManifests(table.io()));
 
     table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
 
-    manifests.addAll(table.currentSnapshot().allManifests());
+    manifests.addAll(table.currentSnapshot().allManifests(table.io()));
 
     List<Row> actual = spark.read()
         .format("iceberg")
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
index f6ca5f7de..cd1404766 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
@@ -156,7 +156,7 @@ public class TestSparkDataFile {
 
     table.refresh();
 
-    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 1 manifest", 1, manifests.size());
 
     List<DataFile> dataFiles = Lists.newArrayList();
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
index 8bf57ba65..5b158c518 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
@@ -133,7 +133,7 @@ public class TestSparkDataWrite {
     List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
     Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
     Assert.assertEquals("Result rows should match", expected, actual);
-    for (ManifestFile manifest : table.currentSnapshot().allManifests()) {
+    for (ManifestFile manifest : table.currentSnapshot().allManifests(table.io())) {
       for (DataFile file : ManifestFiles.read(manifest, table.io())) {
         // TODO: avro not support split
         if (!format.equals(FileFormat.AVRO)) {
@@ -372,7 +372,7 @@ public class TestSparkDataWrite {
     Assert.assertEquals("Result rows should match", expected, actual);
 
     List<DataFile> files = Lists.newArrayList();
-    for (ManifestFile manifest : table.currentSnapshot().allManifests()) {
+    for (ManifestFile manifest : table.currentSnapshot().allManifests(table.io())) {
       for (DataFile file : ManifestFiles.read(manifest, table.io())) {
         files.add(file);
       }
@@ -590,7 +590,7 @@ public class TestSparkDataWrite {
     Assert.assertEquals("Result rows should match", expected, actual);
 
     List<DataFile> files = Lists.newArrayList();
-    for (ManifestFile manifest : table.currentSnapshot().allManifests()) {
+    for (ManifestFile manifest : table.currentSnapshot().allManifests(table.io())) {
       for (DataFile file : ManifestFiles.read(manifest, table.io())) {
         files.add(file);
       }
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestRefreshTable.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestRefreshTable.java
index 3952019fb..187dd4470 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestRefreshTable.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestRefreshTable.java
@@ -61,7 +61,7 @@ public class TestRefreshTable extends SparkCatalogTestBase {
 
     // Modify table outside of spark, it should be cached so Spark should see the same value after mutation
     Table table = validationCatalog.loadTable(tableIdent);
-    DataFile file = table.currentSnapshot().addedFiles().iterator().next();
+    DataFile file = table.currentSnapshot().addedFiles(table.io()).iterator().next();
     table.newDelete().deleteFile(file).commit();
 
     List<Object[]> cachedActual = sql("SELECT * FROM %s", tableName);