Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/09/29 21:07:52 UTC

[iceberg] branch 1.0.x updated: API, Core: Remove deprecated methods from Snapshot API, backport #5734 (#5875)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch 1.0.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/1.0.x by this push:
     new 737474b664 API, Core: Remove deprecated methods from Snapshot API, backport #5734 (#5875)
737474b664 is described below

commit 737474b664db9a735029f7352954e22846f03da2
Author: Eduard Tudenhöfner <et...@gmail.com>
AuthorDate: Thu Sep 29 23:07:37 2022 +0200

    API, Core: Remove deprecated methods from Snapshot API, backport #5734 (#5875)
---
 .palantir/revapi.yml                               |  21 ++-
 api/src/main/java/org/apache/iceberg/Snapshot.java |  56 ------
 .../apache/iceberg/BaseIncrementalAppendScan.java  |   2 +-
 .../main/java/org/apache/iceberg/BaseSnapshot.java | 142 ++++-----------
 .../org/apache/iceberg/MetadataUpdateParser.java   |  13 +-
 .../org/apache/iceberg/PartitionSpecParser.java    |   2 +-
 .../main/java/org/apache/iceberg/SchemaParser.java |  11 +-
 .../java/org/apache/iceberg/SnapshotParser.java    |  71 ++++++--
 .../java/org/apache/iceberg/SnapshotProducer.java  |   2 -
 .../java/org/apache/iceberg/SortOrderParser.java   |   2 +-
 .../org/apache/iceberg/TableMetadataParser.java    |  30 ++--
 .../apache/iceberg/puffin/FileMetadataParser.java  |   6 +-
 .../rest/responses/ErrorResponseParser.java        |   4 +-
 .../java/org/apache/iceberg/util/JsonUtil.java     |  62 +++----
 .../apache/iceberg/TestMetadataUpdateParser.java   |   4 +-
 .../java/org/apache/iceberg/TestSnapshotJson.java  |  38 ++--
 .../org/apache/iceberg/TestSnapshotRefParser.java  |   8 +-
 .../java/org/apache/iceberg/TestTableMetadata.java |  39 ++---
 .../iceberg/catalog/TestTableIdentifierParser.java |   8 +-
 .../iceberg/puffin/TestFileMetadataParser.java     |   2 +-
 .../rest/requests/TestRenameTableRequest.java      |   8 +-
 .../rest/responses/TestLoadTableResponse.java      |   7 +-
 .../rest/responses/TestOAuthTokenResponse.java     |   8 +-
 .../java/org/apache/iceberg/util/TestJsonUtil.java | 192 +++++++++++++++++++++
 .../java/org/apache/iceberg/nessie/NessieUtil.java |   2 +-
 .../actions/BaseRewriteManifestsSparkAction.java   |   2 +-
 .../examples/SnapshotFunctionalityTest.java        |   2 +-
 .../spark/actions/TestExpireSnapshotsAction.java   |  48 ++++--
 .../spark/actions/TestRewriteManifestsAction.java  |  22 +--
 .../iceberg/spark/source/TestDataFrameWrites.java  |   2 +-
 .../spark/source/TestDataSourceOptions.java        |   9 +-
 .../spark/source/TestIcebergSourceTablesBase.java  |  38 ++--
 .../iceberg/spark/source/TestSparkDataFile.java    |   2 +-
 .../iceberg/spark/source/TestSparkDataWrite.java   |   6 +-
 .../spark/source/TestIcebergSourceTablesBase.java  |   6 +-
 .../spark/source/TestIcebergSourceTablesBase.java  |   6 +-
 .../spark/source/SparkMicroBatchStream.java        |   4 +-
 .../spark/source/TestIcebergSourceTablesBase.java  |   6 +-
 .../spark/source/SparkMicroBatchStream.java        |   4 +-
 .../spark/source/TestIcebergSourceTablesBase.java  |   6 +-
 40 files changed, 509 insertions(+), 394 deletions(-)
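
For context, the caller-side migration this commit forces is small: every
removed zero-argument Snapshot accessor has a FileIO-taking replacement. A
minimal sketch, assuming a Table handle (the class and variable names here
are illustrative, not part of this commit):

    import java.util.List;
    import org.apache.iceberg.ManifestFile;
    import org.apache.iceberg.Snapshot;
    import org.apache.iceberg.Table;

    class SnapshotApiMigration {
      static List<ManifestFile> currentManifests(Table table) {
        Snapshot snapshot = table.currentSnapshot();
        // Before: snapshot.allManifests();        // removed in 1.0
        // After:  pass the table's FileIO explicitly
        return snapshot.allManifests(table.io());
      }
    }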

diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml
index da91649da0..b04a7a12b5 100644
--- a/.palantir/revapi.yml
+++ b/.palantir/revapi.yml
@@ -1,11 +1,30 @@
 versionOverrides:
-  org.apache.iceberg:iceberg-api:release-base-0.13.0: "0.13.0"
+  org.apache.iceberg:iceberg-api:apache-iceberg-0.14.0: "0.14.0"
 acceptedBreaks:
   apache-iceberg-0.14.0:
     org.apache.iceberg:iceberg-api:
+    - code: "java.class.defaultSerializationChanged"
+      old: "class org.apache.iceberg.PartitionKey"
+      new: "class org.apache.iceberg.PartitionKey"
+      justification: "Serialization across versions is not supported"
     - code: "java.class.removed"
       old: "interface org.apache.iceberg.Rollback"
       justification: "Deprecations for 1.0 release"
+    - code: "java.method.removed"
+      old: "method java.lang.Iterable<org.apache.iceberg.DataFile> org.apache.iceberg.Snapshot::addedFiles()"
+      justification: "Deprecations for 1.0 release"
+    - code: "java.method.removed"
+      old: "method java.util.List<org.apache.iceberg.ManifestFile> org.apache.iceberg.Snapshot::allManifests()"
+      justification: "Deprecations for 1.0 release"
+    - code: "java.method.removed"
+      old: "method java.util.List<org.apache.iceberg.ManifestFile> org.apache.iceberg.Snapshot::dataManifests()"
+      justification: "Deprecations for 1.0 release"
+    - code: "java.method.removed"
+      old: "method java.util.List<org.apache.iceberg.ManifestFile> org.apache.iceberg.Snapshot::deleteManifests()"
+      justification: "Deprecations for 1.0 release"
+    - code: "java.method.removed"
+      old: "method java.lang.Iterable<org.apache.iceberg.DataFile> org.apache.iceberg.Snapshot::deletedFiles()"
+      justification: "Deprecations for 1.0 release"
     - code: "java.method.removed"
       old: "method org.apache.iceberg.OverwriteFiles org.apache.iceberg.OverwriteFiles::validateNoConflictingAppends(org.apache.iceberg.expressions.Expression)"
       justification: "Deprecations for 1.0 release"
diff --git a/api/src/main/java/org/apache/iceberg/Snapshot.java b/api/src/main/java/org/apache/iceberg/Snapshot.java
index e998fbc4b6..e33fd1cd18 100644
--- a/api/src/main/java/org/apache/iceberg/Snapshot.java
+++ b/api/src/main/java/org/apache/iceberg/Snapshot.java
@@ -64,16 +64,6 @@ public interface Snapshot extends Serializable {
    */
   long timestampMillis();
 
-  /**
-   * Return all {@link ManifestFile} instances for either data or delete manifests in this snapshot.
-   *
-   * @return a list of ManifestFile
-   * @deprecated since 0.14.0, will be removed in 1.0.0; Use {@link Snapshot#allManifests(FileIO)}
-   *     instead.
-   */
-  @Deprecated
-  List<ManifestFile> allManifests();
-
   /**
    * Return all {@link ManifestFile} instances for either data or delete manifests in this snapshot.
    *
@@ -82,16 +72,6 @@ public interface Snapshot extends Serializable {
    */
   List<ManifestFile> allManifests(FileIO io);
 
-  /**
-   * Return a {@link ManifestFile} for each data manifest in this snapshot.
-   *
-   * @return a list of ManifestFile
-   * @deprecated since 0.14.0, will be removed in 1.0.0; Use {@link Snapshot#dataManifests(FileIO)}
-   *     instead.
-   */
-  @Deprecated
-  List<ManifestFile> dataManifests();
-
   /**
    * Return a {@link ManifestFile} for each data manifest in this snapshot.
    *
@@ -100,16 +80,6 @@ public interface Snapshot extends Serializable {
    */
   List<ManifestFile> dataManifests(FileIO io);
 
-  /**
-   * Return a {@link ManifestFile} for each delete manifest in this snapshot.
-   *
-   * @return a list of ManifestFile
-   * @deprecated since 0.14.0, will be removed in 1.0.0; Use {@link
-   *     Snapshot#deleteManifests(FileIO)} instead.
-   */
-  @Deprecated
-  List<ManifestFile> deleteManifests();
-
   /**
    * Return a {@link ManifestFile} for each delete manifest in this snapshot.
    *
@@ -133,19 +103,6 @@ public interface Snapshot extends Serializable {
    */
   Map<String, String> summary();
 
-  /**
-   * Return all data files added to the table in this snapshot.
-   *
-   * <p>The files returned include the following columns: file_path, file_format, partition,
-   * record_count, and file_size_in_bytes. Other columns will be null.
-   *
-   * @return all data files added to the table in this snapshot.
-   * @deprecated since 0.14.0, will be removed in 1.0.0; Use {@link Snapshot#addedDataFiles(FileIO)}
-   *     instead.
-   */
-  @Deprecated
-  Iterable<DataFile> addedFiles();
-
   /**
    * Return all data files added to the table in this snapshot.
    *
@@ -157,19 +114,6 @@ public interface Snapshot extends Serializable {
    */
   Iterable<DataFile> addedDataFiles(FileIO io);
 
-  /**
-   * Return all data files deleted from the table in this snapshot.
-   *
-   * <p>The files returned include the following columns: file_path, file_format, partition,
-   * record_count, and file_size_in_bytes. Other columns will be null.
-   *
-   * @return all data files deleted from the table in this snapshot.
-   * @deprecated since 0.14.0, will be removed in 1.0.0; Use {@link
-   *     Snapshot#removedDataFiles(FileIO)} instead.
-   */
-  @Deprecated
-  Iterable<DataFile> deletedFiles();
-
   /**
    * Return all data files removed from the table in this snapshot.
    *
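
The FileIO-based accessors that remain are used the same way; a usage sketch
(hedged: the table handle and the loop bodies are illustrative):

    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.Snapshot;
    import org.apache.iceberg.Table;

    class SnapshotChanges {
      static void printChanges(Table table) {
        Snapshot snapshot = table.currentSnapshot();
        // Both accessors now require the FileIO used to read manifests.
        for (DataFile added : snapshot.addedDataFiles(table.io())) {
          System.out.println("added: " + added.path());
        }
        for (DataFile removed : snapshot.removedDataFiles(table.io())) {
          System.out.println("removed: " + removed.path());
        }
      }
    }
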
diff --git a/core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java b/core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java
index d8386bd98e..3c3e6e2630 100644
--- a/core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java
@@ -174,7 +174,7 @@ class BaseIncrementalAppendScan
     Set<Long> snapshotIds = Sets.newHashSet(Iterables.transform(snapshots, Snapshot::snapshotId));
     Set<ManifestFile> manifests =
         FluentIterable.from(snapshots)
-            .transformAndConcat(Snapshot::dataManifests)
+            .transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
             .filter(manifestFile -> snapshotIds.contains(manifestFile.snapshotId()))
             .toSet();
 
diff --git a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
index 817d96e3ec..336501e0d6 100644
--- a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
+++ b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
@@ -36,12 +36,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 class BaseSnapshot implements Snapshot {
   private static final long INITIAL_SEQUENCE_NUMBER = 0;
 
-  /**
-   * @deprecated since 0.14.0, will be removed in 1.0.0; {@link FileIO} should be passed to methods
-   *     which require it
-   */
-  @Deprecated private final FileIO io;
-
   private final long snapshotId;
   private final Long parentId;
   private final long sequenceNumber;
@@ -50,6 +44,7 @@ class BaseSnapshot implements Snapshot {
   private final String operation;
   private final Map<String, String> summary;
   private final Integer schemaId;
+  private final String[] v1ManifestLocations;
 
   // lazily initialized
   private transient List<ManifestFile> allManifests = null;
@@ -60,23 +55,7 @@ class BaseSnapshot implements Snapshot {
   private transient List<DeleteFile> addedDeleteFiles = null;
   private transient List<DeleteFile> removedDeleteFiles = null;
 
-  /** For testing only. */
-  BaseSnapshot(FileIO io, long snapshotId, Integer schemaId, String... manifestFiles) {
-    this(
-        io,
-        snapshotId,
-        null,
-        System.currentTimeMillis(),
-        null,
-        null,
-        schemaId,
-        Lists.transform(
-            Arrays.asList(manifestFiles),
-            path -> new GenericManifestFile(io.newInputFile(path), 0)));
-  }
-
   BaseSnapshot(
-      FileIO io,
       long sequenceNumber,
       long snapshotId,
       Long parentId,
@@ -85,7 +64,6 @@ class BaseSnapshot implements Snapshot {
       Map<String, String> summary,
       Integer schemaId,
       String manifestList) {
-    this.io = io;
     this.sequenceNumber = sequenceNumber;
     this.snapshotId = snapshotId;
     this.parentId = parentId;
@@ -94,30 +72,10 @@ class BaseSnapshot implements Snapshot {
     this.summary = summary;
     this.schemaId = schemaId;
     this.manifestListLocation = manifestList;
+    this.v1ManifestLocations = null;
   }
 
   BaseSnapshot(
-      long sequenceNumber,
-      long snapshotId,
-      Long parentId,
-      long timestampMillis,
-      String operation,
-      Map<String, String> summary,
-      Integer schemaId,
-      String manifestList) {
-    this.io = null;
-    this.sequenceNumber = sequenceNumber;
-    this.snapshotId = snapshotId;
-    this.parentId = parentId;
-    this.timestampMillis = timestampMillis;
-    this.operation = operation;
-    this.summary = summary;
-    this.schemaId = schemaId;
-    this.manifestListLocation = manifestList;
-  }
-
-  BaseSnapshot(
-      FileIO io,
       long snapshotId,
       Long parentId,
       long timestampMillis,
@@ -126,7 +84,6 @@ class BaseSnapshot implements Snapshot {
       Integer schemaId,
       List<ManifestFile> dataManifests) {
     this(
-        io,
         INITIAL_SEQUENCE_NUMBER,
         snapshotId,
         parentId,
@@ -134,10 +91,30 @@ class BaseSnapshot implements Snapshot {
         operation,
         summary,
         schemaId,
-        null);
+        dataManifests.stream().map(ManifestFile::path).toArray(String[]::new));
     this.allManifests = dataManifests;
   }
 
+  BaseSnapshot(
+      long sequenceNumber,
+      long snapshotId,
+      Long parentId,
+      long timestampMillis,
+      String operation,
+      Map<String, String> summary,
+      Integer schemaId,
+      String[] v1ManifestLocations) {
+    this.sequenceNumber = sequenceNumber;
+    this.snapshotId = snapshotId;
+    this.parentId = parentId;
+    this.timestampMillis = timestampMillis;
+    this.operation = operation;
+    this.summary = summary;
+    this.schemaId = schemaId;
+    this.manifestListLocation = null;
+    this.v1ManifestLocations = v1ManifestLocations;
+  }
+
   @Override
   public long sequenceNumber() {
     return sequenceNumber;
@@ -178,6 +155,14 @@ class BaseSnapshot implements Snapshot {
       throw new IllegalArgumentException("Cannot cache changes: FileIO is null");
     }
 
+    if (allManifests == null && v1ManifestLocations != null) {
+      // if we have a collection of manifest locations, then we need to load them here
+      allManifests =
+          Lists.transform(
+              Arrays.asList(v1ManifestLocations),
+              location -> new GenericManifestFile(fileIO.newInputFile(location), 0));
+    }
+
     if (allManifests == null) {
       // if manifests isn't set, then the snapshotFile is set and should be read to get the list
       this.allManifests = ManifestLists.read(fileIO.newInputFile(manifestListLocation));
@@ -203,19 +188,6 @@ class BaseSnapshot implements Snapshot {
     return allManifests;
   }
 
-  /**
-   * @deprecated since 0.14.0, will be removed in 1.0.0; Use {@link Snapshot#allManifests(FileIO)}
-   *     instead.
-   */
-  @Override
-  @Deprecated
-  public List<ManifestFile> allManifests() {
-    if (allManifests == null) {
-      cacheManifests(io);
-    }
-    return allManifests;
-  }
-
   @Override
   public List<ManifestFile> dataManifests(FileIO fileIO) {
     if (dataManifests == null) {
@@ -224,19 +196,6 @@ class BaseSnapshot implements Snapshot {
     return dataManifests;
   }
 
-  /**
-   * @deprecated since 0.14.0, will be removed in 1.0.0; Use {@link Snapshot#dataManifests(FileIO)}
-   *     instead.
-   */
-  @Override
-  @Deprecated
-  public List<ManifestFile> dataManifests() {
-    if (dataManifests == null) {
-      cacheManifests(io);
-    }
-    return dataManifests;
-  }
-
   @Override
   public List<ManifestFile> deleteManifests(FileIO fileIO) {
     if (deleteManifests == null) {
@@ -245,19 +204,6 @@ class BaseSnapshot implements Snapshot {
     return deleteManifests;
   }
 
-  /**
-   * @deprecated since 0.14.0, will be removed in 1.0.0; Use {@link
-   *     Snapshot#deleteManifests(FileIO)} instead.
-   */
-  @Override
-  @Deprecated
-  public List<ManifestFile> deleteManifests() {
-    if (deleteManifests == null) {
-      cacheManifests(io);
-    }
-    return deleteManifests;
-  }
-
   @Override
   public List<DataFile> addedDataFiles(FileIO fileIO) {
     if (addedDataFiles == null) {
@@ -266,19 +212,6 @@ class BaseSnapshot implements Snapshot {
     return addedDataFiles;
   }
 
-  /**
-   * @deprecated since 0.14.0, will be removed in 1.0.0; Use {@link Snapshot#addedDataFiles(FileIO)}
-   *     instead.
-   */
-  @Override
-  @Deprecated
-  public List<DataFile> addedFiles() {
-    if (addedDataFiles == null) {
-      cacheDataFileChanges(io);
-    }
-    return addedDataFiles;
-  }
-
   @Override
   public List<DataFile> removedDataFiles(FileIO fileIO) {
     if (removedDataFiles == null) {
@@ -287,19 +220,6 @@ class BaseSnapshot implements Snapshot {
     return removedDataFiles;
   }
 
-  /**
-   * @deprecated since 0.14.0, will be removed in 1.0.0; Use {@link
-   *     Snapshot#removedDataFiles(FileIO)} instead.
-   */
-  @Override
-  @Deprecated
-  public List<DataFile> deletedFiles() {
-    if (removedDataFiles == null) {
-      cacheDataFileChanges(io);
-    }
-    return removedDataFiles;
-  }
-
   @Override
   public Iterable<DeleteFile> addedDeleteFiles(FileIO fileIO) {
     if (addedDeleteFiles == null) {
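
The new v1ManifestLocations field keeps only manifest paths at construction
time and materializes ManifestFile objects on first access, once a FileIO is
available. A standalone sketch of that lazy pattern (illustrative names; not
the committed class):

    import java.util.Arrays;
    import java.util.List;
    import java.util.function.Function;
    import java.util.stream.Collectors;

    class LazyLocations<T> {
      private final String[] locations;
      private transient List<T> cached; // lazily initialized, like allManifests

      LazyLocations(String[] locations) {
        this.locations = locations;
      }

      List<T> get(Function<String, T> loader) {
        if (cached == null && locations != null) {
          // materialize once, on first access with a loader available
          cached = Arrays.stream(locations).map(loader).collect(Collectors.toList());
        }
        return cached;
      }
    }
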
diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java b/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java
index 9b7a76c157..c7cfa33ec8 100644
--- a/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java
+++ b/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java
@@ -382,8 +382,7 @@ public class MetadataUpdateParser {
   }
 
   private static MetadataUpdate readAddSchema(JsonNode node) {
-    Preconditions.checkArgument(node.hasNonNull(SCHEMA), "Cannot parse missing field: schema");
-    JsonNode schemaNode = node.get(SCHEMA);
+    JsonNode schemaNode = JsonUtil.get(SCHEMA, node);
     Schema schema = SchemaParser.fromJson(schemaNode);
     int lastColumnId = JsonUtil.getInt(LAST_COLUMN_ID, node);
     return new MetadataUpdate.AddSchema(schema, lastColumnId);
@@ -395,8 +394,7 @@ public class MetadataUpdateParser {
   }
 
   private static MetadataUpdate readAddPartitionSpec(JsonNode node) {
-    Preconditions.checkArgument(node.hasNonNull(SPEC), "Missing required field: spec");
-    JsonNode specNode = node.get(SPEC);
+    JsonNode specNode = JsonUtil.get(SPEC, node);
     UnboundPartitionSpec spec = PartitionSpecParser.fromJson(specNode);
     return new MetadataUpdate.AddPartitionSpec(spec);
   }
@@ -407,9 +405,7 @@ public class MetadataUpdateParser {
   }
 
   private static MetadataUpdate readAddSortOrder(JsonNode node) {
-    Preconditions.checkArgument(
-        node.hasNonNull(SORT_ORDER), "Cannot parse missing field: sort-order");
-    JsonNode sortOrderNode = node.get(SORT_ORDER);
+    JsonNode sortOrderNode = JsonUtil.get(SORT_ORDER, node);
     UnboundSortOrder sortOrder = SortOrderParser.fromJson(sortOrderNode);
     return new MetadataUpdate.AddSortOrder(sortOrder);
   }
@@ -420,8 +416,7 @@ public class MetadataUpdateParser {
   }
 
   private static MetadataUpdate readAddSnapshot(JsonNode node) {
-    Preconditions.checkArgument(node.has(SNAPSHOT), "Cannot parse missing field: snapshot");
-    Snapshot snapshot = SnapshotParser.fromJson(null, node.get(SNAPSHOT));
+    Snapshot snapshot = SnapshotParser.fromJson(JsonUtil.get(SNAPSHOT, node));
     return new MetadataUpdate.AddSnapshot(snapshot);
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/PartitionSpecParser.java b/core/src/main/java/org/apache/iceberg/PartitionSpecParser.java
index 0d0fff1682..0d8e75f299 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionSpecParser.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionSpecParser.java
@@ -89,7 +89,7 @@ public class PartitionSpecParser {
     Preconditions.checkArgument(json.isObject(), "Cannot parse spec from non-object: %s", json);
     int specId = JsonUtil.getInt(SPEC_ID, json);
     UnboundPartitionSpec.Builder builder = UnboundPartitionSpec.builder().withSpecId(specId);
-    buildFromJsonFields(builder, json.get(FIELDS));
+    buildFromJsonFields(builder, JsonUtil.get(FIELDS, json));
     return builder.build();
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/SchemaParser.java b/core/src/main/java/org/apache/iceberg/SchemaParser.java
index a6f0955a2c..0c60a9decb 100644
--- a/core/src/main/java/org/apache/iceberg/SchemaParser.java
+++ b/core/src/main/java/org/apache/iceberg/SchemaParser.java
@@ -183,7 +183,6 @@ public class SchemaParser {
   private static Type typeFromJson(JsonNode json) {
     if (json.isTextual()) {
       return Types.fromPrimitiveString(json.asText());
-
     } else if (json.isObject()) {
       String type = json.get(TYPE).asText();
       if (STRUCT.equals(type)) {
@@ -199,7 +198,7 @@ public class SchemaParser {
   }
 
   private static Types.StructType structFromJson(JsonNode json) {
-    JsonNode fieldArray = json.get(FIELDS);
+    JsonNode fieldArray = JsonUtil.get(FIELDS, json);
     Preconditions.checkArgument(
         fieldArray.isArray(), "Cannot parse struct fields from non-array: %s", fieldArray);
 
@@ -212,7 +211,7 @@ public class SchemaParser {
 
       int id = JsonUtil.getInt(ID, field);
       String name = JsonUtil.getString(NAME, field);
-      Type type = typeFromJson(field.get(TYPE));
+      Type type = typeFromJson(JsonUtil.get(TYPE, field));
 
       String doc = JsonUtil.getStringOrNull(DOC, field);
       boolean isRequired = JsonUtil.getBool(REQUIRED, field);
@@ -228,7 +227,7 @@ public class SchemaParser {
 
   private static Types.ListType listFromJson(JsonNode json) {
     int elementId = JsonUtil.getInt(ELEMENT_ID, json);
-    Type elementType = typeFromJson(json.get(ELEMENT));
+    Type elementType = typeFromJson(JsonUtil.get(ELEMENT, json));
     boolean isRequired = JsonUtil.getBool(ELEMENT_REQUIRED, json);
 
     if (isRequired) {
@@ -240,10 +239,10 @@ public class SchemaParser {
 
   private static Types.MapType mapFromJson(JsonNode json) {
     int keyId = JsonUtil.getInt(KEY_ID, json);
-    Type keyType = typeFromJson(json.get(KEY));
+    Type keyType = typeFromJson(JsonUtil.get(KEY, json));
 
     int valueId = JsonUtil.getInt(VALUE_ID, json);
-    Type valueType = typeFromJson(json.get(VALUE));
+    Type valueType = typeFromJson(JsonUtil.get(VALUE, json));
 
     boolean isRequired = JsonUtil.getBool(VALUE_REQUIRED, json);
 
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotParser.java b/core/src/main/java/org/apache/iceberg/SnapshotParser.java
index 6618207b73..a0de15851c 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotParser.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotParser.java
@@ -23,19 +23,23 @@ import com.fasterxml.jackson.databind.JsonNode;
 import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SeekableInputStream;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.util.JsonUtil;
 
 public class SnapshotParser {
 
   private SnapshotParser() {}
 
+  /** A dummy {@link FileIO} implementation that is only used to retrieve the path */
+  private static final DummyFileIO DUMMY_FILE_IO = new DummyFileIO();
+
   private static final String SEQUENCE_NUMBER = "sequence-number";
   private static final String SNAPSHOT_ID = "snapshot-id";
   private static final String PARENT_SNAPSHOT_ID = "parent-snapshot-id";
@@ -80,7 +84,7 @@ public class SnapshotParser {
     } else {
       // embed the manifest list in the JSON, v1 only
       generator.writeArrayFieldStart(MANIFESTS);
-      for (ManifestFile file : snapshot.allManifests()) {
+      for (ManifestFile file : snapshot.allManifests(DUMMY_FILE_IO)) {
         generator.writeString(file.path());
       }
       generator.writeEndArray();
@@ -114,7 +118,7 @@ public class SnapshotParser {
     }
   }
 
-  static Snapshot fromJson(FileIO io, JsonNode node) {
+  static Snapshot fromJson(JsonNode node) {
     Preconditions.checkArgument(
         node.isObject(), "Cannot parse table version from a non-object: %s", node);
 
@@ -157,7 +161,6 @@ public class SnapshotParser {
       // the manifest list is stored in a manifest list file
       String manifestList = JsonUtil.getString(MANIFEST_LIST, node);
       return new BaseSnapshot(
-          io,
           sequenceNumber,
           snapshotId,
           parentId,
@@ -170,20 +173,64 @@ public class SnapshotParser {
     } else {
       // fall back to an embedded manifest list. pass in the manifest's InputFile so length can be
       // loaded lazily, if it is needed
-      List<ManifestFile> manifests =
-          Lists.transform(
-              JsonUtil.getStringList(MANIFESTS, node),
-              location -> new GenericManifestFile(io.newInputFile(location), 0));
       return new BaseSnapshot(
-          io, snapshotId, parentId, timestamp, operation, summary, schemaId, manifests);
+          sequenceNumber,
+          snapshotId,
+          parentId,
+          timestamp,
+          operation,
+          summary,
+          schemaId,
+          JsonUtil.getStringList(MANIFESTS, node).toArray(new String[0]));
     }
   }
 
-  public static Snapshot fromJson(FileIO io, String json) {
+  public static Snapshot fromJson(String json) {
     try {
-      return fromJson(io, JsonUtil.mapper().readValue(json, JsonNode.class));
+      return fromJson(JsonUtil.mapper().readValue(json, JsonNode.class));
     } catch (IOException e) {
       throw new RuntimeIOException(e, "Failed to read version from json: %s", json);
     }
   }
+
+  /**
+   * The main purpose of this class is to lazily retrieve the path from a v1 Snapshot that has
+   * manifest lists
+   */
+  private static class DummyFileIO implements FileIO {
+    @Override
+    public InputFile newInputFile(String path) {
+      return new InputFile() {
+        @Override
+        public long getLength() {
+          throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public SeekableInputStream newStream() {
+          throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public String location() {
+          return path;
+        }
+
+        @Override
+        public boolean exists() {
+          return true;
+        }
+      };
+    }
+
+    @Override
+    public OutputFile newOutputFile(String path) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void deleteFile(String path) {
+      throw new UnsupportedOperationException();
+    }
+  }
 }
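
With the FileIO parameter dropped, parsing a snapshot from JSON no longer
needs any I/O wiring; a sketch of the updated call (the JSON string is a
placeholder):

    import org.apache.iceberg.Snapshot;
    import org.apache.iceberg.SnapshotParser;

    class ParseSnapshot {
      static Snapshot parse(String json) {
        // v1 embedded manifest paths are kept as locations and resolved
        // lazily when a FileIO is supplied to the Snapshot accessors.
        return SnapshotParser.fromJson(json);
      }
    }
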
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 520c70bcef..a47cc65679 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -203,7 +203,6 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
       }
 
       return new BaseSnapshot(
-          ops.io(),
           sequenceNumber,
           snapshotId(),
           parentSnapshotId,
@@ -215,7 +214,6 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
 
     } else {
       return new BaseSnapshot(
-          ops.io(),
           snapshotId(),
           parentSnapshotId,
           System.currentTimeMillis(),
diff --git a/core/src/main/java/org/apache/iceberg/SortOrderParser.java b/core/src/main/java/org/apache/iceberg/SortOrderParser.java
index 6d4f6efaa2..79ff32d19d 100644
--- a/core/src/main/java/org/apache/iceberg/SortOrderParser.java
+++ b/core/src/main/java/org/apache/iceberg/SortOrderParser.java
@@ -125,7 +125,7 @@ public class SortOrderParser {
         json.isObject(), "Cannot parse sort order from non-object: %s", json);
     int orderId = JsonUtil.getInt(ORDER_ID, json);
     UnboundSortOrder.Builder builder = UnboundSortOrder.builder().withOrderId(orderId);
-    buildFromJsonFields(builder, json.get(FIELDS));
+    buildFromJsonFields(builder, JsonUtil.get(FIELDS, json));
     return builder.build();
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
index 2abfba95c0..ee0e8f6502 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadataParser.java
@@ -263,7 +263,7 @@ public class TableMetadataParser {
     Codec codec = Codec.fromFileName(file.location());
     try (InputStream is =
         codec == Codec.GZIP ? new GZIPInputStream(file.newStream()) : file.newStream()) {
-      return fromJson(io, file, JsonUtil.mapper().readValue(is, JsonNode.class));
+      return fromJson(file, JsonUtil.mapper().readValue(is, JsonNode.class));
     } catch (IOException e) {
       throw new RuntimeIOException(e, "Failed to read file: %s", file);
     }
@@ -274,41 +274,39 @@ public class TableMetadataParser {
    *
    * <p>The TableMetadata's metadata file location will be unset.
    *
-   * @param io a FileIO used by {@link Snapshot} instances
    * @param json a JSON string of table metadata
    * @return a TableMetadata object
    */
-  public static TableMetadata fromJson(FileIO io, String json) {
-    return fromJson(io, null, json);
+  public static TableMetadata fromJson(String json) {
+    return fromJson(null, json);
   }
 
   /**
    * Read TableMetadata from a JSON string.
    *
-   * @param io a FileIO used by {@link Snapshot} instances
    * @param metadataLocation metadata location for the returned {@link TableMetadata}
    * @param json a JSON string of table metadata
    * @return a TableMetadata object
    */
-  public static TableMetadata fromJson(FileIO io, String metadataLocation, String json) {
+  public static TableMetadata fromJson(String metadataLocation, String json) {
     try {
       JsonNode node = JsonUtil.mapper().readValue(json, JsonNode.class);
-      return fromJson(io, metadataLocation, node);
+      return fromJson(metadataLocation, node);
     } catch (IOException e) {
       throw new UncheckedIOException("Failed to read JSON string: " + json, e);
     }
   }
 
-  static TableMetadata fromJson(FileIO io, InputFile file, JsonNode node) {
-    return fromJson(io, file.location(), node);
+  static TableMetadata fromJson(InputFile file, JsonNode node) {
+    return fromJson(file.location(), node);
   }
 
   public static TableMetadata fromJson(JsonNode node) {
-    return fromJson(null, (String) null, node);
+    return fromJson((String) null, node);
   }
 
   @SuppressWarnings({"checkstyle:CyclomaticComplexity", "checkstyle:MethodLength"})
-  static TableMetadata fromJson(FileIO io, String metadataLocation, JsonNode node) {
+  static TableMetadata fromJson(String metadataLocation, JsonNode node) {
     Preconditions.checkArgument(
         node.isObject(), "Cannot parse metadata from a non-object: %s", node);
 
@@ -362,7 +360,7 @@ public class TableMetadataParser {
       Preconditions.checkArgument(
           formatVersion == 1, "%s must exist in format v%s", SCHEMAS, formatVersion);
 
-      schema = SchemaParser.fromJson(node.get(SCHEMA));
+      schema = SchemaParser.fromJson(JsonUtil.get(SCHEMA, node));
       currentSchemaId = schema.schemaId();
       schemas = ImmutableList.of(schema);
     }
@@ -393,7 +391,7 @@ public class TableMetadataParser {
       specs =
           ImmutableList.of(
               PartitionSpecParser.fromJsonFields(
-                  schema, TableMetadata.INITIAL_SPEC_ID, node.get(PARTITION_SPEC)));
+                  schema, TableMetadata.INITIAL_SPEC_ID, JsonUtil.get(PARTITION_SPEC, node)));
     }
 
     Integer lastAssignedPartitionId = JsonUtil.getIntOrNull(LAST_PARTITION_ID, node);
@@ -443,14 +441,14 @@ public class TableMetadataParser {
       refs = ImmutableMap.of();
     }
 
-    JsonNode snapshotArray = node.get(SNAPSHOTS);
+    JsonNode snapshotArray = JsonUtil.get(SNAPSHOTS, node);
     Preconditions.checkArgument(
         snapshotArray.isArray(), "Cannot parse snapshots from non-array: %s", snapshotArray);
 
     List<Snapshot> snapshots = Lists.newArrayListWithExpectedSize(snapshotArray.size());
     Iterator<JsonNode> iterator = snapshotArray.elements();
     while (iterator.hasNext()) {
-      snapshots.add(SnapshotParser.fromJson(io, iterator.next()));
+      snapshots.add(SnapshotParser.fromJson(iterator.next()));
     }
 
     ImmutableList.Builder<HistoryEntry> entries = ImmutableList.builder();
@@ -508,7 +506,7 @@ public class TableMetadataParser {
     Iterator<String> refNames = refMap.fieldNames();
     while (refNames.hasNext()) {
       String refName = refNames.next();
-      JsonNode refNode = refMap.get(refName);
+      JsonNode refNode = JsonUtil.get(refName, refMap);
       Preconditions.checkArgument(
           refNode.isObject(), "Cannot parse ref %s from non-object: %s", refName, refMap);
       SnapshotRef ref = SnapshotRefParser.fromJson(refNode);
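
TableMetadataParser loses its FileIO parameter in the same way; updated call
sites look roughly like this (the argument values are placeholders):

    import org.apache.iceberg.TableMetadata;
    import org.apache.iceberg.TableMetadataParser;

    class ParseMetadata {
      static TableMetadata parse(String metadataLocation, String json) {
        // Old: TableMetadataParser.fromJson(io, metadataLocation, json)
        return TableMetadataParser.fromJson(metadataLocation, json);
      }
    }
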
diff --git a/core/src/main/java/org/apache/iceberg/puffin/FileMetadataParser.java b/core/src/main/java/org/apache/iceberg/puffin/FileMetadataParser.java
index 96ccd15a0f..eb7b51f99a 100644
--- a/core/src/main/java/org/apache/iceberg/puffin/FileMetadataParser.java
+++ b/core/src/main/java/org/apache/iceberg/puffin/FileMetadataParser.java
@@ -95,11 +95,9 @@ public final class FileMetadataParser {
   static FileMetadata fileMetadataFromJson(JsonNode json) {
 
     ImmutableList.Builder<BlobMetadata> blobs = ImmutableList.builder();
-    JsonNode blobsJson = json.get(BLOBS);
+    JsonNode blobsJson = JsonUtil.get(BLOBS, json);
     Preconditions.checkArgument(
-        blobsJson != null && blobsJson.isArray(),
-        "Cannot parse blobs from non-array: %s",
-        blobsJson);
+        blobsJson.isArray(), "Cannot parse blobs from non-array: %s", blobsJson);
     for (JsonNode blobJson : blobsJson) {
       blobs.add(blobMetadataFromJson(blobJson));
     }
diff --git a/core/src/main/java/org/apache/iceberg/rest/responses/ErrorResponseParser.java b/core/src/main/java/org/apache/iceberg/rest/responses/ErrorResponseParser.java
index 5900e70bdf..a2973e05eb 100644
--- a/core/src/main/java/org/apache/iceberg/rest/responses/ErrorResponseParser.java
+++ b/core/src/main/java/org/apache/iceberg/rest/responses/ErrorResponseParser.java
@@ -96,10 +96,10 @@ public class ErrorResponseParser {
   public static ErrorResponse fromJson(JsonNode jsonNode) {
     Preconditions.checkArgument(
         jsonNode != null && jsonNode.isObject(),
-        "Cannot parse error respone from non-object value: %s",
+        "Cannot parse error response from non-object value: %s",
         jsonNode);
     Preconditions.checkArgument(jsonNode.has(ERROR), "Cannot parse missing field: error");
-    JsonNode error = jsonNode.get(ERROR);
+    JsonNode error = JsonUtil.get(ERROR, jsonNode);
     String message = JsonUtil.getStringOrNull(MESSAGE, error);
     String type = JsonUtil.getStringOrNull(TYPE, error);
     Integer code = JsonUtil.getIntOrNull(CODE, error);
diff --git a/core/src/main/java/org/apache/iceberg/util/JsonUtil.java b/core/src/main/java/org/apache/iceberg/util/JsonUtil.java
index 33a548d41d..5d39c0cea6 100644
--- a/core/src/main/java/org/apache/iceberg/util/JsonUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/JsonUtil.java
@@ -98,12 +98,18 @@ public class JsonUtil {
     }
   }
 
+  public static JsonNode get(String property, JsonNode node) {
+    Preconditions.checkArgument(
+        node.hasNonNull(property), "Cannot parse missing field: %s", property);
+    return node.get(property);
+  }
+
   public static int getInt(String property, JsonNode node) {
-    Preconditions.checkArgument(node.has(property), "Cannot parse missing int %s", property);
+    Preconditions.checkArgument(node.has(property), "Cannot parse missing int: %s", property);
     JsonNode pNode = node.get(property);
     Preconditions.checkArgument(
-        pNode != null && !pNode.isNull() && pNode.isNumber(),
-        "Cannot parse %s to an integer value: %s",
+        pNode != null && !pNode.isNull() && pNode.isIntegralNumber() && pNode.canConvertToInt(),
+        "Cannot parse to an integer value: %s: %s",
         property,
         pNode);
     return pNode.asInt();
@@ -113,56 +119,44 @@ public class JsonUtil {
     if (!node.hasNonNull(property)) {
       return null;
     }
-    JsonNode pNode = node.get(property);
-    Preconditions.checkArgument(
-        pNode != null && !pNode.isNull() && pNode.isIntegralNumber() && pNode.canConvertToInt(),
-        "Cannot parse %s to an integer value: %s",
-        property,
-        pNode);
-    return pNode.asInt();
+    return getInt(property, node);
   }
 
   public static Long getLongOrNull(String property, JsonNode node) {
     if (!node.hasNonNull(property)) {
       return null;
     }
-    JsonNode pNode = node.get(property);
-    Preconditions.checkArgument(
-        pNode != null && !pNode.isNull() && pNode.isIntegralNumber() && pNode.canConvertToLong(),
-        "Cannot parse %s to a long value: %s",
-        property,
-        pNode);
-    return pNode.asLong();
+    return getLong(property, node);
   }
 
   public static long getLong(String property, JsonNode node) {
-    Preconditions.checkArgument(node.has(property), "Cannot parse missing long %s", property);
+    Preconditions.checkArgument(node.has(property), "Cannot parse missing long: %s", property);
     JsonNode pNode = node.get(property);
     Preconditions.checkArgument(
-        pNode != null && !pNode.isNull() && pNode.isNumber(),
-        "Cannot parse %s to a long value: %s",
+        pNode != null && !pNode.isNull() && pNode.isIntegralNumber() && pNode.canConvertToLong(),
+        "Cannot parse to a long value: %s: %s",
         property,
         pNode);
     return pNode.asLong();
   }
 
   public static boolean getBool(String property, JsonNode node) {
-    Preconditions.checkArgument(node.has(property), "Cannot parse missing boolean %s", property);
+    Preconditions.checkArgument(node.has(property), "Cannot parse missing boolean: %s", property);
     JsonNode pNode = node.get(property);
     Preconditions.checkArgument(
         pNode != null && !pNode.isNull() && pNode.isBoolean(),
-        "Cannot parse %s to a boolean value: %s",
+        "Cannot parse to a boolean value: %s: %s",
         property,
         pNode);
     return pNode.asBoolean();
   }
 
   public static String getString(String property, JsonNode node) {
-    Preconditions.checkArgument(node.has(property), "Cannot parse missing string %s", property);
+    Preconditions.checkArgument(node.has(property), "Cannot parse missing string: %s", property);
     JsonNode pNode = node.get(property);
     Preconditions.checkArgument(
         pNode != null && !pNode.isNull() && pNode.isTextual(),
-        "Cannot parse %s to a string value: %s",
+        "Cannot parse to a string value: %s: %s",
         property,
         pNode);
     return pNode.asText();
@@ -176,20 +170,15 @@ public class JsonUtil {
     if (pNode != null && pNode.isNull()) {
       return null;
     }
-    Preconditions.checkArgument(
-        pNode != null && pNode.isTextual(),
-        "Cannot parse %s from non-string value: %s",
-        property,
-        pNode);
-    return pNode.asText();
+    return getString(property, node);
   }
 
   public static Map<String, String> getStringMap(String property, JsonNode node) {
-    Preconditions.checkArgument(node.has(property), "Cannot parse missing map %s", property);
+    Preconditions.checkArgument(node.has(property), "Cannot parse missing map: %s", property);
     JsonNode pNode = node.get(property);
     Preconditions.checkArgument(
         pNode != null && !pNode.isNull() && pNode.isObject(),
-        "Cannot parse %s from non-object value: %s",
+        "Cannot parse from non-object value: %s: %s",
         property,
         pNode);
 
@@ -216,14 +205,15 @@ public class JsonUtil {
   }
 
   public static List<String> getStringList(String property, JsonNode node) {
-    Preconditions.checkArgument(node.has(property), "Cannot parse missing list %s", property);
+    Preconditions.checkArgument(node.has(property), "Cannot parse missing list: %s", property);
     return ImmutableList.<String>builder()
         .addAll(new JsonStringArrayIterator(property, node))
         .build();
   }
 
   public static Set<String> getStringSet(String property, JsonNode node) {
-    Preconditions.checkArgument(node.hasNonNull(property), "Cannot parse missing set %s", property);
+    Preconditions.checkArgument(
+        node.hasNonNull(property), "Cannot parse missing set: %s", property);
 
     return ImmutableSet.<String>builder()
         .addAll(new JsonStringArrayIterator(property, node))
@@ -290,7 +280,7 @@ public class JsonUtil {
       JsonNode pNode = node.get(property);
       Preconditions.checkArgument(
           pNode != null && !pNode.isNull() && pNode.isArray(),
-          "Cannot parse %s from non-array value: %s",
+          "Cannot parse from non-array value: %s: %s",
           property,
           pNode);
       this.elements = pNode.elements();
@@ -364,7 +354,7 @@ public class JsonUtil {
     void validate(JsonNode element) {
       Preconditions.checkArgument(
           element.isIntegralNumber() && element.canConvertToLong(),
-          "Cannot parse long from  non-long value: %s",
+          "Cannot parse long from non-long value: %s",
           element);
     }
   }
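
The new JsonUtil.get helper centralizes the "missing field" check that the
parsers above previously repeated inline. A quick sketch of it in use
(Jackson's ObjectMapper here only builds a test node):

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.iceberg.util.JsonUtil;

    class JsonUtilGetExample {
      public static void main(String[] args) throws Exception {
        JsonNode node = new ObjectMapper().readTree("{\"schema\":{\"type\":\"struct\"}}");
        // Returns the child node, or throws IllegalArgumentException
        // "Cannot parse missing field: schema" if absent or null.
        JsonNode schemaNode = JsonUtil.get("schema", node);
        System.out.println(schemaNode);
      }
    }
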
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java b/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java
index f215b04a8a..946edf549e 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java
@@ -358,7 +358,6 @@ public class TestMetadataUpdateParser {
 
     Snapshot snapshot =
         new BaseSnapshot(
-            null,
             snapshotId,
             parentId,
             System.currentTimeMillis(),
@@ -387,7 +386,6 @@ public class TestMetadataUpdateParser {
     Map<String, String> summary = ImmutableMap.of("files-added", "4", "files-deleted", "100");
     Snapshot snapshot =
         new BaseSnapshot(
-            null,
             snapshotId,
             parentId,
             System.currentTimeMillis(),
@@ -675,7 +673,7 @@ public class TestMetadataUpdateParser {
     AssertHelpers.assertThrows(
         "Parsing updates from SetProperties with a property set to null should throw",
         IllegalArgumentException.class,
-        "Cannot parse prop2 to a string value: null",
+        "Cannot parse to a string value: prop2: null",
         () -> MetadataUpdateParser.fromJson(json));
   }
 
diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java b/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java
index 8170c7b6f6..c460336942 100644
--- a/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java
+++ b/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java
@@ -37,15 +37,22 @@ public class TestSnapshotJson {
 
   @Test
   public void testJsonConversion() {
+
     Snapshot expected =
         new BaseSnapshot(
-            ops.io(),
+            0,
+            23,
+            null,
             System.currentTimeMillis(),
+            null,
+            null,
             1,
-            "file:/tmp/manifest1.avro",
-            "file:/tmp/manifest2.avro");
+            new String[] {
+              localInput("file:/tmp/manifest1.avro").location(),
+              localInput("file:/tmp/manifest2.avro").location()
+            });
     String json = SnapshotParser.toJson(expected);
-    Snapshot snapshot = SnapshotParser.fromJson(ops.io(), json);
+    Snapshot snapshot = SnapshotParser.fromJson(json);
 
     Assert.assertEquals("Snapshot ID should match", expected.snapshotId(), snapshot.snapshotId());
     Assert.assertEquals(
@@ -59,13 +66,19 @@ public class TestSnapshotJson {
   public void testJsonConversionWithoutSchemaId() {
     Snapshot expected =
         new BaseSnapshot(
-            ops.io(),
+            0,
+            23,
+            null,
             System.currentTimeMillis(),
             null,
-            "file:/tmp/manifest1.avro",
-            "file:/tmp/manifest2.avro");
+            null,
+            null,
+            new String[] {
+              localInput("file:/tmp/manifest1.avro").location(),
+              localInput("file:/tmp/manifest2.avro").location()
+            });
     String json = SnapshotParser.toJson(expected);
-    Snapshot snapshot = SnapshotParser.fromJson(ops.io(), json);
+    Snapshot snapshot = SnapshotParser.fromJson(json);
 
     Assert.assertEquals("Snapshot ID should match", expected.snapshotId(), snapshot.snapshotId());
     Assert.assertEquals(
@@ -86,7 +99,6 @@ public class TestSnapshotJson {
 
     Snapshot expected =
         new BaseSnapshot(
-            ops.io(),
             id,
             parentId,
             System.currentTimeMillis(),
@@ -96,7 +108,7 @@ public class TestSnapshotJson {
             manifests);
 
     String json = SnapshotParser.toJson(expected);
-    Snapshot snapshot = SnapshotParser.fromJson(ops.io(), json);
+    Snapshot snapshot = SnapshotParser.fromJson(json);
 
     Assert.assertEquals("Sequence number should default to 0 for v1", 0, snapshot.sequenceNumber());
     Assert.assertEquals("Snapshot ID should match", expected.snapshotId(), snapshot.snapshotId());
@@ -134,7 +146,6 @@ public class TestSnapshotJson {
 
     Snapshot expected =
         new BaseSnapshot(
-            ops.io(),
             id,
             34,
             parentId,
@@ -144,8 +155,7 @@ public class TestSnapshotJson {
             4,
             localInput(manifestList).location());
     Snapshot inMemory =
-        new BaseSnapshot(
-            ops.io(), id, parentId, expected.timestampMillis(), null, null, 4, manifests);
+        new BaseSnapshot(id, parentId, expected.timestampMillis(), null, null, 4, manifests);
 
     Assert.assertEquals(
         "Files should match in memory list",
@@ -153,7 +163,7 @@ public class TestSnapshotJson {
         expected.allManifests(ops.io()));
 
     String json = SnapshotParser.toJson(expected);
-    Snapshot snapshot = SnapshotParser.fromJson(ops.io(), json);
+    Snapshot snapshot = SnapshotParser.fromJson(json);
 
     Assert.assertEquals(
         "Sequence number should default to 0",
diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotRefParser.java b/core/src/test/java/org/apache/iceberg/TestSnapshotRefParser.java
index bc13fd7d13..8216ea3437 100644
--- a/core/src/test/java/org/apache/iceberg/TestSnapshotRefParser.java
+++ b/core/src/test/java/org/apache/iceberg/TestSnapshotRefParser.java
@@ -144,7 +144,7 @@ public class TestSnapshotRefParser {
     AssertHelpers.assertThrows(
         "SnapshotRefParser should fail to deserialize ref with invalid snapshot id",
         IllegalArgumentException.class,
-        "Cannot parse snapshot-id to a long value",
+        "Cannot parse to a long value: snapshot-id: \"invalid-snapshot-id\"",
         () -> SnapshotRefParser.fromJson(invalidSnapshotId));
 
     String invalidTagType = "{\"snapshot-id\":1,\"type\":\"not-a-valid-tag-type\"}";
@@ -159,7 +159,7 @@ public class TestSnapshotRefParser {
     AssertHelpers.assertThrows(
         "SnapshotRefParser should fail to deserialize ref with invalid ref age",
         IllegalArgumentException.class,
-        "Cannot parse max-ref-age-ms to a long",
+        "Cannot parse to a long value: max-ref-age-ms: \"not-a-valid-value\"",
         () -> SnapshotRefParser.fromJson(invalidRefAge));
 
     String invalidSnapshotsToKeep =
@@ -168,7 +168,7 @@ public class TestSnapshotRefParser {
     AssertHelpers.assertThrows(
         "SnapshotRefParser should fail to deserialize ref with missing snapshot id",
         IllegalArgumentException.class,
-        "Cannot parse min-snapshots-to-keep to an integer value",
+        "Cannot parse to an integer value: min-snapshots-to-keep: \"invalid-number\"",
         () -> SnapshotRefParser.fromJson(invalidSnapshotsToKeep));
 
     String invalidMaxSnapshotAge =
@@ -176,7 +176,7 @@ public class TestSnapshotRefParser {
     AssertHelpers.assertThrows(
         "SnapshotRefParser should fail to deserialize ref with missing snapshot id",
         IllegalArgumentException.class,
-        "Cannot parse max-snapshot-age-ms to a long value",
+        "Cannot parse to a long value: max-snapshot-age-ms: \"invalid-age\"",
         () -> SnapshotRefParser.fromJson(invalidMaxSnapshotAge));
   }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
index e3c69f8299..2d0f45d056 100644
--- a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
+++ b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
@@ -90,7 +90,6 @@ public class TestTableMetadata {
     long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
     Snapshot previousSnapshot =
         new BaseSnapshot(
-            ops.io(),
             previousSnapshotId,
             null,
             previousSnapshotId,
@@ -102,7 +101,6 @@ public class TestTableMetadata {
     long currentSnapshotId = System.currentTimeMillis();
     Snapshot currentSnapshot =
         new BaseSnapshot(
-            ops.io(),
             currentSnapshotId,
             previousSnapshotId,
             currentSnapshotId,
@@ -155,7 +153,7 @@ public class TestTableMetadata {
             ImmutableList.of());
 
     String asJson = TableMetadataParser.toJson(expected);
-    TableMetadata metadata = TableMetadataParser.fromJson(ops.io(), asJson);
+    TableMetadata metadata = TableMetadataParser.fromJson(asJson);
 
     Assert.assertEquals(
         "Format version should match", expected.formatVersion(), metadata.formatVersion());
@@ -228,7 +226,6 @@ public class TestTableMetadata {
     long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
     Snapshot previousSnapshot =
         new BaseSnapshot(
-            ops.io(),
             previousSnapshotId,
             null,
             previousSnapshotId,
@@ -240,7 +237,6 @@ public class TestTableMetadata {
     long currentSnapshotId = System.currentTimeMillis();
     Snapshot currentSnapshot =
         new BaseSnapshot(
-            ops.io(),
             currentSnapshotId,
             previousSnapshotId,
             currentSnapshotId,
@@ -275,7 +271,7 @@ public class TestTableMetadata {
             ImmutableList.of());
 
     String asJson = toJsonWithoutSpecAndSchemaList(expected);
-    TableMetadata metadata = TableMetadataParser.fromJson(ops.io(), asJson);
+    TableMetadata metadata = TableMetadataParser.fromJson(asJson);
 
     Assert.assertEquals(
         "Format version should match", expected.formatVersion(), metadata.formatVersion());
@@ -352,7 +348,6 @@ public class TestTableMetadata {
     long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
     Snapshot previousSnapshot =
         new BaseSnapshot(
-            ops.io(),
             previousSnapshotId,
             null,
             previousSnapshotId,
@@ -364,7 +359,6 @@ public class TestTableMetadata {
     long currentSnapshotId = System.currentTimeMillis();
     Snapshot currentSnapshot =
         new BaseSnapshot(
-            ops.io(),
             currentSnapshotId,
             previousSnapshotId,
             currentSnapshotId,
@@ -423,7 +417,6 @@ public class TestTableMetadata {
     long snapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
     Snapshot snapshot =
         new BaseSnapshot(
-            ops.io(),
             snapshotId,
             null,
             snapshotId,
@@ -556,7 +549,6 @@ public class TestTableMetadata {
     long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
     Snapshot previousSnapshot =
         new BaseSnapshot(
-            ops.io(),
             previousSnapshotId,
             null,
             previousSnapshotId,
@@ -568,7 +560,6 @@ public class TestTableMetadata {
     long currentSnapshotId = System.currentTimeMillis();
     Snapshot currentSnapshot =
         new BaseSnapshot(
-            ops.io(),
             currentSnapshotId,
             previousSnapshotId,
             currentSnapshotId,
@@ -610,7 +601,7 @@ public class TestTableMetadata {
             ImmutableList.of());
 
     String asJson = TableMetadataParser.toJson(base);
-    TableMetadata metadataFromJson = TableMetadataParser.fromJson(ops.io(), asJson);
+    TableMetadata metadataFromJson = TableMetadataParser.fromJson(asJson);
 
     Assert.assertEquals(
         "Metadata logs should match", previousMetadataLog, metadataFromJson.previousFiles());
@@ -621,7 +612,6 @@ public class TestTableMetadata {
     long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
     Snapshot previousSnapshot =
         new BaseSnapshot(
-            ops.io(),
             previousSnapshotId,
             null,
             previousSnapshotId,
@@ -633,7 +623,6 @@ public class TestTableMetadata {
     long currentSnapshotId = System.currentTimeMillis();
     Snapshot currentSnapshot =
         new BaseSnapshot(
-            ops.io(),
             currentSnapshotId,
             previousSnapshotId,
             currentSnapshotId,
@@ -706,7 +695,6 @@ public class TestTableMetadata {
     long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
     Snapshot previousSnapshot =
         new BaseSnapshot(
-            ops.io(),
             previousSnapshotId,
             null,
             previousSnapshotId,
@@ -718,7 +706,6 @@ public class TestTableMetadata {
     long currentSnapshotId = System.currentTimeMillis();
     Snapshot currentSnapshot =
         new BaseSnapshot(
-            ops.io(),
             currentSnapshotId,
             previousSnapshotId,
             currentSnapshotId,
@@ -809,7 +796,6 @@ public class TestTableMetadata {
     long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
     Snapshot previousSnapshot =
         new BaseSnapshot(
-            ops.io(),
             previousSnapshotId,
             null,
             previousSnapshotId,
@@ -821,7 +807,6 @@ public class TestTableMetadata {
     long currentSnapshotId = System.currentTimeMillis();
     Snapshot currentSnapshot =
         new BaseSnapshot(
-            ops.io(),
             currentSnapshotId,
             previousSnapshotId,
             currentSnapshotId,
@@ -973,11 +958,11 @@ public class TestTableMetadata {
   @Test
   public void testParserVersionValidation() throws Exception {
     String supportedVersion1 = readTableMetadataInputFile("TableMetadataV1Valid.json");
-    TableMetadata parsed1 = TableMetadataParser.fromJson(ops.io(), supportedVersion1);
+    TableMetadata parsed1 = TableMetadataParser.fromJson(supportedVersion1);
     Assert.assertNotNull("Should successfully read supported metadata version", parsed1);
 
     String supportedVersion2 = readTableMetadataInputFile("TableMetadataV2Valid.json");
-    TableMetadata parsed2 = TableMetadataParser.fromJson(ops.io(), supportedVersion2);
+    TableMetadata parsed2 = TableMetadataParser.fromJson(supportedVersion2);
     Assert.assertNotNull("Should successfully read supported metadata version", parsed2);
 
     String unsupportedVersion = readTableMetadataInputFile("TableMetadataUnsupportedVersion.json");
@@ -985,7 +970,7 @@ public class TestTableMetadata {
         "Should not read unsupported metadata",
         IllegalArgumentException.class,
         "Cannot read unsupported version",
-        () -> TableMetadataParser.fromJson(ops.io(), unsupportedVersion));
+        () -> TableMetadataParser.fromJson(unsupportedVersion));
   }
 
   @Test
@@ -996,7 +981,7 @@ public class TestTableMetadata {
         "Should reject v2 metadata without partition specs",
         IllegalArgumentException.class,
         "partition-specs must exist in format v2",
-        () -> TableMetadataParser.fromJson(ops.io(), unsupportedVersion));
+        () -> TableMetadataParser.fromJson(unsupportedVersion));
   }
 
   @Test
@@ -1007,7 +992,7 @@ public class TestTableMetadata {
         "Should reject v2 metadata without last assigned partition field id",
         IllegalArgumentException.class,
         "last-partition-id must exist in format v2",
-        () -> TableMetadataParser.fromJson(ops.io(), unsupportedVersion));
+        () -> TableMetadataParser.fromJson(unsupportedVersion));
   }
 
   @Test
@@ -1017,7 +1002,7 @@ public class TestTableMetadata {
         "Should reject v2 metadata without sort order",
         IllegalArgumentException.class,
         "sort-orders must exist in format v2",
-        () -> TableMetadataParser.fromJson(ops.io(), unsupportedVersion));
+        () -> TableMetadataParser.fromJson(unsupportedVersion));
   }
 
   @Test
@@ -1027,7 +1012,7 @@ public class TestTableMetadata {
         "Should reject v2 metadata without valid schema id",
         IllegalArgumentException.class,
         "Cannot find schema with current-schema-id=2 from schemas",
-        () -> TableMetadataParser.fromJson(ops.io(), unsupported));
+        () -> TableMetadataParser.fromJson(unsupported));
   }
 
   @Test
@@ -1037,7 +1022,7 @@ public class TestTableMetadata {
         "Should reject v2 metadata without schemas",
         IllegalArgumentException.class,
         "schemas must exist in format v2",
-        () -> TableMetadataParser.fromJson(ops.io(), unsupported));
+        () -> TableMetadataParser.fromJson(unsupported));
   }
 
   private String readTableMetadataInputFile(String fileName) throws Exception {
@@ -1238,7 +1223,7 @@ public class TestTableMetadata {
   @Test
   public void testParseSchemaIdentifierFields() throws Exception {
     String data = readTableMetadataInputFile("TableMetadataV2Valid.json");
-    TableMetadata parsed = TableMetadataParser.fromJson(ops.io(), data);
+    TableMetadata parsed = TableMetadataParser.fromJson(data);
     Assert.assertEquals(Sets.newHashSet(), parsed.schemasById().get(0).identifierFieldIds());
     Assert.assertEquals(Sets.newHashSet(1, 2), parsed.schemasById().get(1).identifierFieldIds());
   }
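
For callers migrating to this change: TableMetadataParser.fromJson no longer threads a FileIO through parsing, since snapshots now receive a FileIO only when their manifests are actually read. A minimal migration sketch (not part of the commit; the ParseMetadata class and metadataJson argument are illustrative):

    import org.apache.iceberg.TableMetadata;
    import org.apache.iceberg.TableMetadataParser;

    public class ParseMetadata {
      // metadataJson is assumed to hold valid table metadata JSON.
      static TableMetadata parse(String metadataJson) {
        // Previously: TableMetadataParser.fromJson(io, metadataJson)
        // Now the FileIO argument is gone.
        return TableMetadataParser.fromJson(metadataJson);
      }
    }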
diff --git a/core/src/test/java/org/apache/iceberg/catalog/TestTableIdentifierParser.java b/core/src/test/java/org/apache/iceberg/catalog/TestTableIdentifierParser.java
index 235ea1d1c4..ab6e9893e5 100644
--- a/core/src/test/java/org/apache/iceberg/catalog/TestTableIdentifierParser.java
+++ b/core/src/test/java/org/apache/iceberg/catalog/TestTableIdentifierParser.java
@@ -84,7 +84,7 @@ public class TestTableIdentifierParser {
     AssertHelpers.assertThrows(
         "TableIdentifierParser should fail to deserialize an empty JSON string",
         IllegalArgumentException.class,
-        "Cannot parse missing string name",
+        "Cannot parse missing string: name",
         () -> TableIdentifierParser.fromJson(emptyJson));
 
     String emptyJsonArray = "[]";
@@ -101,7 +101,7 @@ public class TestTableIdentifierParser {
     AssertHelpers.assertThrows(
         "TableIdentifierParser should fail to deserialize table with missing name",
         IllegalArgumentException.class,
-        "Cannot parse missing string name",
+        "Cannot parse missing string: name",
         () -> TableIdentifierParser.fromJson(identifierMissingName));
   }
 
@@ -111,14 +111,14 @@ public class TestTableIdentifierParser {
     AssertHelpers.assertThrows(
         "TableIdentifierParser should fail to deserialize table with invalid namespace",
         IllegalArgumentException.class,
-        "Cannot parse namespace from non-array value: \"accounting.tax\"",
+        "Cannot parse from non-array value: namespace: \"accounting.tax\"",
         () -> TableIdentifierParser.fromJson(invalidNamespace));
 
     String invalidName = "{\"namespace\":[\"accounting\",\"tax\"],\"name\":1234}";
     AssertHelpers.assertThrows(
         "TableIdentifierParser should fail to deserialize table with invalid name",
         IllegalArgumentException.class,
-        "Cannot parse name to a string value: 1234",
+        "Cannot parse to a string value: name: 1234",
         () -> TableIdentifierParser.fromJson(invalidName));
   }
 }
diff --git a/core/src/test/java/org/apache/iceberg/puffin/TestFileMetadataParser.java b/core/src/test/java/org/apache/iceberg/puffin/TestFileMetadataParser.java
index c16b23237e..81192be627 100644
--- a/core/src/test/java/org/apache/iceberg/puffin/TestFileMetadataParser.java
+++ b/core/src/test/java/org/apache/iceberg/puffin/TestFileMetadataParser.java
@@ -81,7 +81,7 @@ public class TestFileMetadataParser {
   public void testMissingBlobs() {
     assertThatThrownBy(() -> FileMetadataParser.fromJson("{\"properties\": {}}"))
         .isInstanceOf(IllegalArgumentException.class)
-        .hasMessage("Cannot parse blobs from non-array: null");
+        .hasMessage("Cannot parse missing field: blobs");
   }
 
   @Test
diff --git a/core/src/test/java/org/apache/iceberg/rest/requests/TestRenameTableRequest.java b/core/src/test/java/org/apache/iceberg/rest/requests/TestRenameTableRequest.java
index 13c0a77edc..e4060a17f8 100644
--- a/core/src/test/java/org/apache/iceberg/rest/requests/TestRenameTableRequest.java
+++ b/core/src/test/java/org/apache/iceberg/rest/requests/TestRenameTableRequest.java
@@ -55,7 +55,7 @@ public class TestRenameTableRequest extends RequestResponseTestBase<RenameTableR
     AssertHelpers.assertThrows(
         "A JSON request with an invalid source table identifier, with null for the name, should fail to deserialize",
         JsonProcessingException.class,
-        "Cannot parse name to a string value: null",
+        "Cannot parse to a string value: name: null",
         () -> deserialize(jsonSourceNullName));
 
     String jsonDestinationNullName =
@@ -64,7 +64,7 @@ public class TestRenameTableRequest extends RequestResponseTestBase<RenameTableR
     AssertHelpers.assertThrows(
         "A JSON request with an invalid destination table, with null for the name, should fail to deserialize",
         JsonProcessingException.class,
-        "Cannot parse name to a string value: null",
+        "Cannot parse to a string value: name: null",
         () -> deserialize(jsonDestinationNullName));
 
     String jsonSourceMissingName =
@@ -73,7 +73,7 @@ public class TestRenameTableRequest extends RequestResponseTestBase<RenameTableR
     AssertHelpers.assertThrows(
         "A JSON request with an invalid source table identifier, with no name, should fail to deserialize",
         JsonProcessingException.class,
-        "Cannot parse missing string name",
+        "Cannot parse missing string: name",
         () -> deserialize(jsonSourceMissingName));
 
     String jsonDestinationMissingName =
@@ -82,7 +82,7 @@ public class TestRenameTableRequest extends RequestResponseTestBase<RenameTableR
     AssertHelpers.assertThrows(
         "A JSON request with an invalid destination table identifier, with no name, should fail to deserialize",
         JsonProcessingException.class,
-        "Cannot parse missing string name",
+        "Cannot parse missing string: name",
         () -> deserialize(jsonDestinationMissingName));
 
     String emptyJson = "{}";
diff --git a/core/src/test/java/org/apache/iceberg/rest/responses/TestLoadTableResponse.java b/core/src/test/java/org/apache/iceberg/rest/responses/TestLoadTableResponse.java
index d5fa82a730..f421c1615b 100644
--- a/core/src/test/java/org/apache/iceberg/rest/responses/TestLoadTableResponse.java
+++ b/core/src/test/java/org/apache/iceberg/rest/responses/TestLoadTableResponse.java
@@ -109,7 +109,7 @@ public class TestLoadTableResponse extends RequestResponseTestBase<LoadTableResp
   public void testRoundTripSerdeWithV1TableMetadata() throws Exception {
     String tableMetadataJson = readTableMetadataInputFile("TableMetadataV1Valid.json");
     TableMetadata v1Metadata =
-        TableMetadataParser.fromJson(null, TEST_METADATA_LOCATION, tableMetadataJson);
+        TableMetadataParser.fromJson(TEST_METADATA_LOCATION, tableMetadataJson);
     // Convert the TableMetadata JSON from the file to an object and then back
     // to JSON so that missing fields are filled in with their default values.
@@ -126,7 +126,7 @@ public class TestLoadTableResponse extends RequestResponseTestBase<LoadTableResp
   public void testRoundTripSerdeWithV2TableMetadata() throws Exception {
     String tableMetadataJson = readTableMetadataInputFile("TableMetadataV2Valid.json");
     TableMetadata v2Metadata =
-        TableMetadataParser.fromJson(null, TEST_METADATA_LOCATION, tableMetadataJson);
+        TableMetadataParser.fromJson(TEST_METADATA_LOCATION, tableMetadataJson);
     // Convert the TableMetadata JSON from the file to an object and then back
     // to JSON so that missing fields are filled in with their default values.
@@ -146,8 +146,7 @@ public class TestLoadTableResponse extends RequestResponseTestBase<LoadTableResp
     String json =
         String.format(
             "{\"metadata-location\":\"%s\",\"metadata\":%s}", TEST_METADATA_LOCATION, metadataJson);
-    TableMetadata metadata =
-        TableMetadataParser.fromJson(null, TEST_METADATA_LOCATION, metadataJson);
+    TableMetadata metadata = TableMetadataParser.fromJson(TEST_METADATA_LOCATION, metadataJson);
     LoadTableResponse actual = deserialize(json);
     LoadTableResponse expected = LoadTableResponse.builder().withTableMetadata(metadata).build();
     assertEquals(actual, expected);
diff --git a/core/src/test/java/org/apache/iceberg/rest/responses/TestOAuthTokenResponse.java b/core/src/test/java/org/apache/iceberg/rest/responses/TestOAuthTokenResponse.java
index 6377b40489..ac52cd24ce 100644
--- a/core/src/test/java/org/apache/iceberg/rest/responses/TestOAuthTokenResponse.java
+++ b/core/src/test/java/org/apache/iceberg/rest/responses/TestOAuthTokenResponse.java
@@ -114,25 +114,25 @@ public class TestOAuthTokenResponse extends RequestResponseTestBase<OAuthTokenRe
     AssertHelpers.assertThrows(
         "Token should be required",
         IllegalArgumentException.class,
-        "missing string access_token",
+        "missing string: access_token",
         () -> deserialize("{\"token_type\":\"bearer\"}"));
 
     AssertHelpers.assertThrows(
         "Token should be string",
         IllegalArgumentException.class,
-        "Cannot parse access_token to a string value: 34",
+        "Cannot parse to a string value: access_token: 34",
         () -> deserialize("{\"access_token\":34,\"token_type\":\"bearer\"}"));
 
     AssertHelpers.assertThrows(
         "Token type should be required",
         IllegalArgumentException.class,
-        "missing string token_type",
+        "missing string: token_type",
         () -> deserialize("{\"access_token\":\"bearer-token\"}"));
 
     AssertHelpers.assertThrows(
         "Token type should be string",
         IllegalArgumentException.class,
-        "Cannot parse token_type to a string value: 34",
+        "Cannot parse to a string value: token_type: 34",
         () -> deserialize("{\"access_token\":\"bearer-token\",\"token_type\":34}"));
   }
 }
diff --git a/core/src/test/java/org/apache/iceberg/util/TestJsonUtil.java b/core/src/test/java/org/apache/iceberg/util/TestJsonUtil.java
new file mode 100644
index 0000000000..6fc6c6a79b
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/util/TestJsonUtil.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.util;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+public class TestJsonUtil {
+
+  @Test
+  public void get() throws JsonProcessingException {
+    Assertions.assertThatThrownBy(() -> JsonUtil.get("x", JsonUtil.mapper().readTree("{}")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot parse missing field: x");
+
+    Assertions.assertThatThrownBy(
+            () -> JsonUtil.get("x", JsonUtil.mapper().readTree("{\"x\": null}")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot parse missing field: x");
+
+    Assertions.assertThat(JsonUtil.get("x", JsonUtil.mapper().readTree("{\"x\": \"23\"}")).asText())
+        .isEqualTo("23");
+  }
+
+  @Test
+  public void getInt() throws JsonProcessingException {
+    Assertions.assertThatThrownBy(() -> JsonUtil.getInt("x", JsonUtil.mapper().readTree("{}")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot parse missing int: x");
+
+    Assertions.assertThatThrownBy(
+            () -> JsonUtil.getInt("x", JsonUtil.mapper().readTree("{\"x\": null}")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot parse to an integer value: x: null");
+
+    Assertions.assertThatThrownBy(
+            () -> JsonUtil.getInt("x", JsonUtil.mapper().readTree("{\"x\": \"23\"}")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot parse to an integer value: x: \"23\"");
+
+    Assertions.assertThatThrownBy(
+            () -> JsonUtil.getInt("x", JsonUtil.mapper().readTree("{\"x\": 23.0}")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot parse to an integer value: x: 23.0");
+
+    Assertions.assertThat(JsonUtil.getInt("x", JsonUtil.mapper().readTree("{\"x\": 23}")))
+        .isEqualTo(23);
+  }
+
+  @Test
+  public void getIntOrNull() throws JsonProcessingException {
+    Assertions.assertThat(JsonUtil.getIntOrNull("x", JsonUtil.mapper().readTree("{}"))).isNull();
+    Assertions.assertThat(JsonUtil.getIntOrNull("x", JsonUtil.mapper().readTree("{\"x\": 23}")))
+        .isEqualTo(23);
+    Assertions.assertThat(JsonUtil.getIntOrNull("x", JsonUtil.mapper().readTree("{\"x\": null}")))
+        .isNull();
+
+    Assertions.assertThatThrownBy(
+            () -> JsonUtil.getIntOrNull("x", JsonUtil.mapper().readTree("{\"x\": \"23\"}")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot parse to an integer value: x: \"23\"");
+
+    Assertions.assertThatThrownBy(
+            () -> JsonUtil.getIntOrNull("x", JsonUtil.mapper().readTree("{\"x\": 23.0}")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot parse to an integer value: x: 23.0");
+  }
+
+  @Test
+  public void getLong() throws JsonProcessingException {
+    Assertions.assertThatThrownBy(() -> JsonUtil.getLong("x", JsonUtil.mapper().readTree("{}")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot parse missing long: x");
+
+    Assertions.assertThatThrownBy(
+            () -> JsonUtil.getLong("x", JsonUtil.mapper().readTree("{\"x\": null}")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot parse to a long value: x: null");
+
+    Assertions.assertThatThrownBy(
+            () -> JsonUtil.getLong("x", JsonUtil.mapper().readTree("{\"x\": \"23\"}")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot parse to a long value: x: \"23\"");
+
+    Assertions.assertThatThrownBy(
+            () -> JsonUtil.getLong("x", JsonUtil.mapper().readTree("{\"x\": 23.0}")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot parse to a long value: x: 23.0");
+
+    Assertions.assertThat(JsonUtil.getLong("x", JsonUtil.mapper().readTree("{\"x\": 23}")))
+        .isEqualTo(23);
+  }
+
+  @Test
+  public void getLongOrNull() throws JsonProcessingException {
+    Assertions.assertThat(JsonUtil.getLongOrNull("x", JsonUtil.mapper().readTree("{}"))).isNull();
+    Assertions.assertThat(JsonUtil.getLongOrNull("x", JsonUtil.mapper().readTree("{\"x\": 23}")))
+        .isEqualTo(23);
+    Assertions.assertThat(JsonUtil.getLongOrNull("x", JsonUtil.mapper().readTree("{\"x\": null}")))
+        .isNull();
+
+    Assertions.assertThatThrownBy(
+            () -> JsonUtil.getLongOrNull("x", JsonUtil.mapper().readTree("{\"x\": \"23\"}")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot parse to a long value: x: \"23\"");
+
+    Assertions.assertThatThrownBy(
+            () -> JsonUtil.getLongOrNull("x", JsonUtil.mapper().readTree("{\"x\": 23.0}")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot parse to a long value: x: 23.0");
+  }
+
+  @Test
+  public void getString() throws JsonProcessingException {
+    Assertions.assertThatThrownBy(() -> JsonUtil.getString("x", JsonUtil.mapper().readTree("{}")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot parse missing string: x");
+
+    Assertions.assertThatThrownBy(
+            () -> JsonUtil.getString("x", JsonUtil.mapper().readTree("{\"x\": null}")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot parse to a string value: x: null");
+
+    Assertions.assertThatThrownBy(
+            () -> JsonUtil.getString("x", JsonUtil.mapper().readTree("{\"x\": 23}")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot parse to a string value: x: 23");
+
+    Assertions.assertThat(JsonUtil.getString("x", JsonUtil.mapper().readTree("{\"x\": \"23\"}")))
+        .isEqualTo("23");
+  }
+
+  @Test
+  public void getStringOrNull() throws JsonProcessingException {
+    Assertions.assertThat(JsonUtil.getStringOrNull("x", JsonUtil.mapper().readTree("{}"))).isNull();
+    Assertions.assertThat(
+            JsonUtil.getStringOrNull("x", JsonUtil.mapper().readTree("{\"x\": \"23\"}")))
+        .isEqualTo("23");
+    Assertions.assertThat(
+            JsonUtil.getStringOrNull("x", JsonUtil.mapper().readTree("{\"x\": null}")))
+        .isNull();
+
+    Assertions.assertThatThrownBy(
+            () -> JsonUtil.getStringOrNull("x", JsonUtil.mapper().readTree("{\"x\": 23}")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot parse to a string value: x: 23");
+  }
+
+  @Test
+  public void getBool() throws JsonProcessingException {
+    Assertions.assertThatThrownBy(() -> JsonUtil.getBool("x", JsonUtil.mapper().readTree("{}")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot parse missing boolean: x");
+
+    Assertions.assertThatThrownBy(
+            () -> JsonUtil.getBool("x", JsonUtil.mapper().readTree("{\"x\": null}")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot parse to a boolean value: x: null");
+
+    Assertions.assertThatThrownBy(
+            () -> JsonUtil.getBool("x", JsonUtil.mapper().readTree("{\"x\": \"23\"}")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot parse to a boolean value: x: \"23\"");
+
+    Assertions.assertThatThrownBy(
+            () -> JsonUtil.getBool("x", JsonUtil.mapper().readTree("{\"x\": \"true\"}")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot parse to a boolean value: x: \"true\"");
+
+    Assertions.assertThat(JsonUtil.getBool("x", JsonUtil.mapper().readTree("{\"x\": true}")))
+        .isTrue();
+    Assertions.assertThat(JsonUtil.getBool("x", JsonUtil.mapper().readTree("{\"x\": false}")))
+        .isFalse();
+  }
+}
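
The new TestJsonUtil pins down the unified accessor and error-message contract that the parser changes above rely on. A minimal usage sketch (not part of the commit; the JsonUtilDemo class and field names are illustrative):

    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.JsonNode;
    import org.apache.iceberg.util.JsonUtil;

    public class JsonUtilDemo {
      public static void main(String[] args) throws JsonProcessingException {
        JsonNode node =
            JsonUtil.mapper().readTree("{\"snapshot-id\": 42, \"operation\": \"append\"}");
        long snapshotId = JsonUtil.getLong("snapshot-id", node);            // 42
        String operation = JsonUtil.getString("operation", node);           // "append"
        Long parentId = JsonUtil.getLongOrNull("parent-snapshot-id", node); // null: field absent
        System.out.printf("%d %s %s%n", snapshotId, operation, parentId);
        // A missing required field now fails with the standardized message,
        // e.g. "Cannot parse missing long: snapshot-id" (see the tests above).
      }
    }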
diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java
index 46f6c54f2f..8741151f2f 100644
--- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java
+++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java
@@ -113,7 +113,7 @@ public final class NessieUtil {
       } catch (IOException e) {
         throw new RuntimeException("Failed to generate JSON string from metadata", e);
       }
-      deserialized = TableMetadataParser.fromJson(io, metadataLocation, jsonString);
+      deserialized = TableMetadataParser.fromJson(metadataLocation, jsonString);
     } else {
       deserialized = TableMetadataParser.read(io, metadataLocation);
     }
diff --git a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
index b0d8fcf748..c1b9537c5a 100644
--- a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
+++ b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
@@ -274,7 +274,7 @@ public class BaseRewriteManifestsSparkAction
       return ImmutableList.of();
     }
 
-    return currentSnapshot.dataManifests().stream()
+    return currentSnapshot.dataManifests(table.io()).stream()
         .filter(manifest -> manifest.partitionSpecId() == spec.specId() && predicate.test(manifest))
         .collect(Collectors.toList());
   }
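
This is the caller-side pattern for all of the Spark changes that follow: the zero-argument Snapshot accessors (allManifests(), dataManifests(), addedFiles(), ...) were removed, and callers now pass the table's FileIO so manifests are loaded on demand. A minimal sketch (not part of the commit; SnapshotIoDemo is illustrative and assumes an already-loaded table):

    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.ManifestFile;
    import org.apache.iceberg.Snapshot;
    import org.apache.iceberg.Table;

    public class SnapshotIoDemo {
      static void printSnapshot(Table table) {
        Snapshot snapshot = table.currentSnapshot();
        // snapshot.dataManifests() becomes snapshot.dataManifests(table.io())
        for (ManifestFile manifest : snapshot.dataManifests(table.io())) {
          System.out.println("manifest: " + manifest.path());
        }
        // snapshot.addedFiles() becomes snapshot.addedDataFiles(table.io())
        for (DataFile file : snapshot.addedDataFiles(table.io())) {
          System.out.println("added: " + file.path());
        }
      }
    }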
diff --git a/spark/v2.4/spark/src/test/java/org/apache/iceberg/examples/SnapshotFunctionalityTest.java b/spark/v2.4/spark/src/test/java/org/apache/iceberg/examples/SnapshotFunctionalityTest.java
index d6be87caf2..82226a716c 100644
--- a/spark/v2.4/spark/src/test/java/org/apache/iceberg/examples/SnapshotFunctionalityTest.java
+++ b/spark/v2.4/spark/src/test/java/org/apache/iceberg/examples/SnapshotFunctionalityTest.java
@@ -129,7 +129,7 @@ public class SnapshotFunctionalityTest {
   @Test
   public void getInfoAboutFilesAddedFromSnapshot() {
     Snapshot snapshot = table.currentSnapshot();
-    Iterable<DataFile> addedFiles = snapshot.addedFiles();
+    Iterable<DataFile> addedFiles = snapshot.addedDataFiles(table.io());
 
     for (DataFile dataFile : addedFiles) {
       log.info("File path: " + dataFile.path());
diff --git a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
index 84ac1e2bb8..7a50ee2f5d 100644
--- a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
+++ b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
@@ -592,7 +592,7 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
 
     // Files of the dangling staged snapshot should be deleted
     snapshotB
-        .addedFiles()
+        .addedDataFiles(table.io())
         .forEach(
             i -> {
               expectedDeletes.add(i.path().toString());
@@ -601,7 +601,7 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
     // ManifestList should be deleted too
     expectedDeletes.add(snapshotB.manifestListLocation());
     snapshotB
-        .dataManifests()
+        .dataManifests(table.io())
         .forEach(
             file -> {
               // Only the manifest of B should be deleted.
@@ -661,7 +661,7 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
     Lists.newArrayList(snapshotB, snapshotC, snapshotD)
         .forEach(
             i -> {
-              i.addedFiles()
+              i.addedDataFiles(table.io())
                   .forEach(
                       item -> {
                         Assert.assertFalse(deletedFiles.contains(item.path().toString()));
@@ -710,7 +710,7 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
     Lists.newArrayList(snapshotB)
         .forEach(
             i -> {
-              i.addedFiles()
+              i.addedDataFiles(table.io())
                   .forEach(
                       item -> {
                         Assert.assertFalse(deletedFiles.contains(item.path().toString()));
@@ -730,7 +730,7 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
     Lists.newArrayList(snapshotB, snapshotD)
         .forEach(
             i -> {
-              i.addedFiles()
+              i.addedDataFiles(table.io())
                   .forEach(
                       item -> {
                         Assert.assertFalse(deletedFiles.contains(item.path().toString()));
@@ -781,7 +781,8 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
     table.newAppend().appendFile(FILE_A).commit();
 
     Snapshot firstSnapshot = table.currentSnapshot();
-    Assert.assertEquals("Should create one manifest", 1, firstSnapshot.allManifests().size());
+    Assert.assertEquals(
+        "Should create one manifest", 1, firstSnapshot.allManifests(table.io()).size());
 
     rightAfterSnapshot();
 
@@ -791,7 +792,7 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
     Assert.assertEquals(
         "Should create replace manifest with a rewritten manifest",
         1,
-        secondSnapshot.allManifests().size());
+        secondSnapshot.allManifests(table.io()).size());
 
     table.newAppend().appendFile(FILE_B).commit();
 
@@ -824,10 +825,13 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
         "Should remove expired manifest lists and deleted data file",
         Sets.newHashSet(
             firstSnapshot.manifestListLocation(), // snapshot expired
-            firstSnapshot.allManifests().get(0).path(), // manifest was rewritten for delete
+            firstSnapshot
+                .allManifests(table.io())
+                .get(0)
+                .path(), // manifest was rewritten for delete
             secondSnapshot.manifestListLocation(), // snapshot expired
             secondSnapshot
-                .allManifests()
+                .allManifests(table.io())
                 .get(0)
                 .path(), // manifest contained only deletes, was dropped
             FILE_A.path()), // deleted
@@ -844,7 +848,8 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
     table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
 
     Snapshot firstSnapshot = table.currentSnapshot();
-    Assert.assertEquals("Should create one manifest", 1, firstSnapshot.allManifests().size());
+    Assert.assertEquals(
+        "Should create one manifest", 1, firstSnapshot.allManifests(table.io()).size());
 
     rightAfterSnapshot();
 
@@ -857,7 +862,7 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
     Assert.assertEquals(
         "Should replace manifest with a rewritten manifest",
         1,
-        secondSnapshot.allManifests().size());
+        secondSnapshot.allManifests(table.io()).size());
 
     table
         .newFastAppend() // do not merge to keep the last snapshot's manifest valid
@@ -893,7 +898,10 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
         "Should remove expired manifest lists and deleted data file",
         Sets.newHashSet(
             firstSnapshot.manifestListLocation(), // snapshot expired
-            firstSnapshot.allManifests().get(0).path(), // manifest was rewritten for delete
+            firstSnapshot
+                .allManifests(table.io())
+                .get(0)
+                .path(), // manifest was rewritten for delete
             secondSnapshot.manifestListLocation(), // snapshot expired
             FILE_A.path()), // deleted
         deletedFiles);
@@ -909,15 +917,17 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
     table.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
 
     Snapshot firstSnapshot = table.currentSnapshot();
-    Assert.assertEquals("Should create one manifest", 1, firstSnapshot.allManifests().size());
+    Assert.assertEquals(
+        "Should create one manifest", 1, firstSnapshot.allManifests(table.io()).size());
 
     rightAfterSnapshot();
 
     table.newDelete().deleteFile(FILE_B).commit();
 
     Snapshot secondSnapshot = table.currentSnapshot();
-    Set<ManifestFile> secondSnapshotManifests = Sets.newHashSet(secondSnapshot.allManifests());
-    secondSnapshotManifests.removeAll(firstSnapshot.allManifests());
+    Set<ManifestFile> secondSnapshotManifests =
+        Sets.newHashSet(secondSnapshot.allManifests(table.io()));
+    secondSnapshotManifests.removeAll(firstSnapshot.allManifests(table.io()));
     Assert.assertEquals(
         "Should add one new manifest for append", 1, secondSnapshotManifests.size());
 
@@ -962,15 +972,17 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
     table.newAppend().appendFile(FILE_A).commit();
 
     Snapshot firstSnapshot = table.currentSnapshot();
-    Assert.assertEquals("Should create one manifest", 1, firstSnapshot.allManifests().size());
+    Assert.assertEquals(
+        "Should create one manifest", 1, firstSnapshot.allManifests(table.io()).size());
 
     rightAfterSnapshot();
 
     table.newAppend().appendFile(FILE_B).commit();
 
     Snapshot secondSnapshot = table.currentSnapshot();
-    Set<ManifestFile> secondSnapshotManifests = Sets.newHashSet(secondSnapshot.allManifests());
-    secondSnapshotManifests.removeAll(firstSnapshot.allManifests());
+    Set<ManifestFile> secondSnapshotManifests =
+        Sets.newHashSet(secondSnapshot.allManifests(table.io()));
+    secondSnapshotManifests.removeAll(firstSnapshot.allManifests(table.io()));
     Assert.assertEquals(
         "Should add one new manifest for append", 1, secondSnapshotManifests.size());
 
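For context, the action exercised by these tests is invoked as below; a minimal sketch (not part of the commit; ExpireDemo is illustrative and assumes a live SparkSession and a loaded table):

    import org.apache.iceberg.Table;
    import org.apache.iceberg.actions.ExpireSnapshots;
    import org.apache.iceberg.spark.actions.SparkActions;

    public class ExpireDemo {
      static void expire(Table table, long olderThanMillis) {
        ExpireSnapshots.Result result =
            SparkActions.get()
                .expireSnapshots(table)
                .expireOlderThan(olderThanMillis)
                .execute();
        System.out.println("deleted data files: " + result.deletedDataFilesCount());
      }
    }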
diff --git a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index b98c717a4a..4b50ea0c29 100644
--- a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++ b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -127,7 +127,7 @@ public class TestRewriteManifestsAction extends SparkTestBase {
 
     table.refresh();
 
-    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size());
 
     SparkActions actions = SparkActions.get();
@@ -142,7 +142,7 @@ public class TestRewriteManifestsAction extends SparkTestBase {
 
     table.refresh();
 
-    List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> newManifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 1 manifests after rewrite", 1, newManifests.size());
 
     Assert.assertEquals(4, (long) newManifests.get(0).existingFilesCount());
@@ -180,7 +180,7 @@ public class TestRewriteManifestsAction extends SparkTestBase {
 
     table.refresh();
 
-    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size());
 
     SparkActions actions = SparkActions.get();
@@ -208,7 +208,7 @@ public class TestRewriteManifestsAction extends SparkTestBase {
     table.refresh();
 
     // table should reflect the changes, since the commit was successful
-    List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> newManifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 1 manifests after rewrite", 1, newManifests.size());
 
     Assert.assertEquals(4, (long) newManifests.get(0).existingFilesCount());
@@ -258,7 +258,7 @@ public class TestRewriteManifestsAction extends SparkTestBase {
 
     table.refresh();
 
-    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 4 manifests before rewrite", 4, manifests.size());
 
     SparkActions actions = SparkActions.get();
@@ -282,7 +282,7 @@ public class TestRewriteManifestsAction extends SparkTestBase {
 
     table.refresh();
 
-    List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> newManifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 2 manifests after rewrite", 2, newManifests.size());
 
     Assert.assertEquals(4, (long) newManifests.get(0).existingFilesCount());
@@ -347,7 +347,7 @@ public class TestRewriteManifestsAction extends SparkTestBase {
 
       Assert.assertEquals(
           "Action should rewrite all manifests",
-          snapshot.allManifests(),
+          snapshot.allManifests(table.io()),
           result.rewrittenManifests());
       Assert.assertEquals(
           "Action should add 1 manifest", 1, Iterables.size(result.addedManifests()));
@@ -375,7 +375,7 @@ public class TestRewriteManifestsAction extends SparkTestBase {
 
     table.refresh();
 
-    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 1 manifests before rewrite", 1, manifests.size());
 
     // set the target manifest size to a small value to force splitting records into multiple files
@@ -402,7 +402,7 @@ public class TestRewriteManifestsAction extends SparkTestBase {
 
     table.refresh();
 
-    List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> newManifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 2 manifests after rewrite", 2, newManifests.size());
 
     Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
@@ -432,7 +432,7 @@ public class TestRewriteManifestsAction extends SparkTestBase {
 
     table.refresh();
 
-    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size());
 
     SparkActions actions = SparkActions.get();
@@ -453,7 +453,7 @@ public class TestRewriteManifestsAction extends SparkTestBase {
 
     table.refresh();
 
-    List<ManifestFile> newManifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> newManifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 2 manifests after rewrite", 2, newManifests.size());
 
     Assert.assertFalse("First manifest must be rewritten", newManifests.contains(manifests.get(0)));
diff --git a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
index af02cc8d46..9f32769379 100644
--- a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
+++ b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
@@ -190,7 +190,7 @@ public class TestDataFrameWrites extends AvroDataTest {
 
     table
         .currentSnapshot()
-        .addedFiles()
+        .addedDataFiles(table.io())
         .forEach(
             dataFile ->
                 Assert.assertTrue(
diff --git a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 0e48797480..4ae489edff 100644
--- a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -198,7 +198,8 @@ public class TestDataSourceOptions {
         .mode("append")
         .save(tableLocation);
 
-    List<DataFile> files = Lists.newArrayList(icebergTable.currentSnapshot().addedFiles());
+    List<DataFile> files =
+        Lists.newArrayList(icebergTable.currentSnapshot().addedDataFiles(icebergTable.io()));
     Assert.assertEquals("Should have written 1 file", 1, files.size());
 
     long fileSize = files.get(0).fileSizeInBytes();
@@ -325,7 +326,7 @@ public class TestDataSourceOptions {
     // produce 2nd manifest
     originalDf.select("id", "data").write().format("iceberg").mode("append").save(tableLocation);
 
-    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
 
     Assert.assertEquals("Must be 2 manifests", 2, manifests.size());
 
@@ -355,7 +356,7 @@ public class TestDataSourceOptions {
     HadoopTables tables = new HadoopTables(CONF);
     PartitionSpec spec = PartitionSpec.unpartitioned();
     Map<String, String> options = Maps.newHashMap();
-    tables.create(SCHEMA, spec, options, tableLocation);
+    Table table = tables.create(SCHEMA, spec, options, tableLocation);
 
     List<SimpleRecord> expectedRecords =
         Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "b"));
@@ -369,7 +370,7 @@ public class TestDataSourceOptions {
                     tables
                         .load(tableLocation + "#entries")
                         .currentSnapshot()
-                        .allManifests()
+                        .allManifests(table.io())
                         .get(0)
                         .length()
                 + splitSize
diff --git a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 5f3e84f612..7fc589056d 100644
--- a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -160,9 +160,10 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
 
     Snapshot snapshot = table.currentSnapshot();
 
-    Assert.assertEquals("Should only contain one manifest", 1, snapshot.allManifests().size());
+    Assert.assertEquals(
+        "Should only contain one manifest", 1, snapshot.allManifests(table.io()).size());
 
-    InputFile manifest = table.io().newInputFile(snapshot.allManifests().get(0).path());
+    InputFile manifest = table.io().newInputFile(snapshot.allManifests(table.io()).get(0).path());
     List<GenericData.Record> expected = Lists.newArrayList();
     try (CloseableIterable<GenericData.Record> rows =
         Avro.read(manifest).project(entriesTable.schema()).build()) {
@@ -226,7 +227,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
         .save(loadLocation(tableIdentifier));
 
     table.refresh();
-    DataFile file = table.currentSnapshot().addedFiles().iterator().next();
+    DataFile file = table.currentSnapshot().addedDataFiles(table.io()).iterator().next();
 
     List<Object[]> singleActual =
         rowsToJava(
@@ -259,7 +260,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
         .save(loadLocation(tableIdentifier));
 
     table.refresh();
-    DataFile file = table.currentSnapshot().addedFiles().iterator().next();
+    DataFile file = table.currentSnapshot().addedDataFiles(table.io()).iterator().next();
 
     List<Object[]> multiActual =
         rowsToJava(
@@ -297,7 +298,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
         .save(loadLocation(tableIdentifier));
 
     table.refresh();
-    DataFile file = table.currentSnapshot().addedFiles().iterator().next();
+    DataFile file = table.currentSnapshot().addedDataFiles(table.io()).iterator().next();
 
     List<Object[]> multiActual =
         rowsToJava(
@@ -355,7 +356,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
 
     List<GenericData.Record> expected = Lists.newArrayList();
     for (ManifestFile manifest :
-        Iterables.concat(Iterables.transform(table.snapshots(), Snapshot::allManifests))) {
+        Iterables.concat(
+            Iterables.transform(
+                table.snapshots(), snapshot -> snapshot.allManifests(table.io())))) {
       InputFile in = table.io().newInputFile(manifest.path());
       try (CloseableIterable<GenericData.Record> rows =
           Avro.read(in).project(entriesTable.schema()).build()) {
@@ -442,7 +445,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
         spark.read().format("iceberg").load(loadLocation(tableIdentifier, "files")).collectAsList();
 
     List<GenericData.Record> expected = Lists.newArrayList();
-    for (ManifestFile manifest : table.currentSnapshot().dataManifests()) {
+    for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) {
       InputFile in = table.io().newInputFile(manifest.path());
       try (CloseableIterable<GenericData.Record> rows =
           Avro.read(in).project(entriesTable.schema()).build()) {
@@ -499,7 +502,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
               .collectAsList();
 
       List<GenericData.Record> expected = Lists.newArrayList();
-      for (ManifestFile manifest : table.currentSnapshot().dataManifests()) {
+      for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) {
         InputFile in = table.io().newInputFile(manifest.path());
         try (CloseableIterable<GenericData.Record> rows =
             Avro.read(in).project(entriesTable.schema()).build()) {
@@ -591,7 +594,8 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
         .save(loadLocation(tableIdentifier));
 
     table.refresh();
-    DataFile toDelete = Iterables.getOnlyElement(table.currentSnapshot().addedFiles());
+    DataFile toDelete =
+        Iterables.getOnlyElement(table.currentSnapshot().addedDataFiles(table.io()));
 
     // add a second file
     df2.select("id", "data")
@@ -607,7 +611,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
         spark.read().format("iceberg").load(loadLocation(tableIdentifier, "files")).collectAsList();
 
     List<GenericData.Record> expected = Lists.newArrayList();
-    for (ManifestFile manifest : table.currentSnapshot().dataManifests()) {
+    for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) {
       InputFile in = table.io().newInputFile(manifest.path());
       try (CloseableIterable<GenericData.Record> rows =
           Avro.read(in).project(entriesTable.schema()).build()) {
@@ -723,7 +727,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
 
     List<GenericData.Record> expected = Lists.newArrayList();
     for (ManifestFile manifest :
-        Iterables.concat(Iterables.transform(table.snapshots(), Snapshot::dataManifests))) {
+        Iterables.concat(
+            Iterables.transform(
+                table.snapshots(), snapshot -> snapshot.dataManifests(table.io())))) {
       InputFile in = table.io().newInputFile(manifest.path());
       try (CloseableIterable<GenericData.Record> rows =
           Avro.read(in).project(entriesTable.schema()).build()) {
@@ -1020,7 +1026,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
                 "partition_summary"));
     List<GenericData.Record> expected =
         Lists.transform(
-            table.currentSnapshot().allManifests(),
+            table.currentSnapshot().allManifests(table.io()),
             manifest ->
                 builder
                     .set("content", manifest.content().id())
@@ -1122,7 +1128,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
                 "partition_summary"));
     List<GenericData.Record> expected =
         Lists.transform(
-            table.currentSnapshot().allManifests(),
+            table.currentSnapshot().allManifests(table.io()),
             manifest ->
                 builder
                     .set("partition_spec_id", manifest.partitionSpecId())
@@ -1636,7 +1642,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
     table.refresh();
     Snapshot snapshot1 = table.currentSnapshot();
     snapshotIdToManifests.addAll(
-        snapshot1.allManifests().stream()
+        snapshot1.allManifests(table.io()).stream()
             .map(manifest -> Pair.of(snapshot1.snapshotId(), manifest))
             .collect(Collectors.toList()));
 
@@ -1648,9 +1654,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
 
     table.refresh();
     Snapshot snapshot2 = table.currentSnapshot();
-    Assert.assertEquals("Should have two manifests", 2, snapshot2.allManifests().size());
+    Assert.assertEquals("Should have two manifests", 2, snapshot2.allManifests(table.io()).size());
     snapshotIdToManifests.addAll(
-        snapshot2.allManifests().stream()
+        snapshot2.allManifests(table.io()).stream()
             .map(manifest -> Pair.of(snapshot2.snapshotId(), manifest))
             .collect(Collectors.toList()));
 
diff --git a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
index 80876ceb25..b1f2082b5d 100644
--- a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
+++ b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java
@@ -159,7 +159,7 @@ public class TestSparkDataFile {
 
     table.refresh();
 
-    List<ManifestFile> manifests = table.currentSnapshot().allManifests();
+    List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 1 manifest", 1, manifests.size());
 
     List<DataFile> dataFiles = Lists.newArrayList();
diff --git a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
index ed265c1689..cfb2bc331f 100644
--- a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+++ b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
@@ -128,7 +128,7 @@ public class TestSparkDataWrite {
         result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
     Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
     Assert.assertEquals("Result rows should match", expected, actual);
-    for (ManifestFile manifest : table.currentSnapshot().allManifests()) {
+    for (ManifestFile manifest : table.currentSnapshot().allManifests(table.io())) {
       for (DataFile file : ManifestFiles.read(manifest, table.io())) {
         // TODO: Avro does not support split
         if (!format.equals(FileFormat.AVRO)) {
@@ -367,7 +367,7 @@ public class TestSparkDataWrite {
     Assert.assertEquals("Result rows should match", expected, actual);
 
     List<DataFile> files = Lists.newArrayList();
-    for (ManifestFile manifest : table.currentSnapshot().allManifests()) {
+    for (ManifestFile manifest : table.currentSnapshot().allManifests(table.io())) {
       for (DataFile file : ManifestFiles.read(manifest, table.io())) {
         files.add(file);
       }
@@ -582,7 +582,7 @@ public class TestSparkDataWrite {
     Assert.assertEquals("Result rows should match", expected, actual);
 
     List<DataFile> files = Lists.newArrayList();
-    for (ManifestFile manifest : table.currentSnapshot().allManifests()) {
+    for (ManifestFile manifest : table.currentSnapshot().allManifests(table.io())) {
       for (DataFile file : ManifestFiles.read(manifest, table.io())) {
         files.add(file);
       }
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index d97fa3d27a..dc8017838f 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -1680,7 +1680,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
     table.refresh();
     Snapshot snapshot1 = table.currentSnapshot();
     snapshotIdToManifests.addAll(
-        snapshot1.allManifests().stream()
+        snapshot1.allManifests(table.io()).stream()
             .map(manifest -> Pair.of(snapshot1.snapshotId(), manifest))
             .collect(Collectors.toList()));
 
@@ -1692,9 +1692,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
 
     table.refresh();
     Snapshot snapshot2 = table.currentSnapshot();
-    Assert.assertEquals("Should have two manifests", 2, snapshot2.allManifests().size());
+    Assert.assertEquals("Should have two manifests", 2, snapshot2.allManifests(table.io()).size());
     snapshotIdToManifests.addAll(
-        snapshot2.allManifests().stream()
+        snapshot2.allManifests(table.io()).stream()
             .map(manifest -> Pair.of(snapshot2.snapshotId(), manifest))
             .collect(Collectors.toList()));
 
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index d97fa3d27a..dc8017838f 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -1680,7 +1680,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
     table.refresh();
     Snapshot snapshot1 = table.currentSnapshot();
     snapshotIdToManifests.addAll(
-        snapshot1.allManifests().stream()
+        snapshot1.allManifests(table.io()).stream()
             .map(manifest -> Pair.of(snapshot1.snapshotId(), manifest))
             .collect(Collectors.toList()));
 
@@ -1692,9 +1692,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
 
     table.refresh();
     Snapshot snapshot2 = table.currentSnapshot();
-    Assert.assertEquals("Should have two manifests", 2, snapshot2.allManifests().size());
+    Assert.assertEquals("Should have two manifests", 2, snapshot2.allManifests(table.io()).size());
     snapshotIdToManifests.addAll(
-        snapshot2.allManifests().stream()
+        snapshot2.allManifests(table.io()).stream()
             .map(manifest -> Pair.of(snapshot2.snapshotId(), manifest))
             .collect(Collectors.toList()));
 
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index 498f819804..d3c299aa8b 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -122,7 +122,9 @@ public class SparkMicroBatchStream implements MicroBatchStream {
     // iterator to find
     // addedFilesCount.
     addedFilesCount =
-        addedFilesCount == -1 ? Iterables.size(latestSnapshot.addedFiles()) : addedFilesCount;
+        addedFilesCount == -1
+            ? Iterables.size(latestSnapshot.addedDataFiles(table.io()))
+            : addedFilesCount;
 
     return new StreamingOffset(latestSnapshot.snapshotId(), addedFilesCount, false);
   }
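
The fallback above now counts added files through the table's FileIO; as a standalone helper the same computation reads as follows (a sketch, not part of the commit; AddedFilesCountDemo is illustrative and uses Iceberg's relocated Guava Iterables):

    import org.apache.iceberg.Snapshot;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.relocated.com.google.common.collect.Iterables;

    public class AddedFilesCountDemo {
      // Counts added data files by reading the snapshot's manifests; needed only
      // when the snapshot summary does not already carry the count.
      static int addedFilesCount(Table table, Snapshot snapshot) {
        return Iterables.size(snapshot.addedDataFiles(table.io()));
      }
    }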
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 5f01f89ebe..141ecd1e8c 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -1697,7 +1697,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
     table.refresh();
     Snapshot snapshot1 = table.currentSnapshot();
     snapshotIdToManifests.addAll(
-        snapshot1.allManifests().stream()
+        snapshot1.allManifests(table.io()).stream()
             .map(manifest -> Pair.of(snapshot1.snapshotId(), manifest))
             .collect(Collectors.toList()));
 
@@ -1709,9 +1709,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
 
     table.refresh();
     Snapshot snapshot2 = table.currentSnapshot();
-    Assert.assertEquals("Should have two manifests", 2, snapshot2.allManifests().size());
+    Assert.assertEquals("Should have two manifests", 2, snapshot2.allManifests(table.io()).size());
     snapshotIdToManifests.addAll(
-        snapshot2.allManifests().stream()
+        snapshot2.allManifests(table.io()).stream()
             .map(manifest -> Pair.of(snapshot2.snapshotId(), manifest))
             .collect(Collectors.toList()));
 
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index 498f819804..d3c299aa8b 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -122,7 +122,9 @@ public class SparkMicroBatchStream implements MicroBatchStream {
     // iterator to find
     // addedFilesCount.
     addedFilesCount =
-        addedFilesCount == -1 ? Iterables.size(latestSnapshot.addedFiles()) : addedFilesCount;
+        addedFilesCount == -1
+            ? Iterables.size(latestSnapshot.addedDataFiles(table.io()))
+            : addedFilesCount;
 
     return new StreamingOffset(latestSnapshot.snapshotId(), addedFilesCount, false);
   }
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 5f01f89ebe..141ecd1e8c 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -1697,7 +1697,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
     table.refresh();
     Snapshot snapshot1 = table.currentSnapshot();
     snapshotIdToManifests.addAll(
-        snapshot1.allManifests().stream()
+        snapshot1.allManifests(table.io()).stream()
             .map(manifest -> Pair.of(snapshot1.snapshotId(), manifest))
             .collect(Collectors.toList()));
 
@@ -1709,9 +1709,9 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
 
     table.refresh();
     Snapshot snapshot2 = table.currentSnapshot();
-    Assert.assertEquals("Should have two manifests", 2, snapshot2.allManifests().size());
+    Assert.assertEquals("Should have two manifests", 2, snapshot2.allManifests(table.io()).size());
     snapshotIdToManifests.addAll(
-        snapshot2.allManifests().stream()
+        snapshot2.allManifests(table.io()).stream()
             .map(manifest -> Pair.of(snapshot2.snapshotId(), manifest))
             .collect(Collectors.toList()));