Posted to commits@iceberg.apache.org by fo...@apache.org on 2023/01/14 12:01:54 UTC

[iceberg] branch master updated: Python: Refactor loading manifests (#6525)

This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 24211aa274 Python: Refactor loading manifests (#6525)
24211aa274 is described below

commit 24211aa2741f29eda14f21781138119a461ce289
Author: Fokko Driesprong <fo...@apache.org>
AuthorDate: Sat Jan 14 13:01:48 2023 +0100

    Python: Refactor loading manifests (#6525)
    
    * Add ManifestFile StructProtocol implementation.
    
    * Python: Make it work
    
    * Python: Refactor the reading of the manifest
    
    * Comments
    
    * Fix named record
    
    * Revert some stuff
    
    * Extend the test
    
    * Always pass in the struct
    
    * Refactor the Records into a single one
    
    - And remove Pydantic as a base class
    
    * Comments
    
    * Cleanup
    
    * Revert some changes and cleanup
    
    * Make CI happy
    
    * Fix passing in the struct
    
    * It’s easier to ask for forgiveness than permission
    
    * Add some more tests
    
    * Python: Minor changes to Record to support old test cases. (#351)
    
    Co-authored-by: Ryan Blue <bl...@apache.org>
---
 python/pyiceberg/avro/file.py                |  34 +-
 python/pyiceberg/avro/reader.py              |  69 +-
 python/pyiceberg/avro/resolver.py            |  37 +-
 python/pyiceberg/catalog/rest.py             |   3 +-
 python/pyiceberg/cli/output.py               |   3 +-
 python/pyiceberg/manifest.py                 | 311 +++++----
 python/pyiceberg/partitioning.py             |   2 +-
 python/pyiceberg/schema.py                   |  10 +-
 python/pyiceberg/table/__init__.py           |  25 +-
 python/pyiceberg/table/metadata.py           |   3 +-
 python/pyiceberg/table/refs.py               |   2 +-
 python/pyiceberg/table/snapshots.py          |   2 +-
 python/pyiceberg/table/sorting.py            |   2 +-
 python/pyiceberg/transforms.py               |   3 +-
 python/pyiceberg/typedef.py                  |  88 ++-
 python/pyiceberg/types.py                    |   5 +-
 python/pyiceberg/utils/iceberg_base_model.py |  62 --
 python/tests/avro/test_file.py               |  22 +-
 python/tests/avro/test_reader.py             | 242 ++-----
 python/tests/avro/test_resolver.py           |  69 +-
 python/tests/expressions/test_evaluator.py   |  28 +-
 python/tests/expressions/test_expressions.py |  60 +-
 python/tests/expressions/test_visitors.py    |   8 +-
 python/tests/table/test_snapshots.py         |  39 --
 python/tests/test_schema.py                  |   4 +-
 python/tests/test_transforms.py              |   2 +-
 python/tests/test_typedef.py                 |  51 +-
 python/tests/test_types.py                   |   2 +-
 python/tests/utils/test_manifest.py          | 929 +++++----------------------
 29 files changed, 785 insertions(+), 1332 deletions(-)

diff --git a/python/pyiceberg/avro/file.py b/python/pyiceberg/avro/file.py
index 5d469a27d7..56163d7bc7 100644
--- a/python/pyiceberg/avro/file.py
+++ b/python/pyiceberg/avro/file.py
@@ -26,8 +26,10 @@ from types import TracebackType
 from typing import (
     Callable,
     Dict,
+    Generic,
     Optional,
     Type,
+    TypeVar,
 )
 
 from pyiceberg.avro.codecs import KNOWN_CODECS, Codec
@@ -66,10 +68,9 @@ _CODEC_KEY = "avro.codec"
 _SCHEMA_KEY = "avro.schema"
 
 
-@dataclass(frozen=True)
-class AvroFileHeader:
+class AvroFileHeader(Record):
     magic: bytes
-    meta: dict[str, str]
+    meta: Dict[str, str]
     sync: bytes
 
     def compression_codec(self) -> Optional[Type[Codec]]:
@@ -93,49 +94,52 @@ class AvroFileHeader:
             raise ValueError("No schema found in Avro file headers")
 
 
+D = TypeVar("D", bound=StructProtocol)
+
+
 @dataclass
-class Block:
+class Block(Generic[D]):
     reader: Reader
     block_records: int
     block_decoder: BinaryDecoder
     position: int = 0
 
-    def __iter__(self) -> Block:
+    def __iter__(self) -> Block[D]:
         return self
 
     def has_next(self) -> bool:
         return self.position < self.block_records
 
-    def __next__(self) -> Record:
+    def __next__(self) -> D:
         if self.has_next():
             self.position += 1
             return self.reader.read(self.block_decoder)
         raise StopIteration
 
 
-class AvroFile:
+class AvroFile(Generic[D]):
     input_file: InputFile
     read_schema: Optional[Schema]
-    read_types: Dict[int, Callable[[Schema], StructProtocol]]
+    read_types: Dict[int, Callable[..., StructProtocol]]
     input_stream: InputStream
     header: AvroFileHeader
     schema: Schema
     reader: Reader
 
     decoder: BinaryDecoder
-    block: Optional[Block] = None
+    block: Optional[Block[D]] = None
 
     def __init__(
         self,
         input_file: InputFile,
         read_schema: Optional[Schema] = None,
-        read_types: Dict[int, Callable[[Schema], StructProtocol]] = EMPTY_DICT,
+        read_types: Dict[int, Callable[..., StructProtocol]] = EMPTY_DICT,
     ) -> None:
         self.input_file = input_file
         self.read_schema = read_schema
         self.read_types = read_types
 
-    def __enter__(self) -> AvroFile:
+    def __enter__(self) -> AvroFile[D]:
         """
         Opens the file and reads the header and generates
         a reader tree to start reading the payload
@@ -159,7 +163,7 @@ class AvroFile:
     ) -> None:
         self.input_stream.close()
 
-    def __iter__(self) -> AvroFile:
+    def __iter__(self) -> AvroFile[D]:
         return self
 
     def _read_block(self) -> int:
@@ -180,7 +184,7 @@ class AvroFile:
         )
         return block_records
 
-    def __next__(self) -> Record:
+    def __next__(self) -> D:
         if self.block and self.block.has_next():
             return next(self.block)
 
@@ -194,6 +198,4 @@ class AvroFile:
         raise StopIteration
 
     def _read_header(self) -> AvroFileHeader:
-        reader = construct_reader(META_SCHEMA)
-        _header = reader.read(self.decoder)
-        return AvroFileHeader(magic=_header.get(0), meta=_header.get(1), sync=_header.get(2))
+        return construct_reader(META_SCHEMA, {-1: AvroFileHeader}).read(self.decoder)
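
AvroFile is now generic over the record type it yields, and callers can inject a
constructor per struct via read_types. A minimal usage sketch, assuming pyiceberg at
this commit; the manifest path is a placeholder:

    from pyiceberg.avro.file import AvroFile
    from pyiceberg.io.pyarrow import PyArrowFileIO
    from pyiceberg.manifest import MANIFEST_ENTRY_SCHEMA, DataFile, ManifestEntry

    # "/tmp/manifest.avro" is a placeholder; point it at a real manifest Avro file.
    input_file = PyArrowFileIO().new_input("/tmp/manifest.avro")
    with AvroFile[ManifestEntry](input_file, MANIFEST_ENTRY_SCHEMA, {-1: ManifestEntry, 2: DataFile}) as reader:
        for entry in reader:  # each entry is a typed ManifestEntry record
            print(entry.status, entry.data_file.file_path)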
diff --git a/python/pyiceberg/avro/reader.py b/python/pyiceberg/avro/reader.py
index f7a0194a9a..8cc0c4813e 100644
--- a/python/pyiceberg/avro/reader.py
+++ b/python/pyiceberg/avro/reader.py
@@ -41,7 +41,8 @@ from typing import (
 from uuid import UUID
 
 from pyiceberg.avro.decoder import BinaryDecoder
-from pyiceberg.typedef import Record, StructProtocol
+from pyiceberg.typedef import StructProtocol
+from pyiceberg.types import StructType
 from pyiceberg.utils.singleton import Singleton
 
 
@@ -90,6 +91,9 @@ class Reader(Singleton):
     def skip(self, decoder: BinaryDecoder) -> None:
         ...
 
+    def __repr__(self) -> str:
+        return f"{self.__class__.__name__}()"
+
 
 class NoneReader(Reader):
     def read(self, _: BinaryDecoder) -> None:
@@ -194,6 +198,9 @@ class FixedReader(Reader):
     def __len__(self) -> int:
         return self._len
 
+    def __repr__(self) -> str:
+        return f"FixedReader({self._len})"
+
 
 class BinaryReader(Reader):
     def read(self, decoder: BinaryDecoder) -> bytes:
@@ -214,6 +221,9 @@ class DecimalReader(Reader):
     def skip(self, decoder: BinaryDecoder) -> None:
         decoder.skip_bytes()
 
+    def __repr__(self) -> str:
+        return f"DecimalReader({self.precision}, {self.scale})"
+
 
 @dataclass(frozen=True)
 class OptionReader(Reader):
@@ -238,41 +248,58 @@ class OptionReader(Reader):
             return self.option.skip(decoder)
 
 
-class StructProtocolReader(Reader):
-    create_struct: Callable[[], StructProtocol]
-    fields: Tuple[Tuple[Optional[int], Reader], ...]
+class StructReader(Reader):
+    field_readers: Tuple[Tuple[Optional[int], Reader], ...]
+    create_struct: Callable[..., StructProtocol]
+    struct: StructType
 
-    def __init__(self, fields: Tuple[Tuple[Optional[int], Reader], ...], create_struct: Callable[[], StructProtocol]):
+    def __init__(
+        self,
+        field_readers: Tuple[Tuple[Optional[int], Reader], ...],
+        create_struct: Callable[..., StructProtocol],
+        struct: StructType,
+    ) -> None:
+        self.field_readers = field_readers
         self.create_struct = create_struct
-        self.fields = fields
-
-    def create_or_reuse(self, reuse: Optional[StructProtocol]) -> StructProtocol:
-        if reuse:
-            return reuse
-        else:
-            return self.create_struct()
+        self.struct = struct
+
+    def read(self, decoder: BinaryDecoder) -> StructProtocol:
+        try:
+            # Try initializing the struct, first with the struct keyword argument
+            struct = self.create_struct(struct=self.struct)
+        except TypeError as e:
+            if "'struct' is an invalid keyword argument for" in str(e):
+                struct = self.create_struct()
+            else:
+                raise ValueError(f"Unable to initialize struct: {self.create_struct}") from e
 
-    def read(self, decoder: BinaryDecoder) -> Any:
-        struct = self.create_or_reuse(None)
+        if not isinstance(struct, StructProtocol):
+            raise ValueError(f"Incompatible with StructProtocol: {self.create_struct}")
 
-        for (pos, field) in self.fields:
+        for (pos, field) in self.field_readers:
             if pos is not None:
-                struct.set(pos, field.read(decoder))  # later: pass reuse in here
+                struct[pos] = field.read(decoder)  # later: pass reuse in here
             else:
                 field.skip(decoder)
 
         return struct
 
     def skip(self, decoder: BinaryDecoder) -> None:
-        for _, field in self.fields:
+        for _, field in self.field_readers:
             field.skip(decoder)
 
+    def __eq__(self, other: Any) -> bool:
+        return (
+            self.field_readers == other.field_readers and self.create_struct == other.create_struct
+            if isinstance(other, StructReader)
+            else False
+        )
 
-class StructReader(StructProtocolReader):
-    fields: Tuple[Tuple[Optional[int], Reader], ...]
+    def __repr__(self) -> str:
+        return f"StructReader(({','.join(repr(field) for field in self.field_readers)}), {repr(self.create_struct)})"
 
-    def __init__(self, fields: Tuple[Tuple[Optional[int], Reader], ...]):
-        super().__init__(fields, lambda: Record.of(len(fields)))
+    def __hash__(self) -> int:
+        return hash(self.field_readers)
 
 
 @dataclass(frozen=True)
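
StructReader now receives both the constructor and the expected StructType; read()
first tries the struct keyword argument and falls back to a bare call. A minimal
sketch mirroring the new tests further down (b"\x18" is the zigzag varint encoding
of 12):

    from pyiceberg.avro.decoder import BinaryDecoder
    from pyiceberg.avro.reader import IntegerReader, StructReader
    from pyiceberg.io.memory import MemoryInputStream
    from pyiceberg.typedef import Record
    from pyiceberg.types import IntegerType, NestedField, StructType

    struct = StructType(NestedField(1, "id", IntegerType(), required=True))
    decoder = BinaryDecoder(MemoryInputStream(b"\x18"))
    # Field position 0 is read into the Record created for this struct.
    record = StructReader(((0, IntegerReader()),), Record, struct).read(decoder)
    assert repr(record) == "Record[id=12]"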
diff --git a/python/pyiceberg/avro/resolver.py b/python/pyiceberg/avro/resolver.py
index a53693f415..bacd942b33 100644
--- a/python/pyiceberg/avro/resolver.py
+++ b/python/pyiceberg/avro/resolver.py
@@ -53,7 +53,7 @@ from pyiceberg.schema import (
     promote,
     visit_with_partner,
 )
-from pyiceberg.typedef import EMPTY_DICT, StructProtocol
+from pyiceberg.typedef import EMPTY_DICT, Record, StructProtocol
 from pyiceberg.types import (
     BinaryType,
     BooleanType,
@@ -78,7 +78,9 @@ from pyiceberg.types import (
 )
 
 
-def construct_reader(file_schema: Union[Schema, IcebergType]) -> Reader:
+def construct_reader(
+    file_schema: Union[Schema, IcebergType], read_types: Dict[int, Callable[..., StructProtocol]] = EMPTY_DICT
+) -> Reader:
     """Constructs a reader from a file schema
 
     Args:
@@ -87,13 +89,13 @@ def construct_reader(file_schema: Union[Schema, IcebergType]) -> Reader:
     Raises:
         NotImplementedError: If attempting to resolve an unrecognized object type
     """
-    return resolve(file_schema, file_schema)
+    return resolve(file_schema, file_schema, read_types)
 
 
 def resolve(
     file_schema: Union[Schema, IcebergType],
     read_schema: Union[Schema, IcebergType],
-    read_types: Dict[int, Callable[[Schema], StructProtocol]] = EMPTY_DICT,
+    read_types: Dict[int, Callable[..., StructProtocol]] = EMPTY_DICT,
 ) -> Reader:
     """Resolves the file and read schema to produce a reader
 
@@ -109,28 +111,39 @@ def resolve(
 
 
 class SchemaResolver(PrimitiveWithPartnerVisitor[IcebergType, Reader]):
-    read_types: Optional[Dict[int, Callable[[Schema], StructProtocol]]]
+    read_types: Dict[int, Callable[..., StructProtocol]]
+    context: List[int]
 
-    def __init__(self, read_types: Optional[Dict[int, Callable[[Schema], StructProtocol]]]):
+    def __init__(self, read_types: Dict[int, Callable[..., StructProtocol]] = EMPTY_DICT) -> None:
         self.read_types = read_types
+        self.context = []
 
     def schema(self, schema: Schema, expected_schema: Optional[IcebergType], result: Reader) -> Reader:
         return result
 
+    def before_field(self, field: NestedField, field_partner: Optional[NestedField]) -> None:
+        self.context.append(field.field_id)
+
+    def after_field(self, field: NestedField, field_partner: Optional[NestedField]) -> None:
+        self.context.pop()
+
     def struct(self, struct: StructType, expected_struct: Optional[IcebergType], field_readers: List[Reader]) -> Reader:
+        # -1 indicates the struct root
+        read_struct_id = self.context[-1] if len(self.context) > 0 else -1
+        struct_callable = self.read_types.get(read_struct_id, Record)
+
         if not expected_struct:
-            return StructReader(tuple(enumerate(field_readers)))
+            return StructReader(tuple(enumerate(field_readers)), struct_callable, struct)
 
         if not isinstance(expected_struct, StructType):
             raise ResolveError(f"File/read schema are not aligned for struct, got {expected_struct}")
 
-        results: List[Tuple[Optional[int], Reader]] = []
         expected_positions: Dict[int, int] = {field.field_id: pos for pos, field in enumerate(expected_struct.fields)}
 
         # first, add readers for the file fields that must be in order
-        for field, result_reader in zip(struct.fields, field_readers):
-            read_pos = expected_positions.get(field.field_id)
-            results.append((read_pos, result_reader))
+        results: List[Tuple[Optional[int], Reader]] = [
+            (expected_positions.get(field.field_id), result_reader) for field, result_reader in zip(struct.fields, field_readers)
+        ]
 
         file_fields = {field.field_id: field for field in struct.fields}
         for pos, read_field in enumerate(expected_struct.fields):
@@ -140,7 +153,7 @@ class SchemaResolver(PrimitiveWithPartnerVisitor[IcebergType, Reader]):
                 # Just set the new field to None
                 results.append((pos, NoneReader()))
 
-        return StructReader(tuple(results))
+        return StructReader(tuple(results), struct_callable, expected_struct)
 
     def field(self, field: NestedField, expected_field: Optional[IcebergType], field_reader: Reader) -> Reader:
         return field_reader if field.required else OptionReader(field_reader)
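
read_types keys the constructor callables by field ID, with -1 reserved for the
schema root, so nested structs resolve to their own classes. A minimal sketch
reusing the manifest constants added in this commit:

    from pyiceberg.avro.resolver import construct_reader
    from pyiceberg.manifest import MANIFEST_ENTRY_SCHEMA, DataFile, ManifestEntry

    # -1 selects the root constructor; field ID 2 is data_file in
    # MANIFEST_ENTRY_SCHEMA, so that nested struct is built as a DataFile.
    reader = construct_reader(MANIFEST_ENTRY_SCHEMA, {-1: ManifestEntry, 2: DataFile})
    print(reader)  # a StructReader tree mirroring the schema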
diff --git a/python/pyiceberg/catalog/rest.py b/python/pyiceberg/catalog/rest.py
index f7bb6b346c..2f9eb5df11 100644
--- a/python/pyiceberg/catalog/rest.py
+++ b/python/pyiceberg/catalog/rest.py
@@ -57,8 +57,7 @@ from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.table import Table, TableMetadata
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
-from pyiceberg.typedef import EMPTY_DICT
-from pyiceberg.utils.iceberg_base_model import IcebergBaseModel
+from pyiceberg.typedef import EMPTY_DICT, IcebergBaseModel
 
 ICEBERG_REST_SPEC_VERSION = "0.14.1"
 
diff --git a/python/pyiceberg/cli/output.py b/python/pyiceberg/cli/output.py
index 5bb61d456b..44c94f6b4c 100644
--- a/python/pyiceberg/cli/output.py
+++ b/python/pyiceberg/cli/output.py
@@ -26,8 +26,7 @@ from rich.tree import Tree
 from pyiceberg.partitioning import PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.table import Table, TableMetadata
-from pyiceberg.typedef import Identifier, Properties
-from pyiceberg.utils.iceberg_base_model import IcebergBaseModel
+from pyiceberg.typedef import IcebergBaseModel, Identifier, Properties
 
 
 class Output(ABC):
diff --git a/python/pyiceberg/manifest.py b/python/pyiceberg/manifest.py
index 9f7f394159..757f3bd016 100644
--- a/python/pyiceberg/manifest.py
+++ b/python/pyiceberg/manifest.py
@@ -15,30 +15,29 @@
 # specific language governing permissions and limitations
 # under the License.
 from enum import Enum
-from functools import singledispatch
 from typing import (
     Any,
     Dict,
     Iterator,
     List,
     Optional,
-    Union,
 )
 
-from pydantic import Field
-
 from pyiceberg.avro.file import AvroFile
 from pyiceberg.io import FileIO, InputFile
 from pyiceberg.schema import Schema
 from pyiceberg.typedef import Record
 from pyiceberg.types import (
-    IcebergType,
+    BinaryType,
+    BooleanType,
+    IntegerType,
     ListType,
+    LongType,
     MapType,
-    PrimitiveType,
+    NestedField,
+    StringType,
     StructType,
 )
-from pyiceberg.utils.iceberg_base_model import IcebergBaseModel
 
 
 class DataFileContent(int, Enum):
@@ -76,137 +75,207 @@ class FileFormat(str, Enum):
         return f"FileFormat.{self.name}"
 
 
-class DataFile(IcebergBaseModel):
-    content: DataFileContent = Field(default=DataFileContent.DATA)
-    file_path: str = Field()
-    file_format: FileFormat = Field()
-    partition: Dict[str, Any] = Field()
-    record_count: int = Field()
-    file_size_in_bytes: int = Field()
-    block_size_in_bytes: Optional[int] = Field()
-    column_sizes: Optional[Dict[int, int]] = Field()
-    value_counts: Optional[Dict[int, int]] = Field()
-    null_value_counts: Optional[Dict[int, int]] = Field()
-    nan_value_counts: Optional[Dict[int, int]] = Field()
-    distinct_counts: Optional[Dict[int, int]] = Field()
-    lower_bounds: Optional[Dict[int, bytes]] = Field()
-    upper_bounds: Optional[Dict[int, bytes]] = Field()
-    key_metadata: Optional[bytes] = Field()
-    split_offsets: Optional[List[int]] = Field()
-    equality_ids: Optional[List[int]] = Field()
-    sort_order_id: Optional[int] = Field()
-
-
-class ManifestEntry(IcebergBaseModel):
-    status: ManifestEntryStatus = Field()
-    snapshot_id: Optional[int] = Field()
-    sequence_number: Optional[int] = Field()
-    data_file: DataFile = Field()
-
-
-class PartitionFieldSummary(IcebergBaseModel):
-    contains_null: bool = Field()
-    contains_nan: Optional[bool] = Field()
-    lower_bound: Optional[bytes] = Field()
-    upper_bound: Optional[bytes] = Field()
-
-
-class ManifestFile(IcebergBaseModel):
-    manifest_path: str = Field()
-    manifest_length: int = Field()
-    partition_spec_id: int = Field()
-    content: ManifestContent = Field(default=ManifestContent.DATA)
-    sequence_number: int = Field(default=0)
-    min_sequence_number: int = Field(default=0)
-    added_snapshot_id: Optional[int] = Field()
-    added_data_files_count: Optional[int] = Field()
-    existing_data_files_count: Optional[int] = Field()
-    deleted_data_files_count: Optional[int] = Field()
-    added_rows_count: Optional[int] = Field()
-    existing_rows_counts: Optional[int] = Field()
-    deleted_rows_count: Optional[int] = Field()
-    partitions: Optional[List[PartitionFieldSummary]] = Field()
-    key_metadata: Optional[bytes] = Field()
-
-    def fetch_manifest_entry(self, io: FileIO) -> List[ManifestEntry]:
-        file = io.new_input(self.manifest_path)
-        return list(read_manifest_entry(file))
-
+DATA_FILE_TYPE = StructType(
+    NestedField(
+        field_id=134,
+        name="content",
+        field_type=IntegerType(),
+        required=False,
+        doc="Contents of the file: 0=data, 1=position deletes, 2=equality deletes",
+    ),
+    NestedField(field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme"),
+    NestedField(
+        field_id=101, name="file_format", field_type=StringType(), required=True, doc="File format name: avro, orc, or parquet"
+    ),
+    NestedField(
+        field_id=102,
+        name="partition",
+        field_type=StructType(),
+        required=True,
+        doc="Partition data tuple, schema based on the partition spec",
+    ),
+    NestedField(field_id=103, name="record_count", field_type=LongType(), required=True, doc="Number of records in the file"),
+    NestedField(field_id=104, name="file_size_in_bytes", field_type=LongType(), required=True, doc="Total file size in bytes"),
+    NestedField(
+        field_id=108,
+        name="column_sizes",
+        field_type=MapType(key_id=117, key_type=IntegerType(), value_id=118, value_type=LongType()),
+        required=True,
+        doc="Map of column id to total size on disk",
+    ),
+    NestedField(
+        field_id=109,
+        name="value_counts",
+        field_type=MapType(key_id=119, key_type=IntegerType(), value_id=120, value_type=LongType()),
+        required=True,
+        doc="Map of column id to total count, including null and NaN",
+    ),
+    NestedField(
+        field_id=110,
+        name="null_value_counts",
+        field_type=MapType(key_id=121, key_type=IntegerType(), value_id=122, value_type=LongType()),
+        required=False,
+        doc="Map of column id to null value count",
+    ),
+    NestedField(
+        field_id=137,
+        name="nan_value_counts",
+        field_type=MapType(key_id=138, key_type=IntegerType(), value_id=139, value_type=LongType()),
+        required=False,
+        doc="Map of column id to number of NaN values in the column",
+    ),
+    NestedField(
+        field_id=125,
+        name="lower_bounds",
+        field_type=MapType(key_id=126, key_type=IntegerType(), value_id=127, value_type=BinaryType()),
+        required=False,
+        doc="Map of column id to lower bound",
+    ),
+    NestedField(
+        field_id=128,
+        name="upper_bounds",
+        field_type=MapType(key_id=129, key_type=IntegerType(), value_id=130, value_type=BinaryType()),
+        required=False,
+        doc="Map of column id to upper bound",
+    ),
+    NestedField(field_id=131, name="key_metadata", field_type=BinaryType(), required=False, doc="Encryption key metadata blob"),
+    NestedField(
+        field_id=132,
+        name="split_offsets",
+        field_type=ListType(element_id=133, element_type=LongType(), element_required=True),
+        required=False,
+        doc="Splittable offsets",
+    ),
+    NestedField(
+        field_id=135,
+        name="equality_ids",
+        field_type=ListType(element_id=136, element_type=LongType(), element_required=True),
+        required=False,
+        doc="Equality comparison field IDs",
+    ),
+    NestedField(field_id=140, name="sort_order_id", field_type=IntegerType(), required=False, doc="Sort order ID"),
+    NestedField(field_id=141, name="spec_id", field_type=IntegerType(), required=False, doc="Partition spec ID"),
+)
 
-def read_manifest_entry(input_file: InputFile) -> Iterator[ManifestEntry]:
-    with AvroFile(input_file) as reader:
-        schema = reader.schema
-        for record in reader:
-            dict_repr = _convert_pos_to_dict(schema, record)
-            yield ManifestEntry(**dict_repr)
 
+class DataFile(Record):
+    content: Optional[DataFileContent]
+    file_path: str
+    file_format: FileFormat
+    partition: Record
+    record_count: int
+    file_size_in_bytes: int
+    column_sizes: Dict[int, int]
+    value_counts: Dict[int, int]
+    null_value_counts: Dict[int, int]
+    nan_value_counts: Dict[int, int]
+    lower_bounds: Dict[int, bytes]
+    upper_bounds: Dict[int, bytes]
+    key_metadata: Optional[bytes]
+    split_offsets: Optional[List[int]]
+    equality_ids: Optional[List[int]]
+    sort_order_id: Optional[int]
+    spec_id: Optional[int]
+
+    def __init__(self, *data: Any, **named_data: Any) -> None:
+        super().__init__(*data, **{"struct": DATA_FILE_TYPE, **named_data})
+
+
+MANIFEST_ENTRY_SCHEMA = Schema(
+    NestedField(0, "status", IntegerType(), required=True),
+    NestedField(1, "snapshot_id", LongType(), required=False),
+    NestedField(3, "sequence_number", LongType(), required=False),
+    NestedField(4, "file_sequence_number", LongType(), required=False),
+    NestedField(2, "data_file", DATA_FILE_TYPE, required=False),
+)
 
-def live_entries(input_file: InputFile) -> Iterator[ManifestEntry]:
-    return (entry for entry in read_manifest_entry(input_file) if entry.status != ManifestEntryStatus.DELETED)
 
+class ManifestEntry(Record):
+    status: ManifestEntryStatus
+    snapshot_id: Optional[int]
+    sequence_number: Optional[int]
+    file_sequence_number: Optional[int]
+    data_file: DataFile
 
-def files(input_file: InputFile) -> Iterator[DataFile]:
-    return (entry.data_file for entry in live_entries(input_file))
+    def __init__(self, *data: Any, **named_data: Any) -> None:
+        super().__init__(*data, **{"struct": MANIFEST_ENTRY_SCHEMA.as_struct(), **named_data})
 
 
-def read_manifest_list(input_file: InputFile) -> Iterator[ManifestFile]:
-    with AvroFile(input_file) as reader:
-        schema = reader.schema
-        for record in reader:
-            dict_repr = _convert_pos_to_dict(schema, record)
-            yield ManifestFile(**dict_repr)
-
-
-@singledispatch
-def _convert_pos_to_dict(schema: Union[Schema, IcebergType], struct: Record) -> Dict[str, Any]:
-    """Converts the positions in the field names
+PARTITION_FIELD_SUMMARY_TYPE = StructType(
+    NestedField(509, "contains_null", BooleanType(), required=True),
+    NestedField(518, "contains_nan", BooleanType(), required=False),
+    NestedField(510, "lower_bound", BinaryType(), required=False),
+    NestedField(511, "upper_bound", BinaryType(), required=False),
+)
 
-    This makes it easy to map it onto a Pydantic model. Might change later on depending on the performance
 
-     Args:
-         schema (Schema | IcebergType): The schema of the file
-         struct (Record): The struct containing the data by positions
+class PartitionFieldSummary(Record):
+    contains_null: bool
+    contains_nan: Optional[bool]
+    lower_bound: Optional[bytes]
+    upper_bound: Optional[bytes]
+
+    def __init__(self, *data: Any, **named_data: Any) -> None:
+        super().__init__(*data, **{"struct": PARTITION_FIELD_SUMMARY_TYPE, **named_data})
+
+
+MANIFEST_FILE_SCHEMA: Schema = Schema(
+    NestedField(500, "manifest_path", StringType(), required=True, doc="Location URI with FS scheme"),
+    NestedField(501, "manifest_length", LongType(), required=True),
+    NestedField(502, "partition_spec_id", IntegerType(), required=True),
+    NestedField(517, "content", IntegerType(), required=False),
+    NestedField(515, "sequence_number", LongType(), required=False),
+    NestedField(516, "min_sequence_number", LongType(), required=False),
+    NestedField(503, "added_snapshot_id", LongType(), required=False),
+    NestedField(504, "added_files_count", IntegerType(), required=False),
+    NestedField(505, "existing_files_count", IntegerType(), required=False),
+    NestedField(506, "deleted_files_count", IntegerType(), required=False),
+    NestedField(512, "added_rows_count", LongType(), required=False),
+    NestedField(513, "existing_rows_count", LongType(), required=False),
+    NestedField(514, "deleted_rows_count", LongType(), required=False),
+    NestedField(507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False),
+    NestedField(519, "key_metadata", BinaryType(), required=False),
+)
 
-     Raises:
-         NotImplementedError: If attempting to handle an unknown type in the schema
-    """
-    raise NotImplementedError(f"Cannot traverse non-type: {schema}")
 
+class ManifestFile(Record):
+    manifest_path: str
+    manifest_length: int
+    partition_spec_id: int
+    content: Optional[ManifestContent]
+    sequence_number: Optional[int]
+    min_sequence_number: Optional[int]
+    added_snapshot_id: Optional[int]
+    added_files_count: Optional[int]
+    existing_files_count: Optional[int]
+    deleted_files_count: Optional[int]
+    added_rows_count: Optional[int]
+    existing_rows_count: Optional[int]
+    deleted_rows_count: Optional[int]
+    partitions: Optional[List[PartitionFieldSummary]]
+    key_metadata: Optional[bytes]
+
+    def __init__(self, *data: Any, **named_data: Any) -> None:
+        super().__init__(*data, **{"struct": MANIFEST_FILE_SCHEMA.as_struct(), **named_data})
 
-@_convert_pos_to_dict.register
-def _(schema: Schema, struct: Record) -> Dict[str, Any]:
-    return _convert_pos_to_dict(schema.as_struct(), struct)
+    def fetch_manifest_entry(self, io: FileIO) -> List[ManifestEntry]:
+        file = io.new_input(self.manifest_path)
+        return list(read_manifest_entry(file))
 
 
-@_convert_pos_to_dict.register
-def _(struct_type: StructType, values: Record) -> Dict[str, Any]:
-    """Iterates over all the fields in the dict, and gets the data from the struct"""
-    return (
-        {field.name: _convert_pos_to_dict(field.field_type, values.get(pos)) for pos, field in enumerate(struct_type.fields)}
-        if values is not None
-        else None
-    )
+def read_manifest_entry(input_file: InputFile) -> Iterator[ManifestEntry]:
+    with AvroFile[ManifestEntry](input_file, MANIFEST_ENTRY_SCHEMA, {-1: ManifestEntry, 2: DataFile}) as reader:
+        yield from reader
 
 
-@_convert_pos_to_dict.register
-def _(list_type: ListType, values: List[Any]) -> Any:
-    """In the case of a list, we'll go over the elements in the list to handle complex types"""
-    return [_convert_pos_to_dict(list_type.element_type, value) for value in values] if values is not None else None
+def live_entries(input_file: InputFile) -> Iterator[ManifestEntry]:
+    return (entry for entry in read_manifest_entry(input_file) if entry.status != ManifestEntryStatus.DELETED)
 
 
-@_convert_pos_to_dict.register
-def _(map_type: MapType, values: Dict[Any, Any]) -> Dict[Any, Any]:
-    """In the case of a map, we both traverse over the key and value to handle complex types"""
-    return (
-        {
-            _convert_pos_to_dict(map_type.key_type, key): _convert_pos_to_dict(map_type.value_type, value)
-            for key, value in values.items()
-        }
-        if values is not None
-        else None
-    )
+def files(input_file: InputFile) -> Iterator[DataFile]:
+    return (entry.data_file for entry in live_entries(input_file))
 
 
-@_convert_pos_to_dict.register
-def _(_: PrimitiveType, value: Any) -> Any:
-    return value
+def read_manifest_list(input_file: InputFile) -> Iterator[ManifestFile]:
+    with AvroFile[ManifestFile](input_file, MANIFEST_FILE_SCHEMA, {-1: ManifestFile, 508: PartitionFieldSummary}) as reader:
+        yield from reader
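
With the schemas above, manifests deserialize straight into typed Records instead of
round-tripping through dicts and Pydantic. A usage sketch; the snapshot path is a
placeholder for a real manifest list written by Iceberg:

    from pyiceberg.io.pyarrow import PyArrowFileIO
    from pyiceberg.manifest import files, read_manifest_list

    io = PyArrowFileIO()
    for manifest_file in read_manifest_list(io.new_input("/tmp/metadata/snap-123.avro")):
        print(manifest_file.manifest_path, manifest_file.added_files_count)
        # Iterate the live data files referenced by this manifest.
        for data_file in files(io.new_input(manifest_file.manifest_path)):
            print(data_file.file_path, data_file.record_count)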
diff --git a/python/pyiceberg/partitioning.py b/python/pyiceberg/partitioning.py
index aca79bfa76..9313b1606c 100644
--- a/python/pyiceberg/partitioning.py
+++ b/python/pyiceberg/partitioning.py
@@ -27,8 +27,8 @@ from pydantic import Field
 
 from pyiceberg.schema import Schema
 from pyiceberg.transforms import Transform
+from pyiceberg.typedef import IcebergBaseModel
 from pyiceberg.types import NestedField, StructType
-from pyiceberg.utils.iceberg_base_model import IcebergBaseModel
 
 INITIAL_PARTITION_SPEC_ID = 0
 _PARTITION_DATA_ID_START: int = 1000
diff --git a/python/pyiceberg/schema.py b/python/pyiceberg/schema.py
index 1a85cb4baa..45e9ffe26a 100644
--- a/python/pyiceberg/schema.py
+++ b/python/pyiceberg/schema.py
@@ -36,7 +36,7 @@ from typing import (
 from pydantic import Field, PrivateAttr
 
 from pyiceberg.exceptions import ResolveError
-from pyiceberg.typedef import EMPTY_DICT, StructProtocol
+from pyiceberg.typedef import EMPTY_DICT, IcebergBaseModel, StructProtocol
 from pyiceberg.types import (
     BinaryType,
     BooleanType,
@@ -59,7 +59,6 @@ from pyiceberg.types import (
     TimeType,
     UUIDType,
 )
-from pyiceberg.utils.iceberg_base_model import IcebergBaseModel
 
 T = TypeVar("T")
 P = TypeVar("P")
@@ -94,6 +93,9 @@ class Schema(IcebergBaseModel):
     def __repr__(self) -> str:
         return f"Schema({', '.join(repr(column) for column in self.columns)}, schema_id={self.schema_id}, identifier_field_ids={self.identifier_field_ids})"
 
+    def __len__(self) -> int:
+        return len(self.fields)
+
     def __eq__(self, other: Any) -> bool:
         if not other:
             return False
@@ -684,11 +686,11 @@ class Accessor:
             Any: The value at position `self.position` in the container
         """
         pos = self.position
-        val = container.get(pos)
+        val = container[pos]
         inner = self
         while inner.inner:
             inner = inner.inner
-            val = val.get(inner.position)
+            val = val[inner.position]
 
         return val
 
diff --git a/python/pyiceberg/table/__init__.py b/python/pyiceberg/table/__init__.py
index 3fb0702889..c4c23100c9 100644
--- a/python/pyiceberg/table/__init__.py
+++ b/python/pyiceberg/table/__init__.py
@@ -54,9 +54,7 @@ from pyiceberg.typedef import (
     Identifier,
     KeyDefaultDict,
     Properties,
-    StructProtocol,
 )
-from pyiceberg.types import StructType
 
 if TYPE_CHECKING:
     import pandas as pd
@@ -259,24 +257,6 @@ class FileScanTask(ScanTask):
         self.length = length or data_file.file_size_in_bytes
 
 
-class _DictAsStruct(StructProtocol):
-    pos_to_name: Dict[int, str]
-    wrapped: Dict[str, Any]
-
-    def __init__(self, partition_type: StructType):
-        self.pos_to_name = {pos: field.name for pos, field in enumerate(partition_type.fields)}
-
-    def wrap(self, to_wrap: Dict[str, Any]) -> _DictAsStruct:
-        self.wrapped = to_wrap
-        return self
-
-    def get(self, pos: int) -> Any:
-        return self.wrapped[self.pos_to_name[pos]]
-
-    def set(self, pos: int, value: Any) -> None:
-        raise NotImplementedError("Cannot set values in DictAsStruct")
-
-
 class DataScan(TableScan["DataScan"]):
     def __init__(
         self,
@@ -307,11 +287,8 @@ class DataScan(TableScan["DataScan"]):
         partition_schema = Schema(*partition_type.fields)
         partition_expr = self.partition_filters[spec_id]
 
-        # TODO: remove the dict to struct wrapper by using a StructProtocol record  # pylint: disable=W0511
-        wrapper = _DictAsStruct(partition_type)
         evaluator = visitors.expression_evaluator(partition_schema, partition_expr, self.case_sensitive)
-
-        return lambda data_file: evaluator(wrapper.wrap(data_file.partition))
+        return lambda data_file: evaluator(data_file.partition)
 
     def plan_files(self) -> Iterator[FileScanTask]:
         snapshot = self.snapshot()
diff --git a/python/pyiceberg/table/metadata.py b/python/pyiceberg/table/metadata.py
index 5704222d2c..31e74e7084 100644
--- a/python/pyiceberg/table/metadata.py
+++ b/python/pyiceberg/table/metadata.py
@@ -39,9 +39,8 @@ from pyiceberg.table.sorting import (
     SortOrder,
     assign_fresh_sort_order_ids,
 )
-from pyiceberg.typedef import EMPTY_DICT, Properties
+from pyiceberg.typedef import EMPTY_DICT, IcebergBaseModel, Properties
 from pyiceberg.utils.datetime import datetime_to_micros
-from pyiceberg.utils.iceberg_base_model import IcebergBaseModel
 
 CURRENT_SNAPSHOT_ID = "current_snapshot_id"
 CURRENT_SCHEMA_ID = "current_schema_id"
diff --git a/python/pyiceberg/table/refs.py b/python/pyiceberg/table/refs.py
index 58f6121ed9..09cbad27f1 100644
--- a/python/pyiceberg/table/refs.py
+++ b/python/pyiceberg/table/refs.py
@@ -19,7 +19,7 @@ from typing import Optional
 
 from pydantic import Field
 
-from pyiceberg.utils.iceberg_base_model import IcebergBaseModel
+from pyiceberg.typedef import IcebergBaseModel
 
 MAIN_BRANCH = "main"
 
diff --git a/python/pyiceberg/table/snapshots.py b/python/pyiceberg/table/snapshots.py
index af17fd4b4e..83dd3f66a7 100644
--- a/python/pyiceberg/table/snapshots.py
+++ b/python/pyiceberg/table/snapshots.py
@@ -27,7 +27,7 @@ from pydantic import Field, PrivateAttr, root_validator
 
 from pyiceberg.io import FileIO
 from pyiceberg.manifest import ManifestFile, read_manifest_list
-from pyiceberg.utils.iceberg_base_model import IcebergBaseModel
+from pyiceberg.typedef import IcebergBaseModel
 
 OPERATION = "operation"
 
diff --git a/python/pyiceberg/table/sorting.py b/python/pyiceberg/table/sorting.py
index 67f27d82eb..7ca64399d6 100644
--- a/python/pyiceberg/table/sorting.py
+++ b/python/pyiceberg/table/sorting.py
@@ -29,8 +29,8 @@ from pydantic import Field, root_validator
 
 from pyiceberg.schema import Schema
 from pyiceberg.transforms import IdentityTransform, Transform
+from pyiceberg.typedef import IcebergBaseModel
 from pyiceberg.types import IcebergType
-from pyiceberg.utils.iceberg_base_model import IcebergBaseModel
 
 
 class SortDirection(Enum):
diff --git a/python/pyiceberg/transforms.py b/python/pyiceberg/transforms.py
index b163c02569..e0197f0639 100644
--- a/python/pyiceberg/transforms.py
+++ b/python/pyiceberg/transforms.py
@@ -60,7 +60,7 @@ from pyiceberg.expressions.literals import (
     TimestampLiteral,
     literal,
 )
-from pyiceberg.typedef import L
+from pyiceberg.typedef import IcebergBaseModel, L
 from pyiceberg.types import (
     BinaryType,
     DateType,
@@ -77,7 +77,6 @@ from pyiceberg.types import (
 )
 from pyiceberg.utils import datetime
 from pyiceberg.utils.decimal import decimal_to_bytes, truncate_decimal
-from pyiceberg.utils.iceberg_base_model import IcebergBaseModel
 from pyiceberg.utils.parsing import ParseNumberFromBrackets
 from pyiceberg.utils.singleton import Singleton
 
diff --git a/python/pyiceberg/typedef.py b/python/pyiceberg/typedef.py
index 228b9a927c..bdf467101c 100644
--- a/python/pyiceberg/typedef.py
+++ b/python/pyiceberg/typedef.py
@@ -18,12 +18,15 @@ from __future__ import annotations
 
 from abc import abstractmethod
 from decimal import Decimal
+from functools import cached_property
 from typing import (
+    TYPE_CHECKING,
     Any,
     Callable,
     Dict,
-    List,
+    Optional,
     Protocol,
+    Set,
     Tuple,
     TypeVar,
     Union,
@@ -31,6 +34,11 @@ from typing import (
 )
 from uuid import UUID
 
+from pydantic import BaseModel
+
+if TYPE_CHECKING:
+    from pyiceberg.types import StructType
+
 
 class FrozenDict(Dict[Any, Any]):
     def __setitem__(self, instance: Any, value: Any) -> None:
@@ -42,7 +50,6 @@ class FrozenDict(Dict[Any, Any]):
 
 EMPTY_DICT = FrozenDict()
 
-
 K = TypeVar("K")
 V = TypeVar("V")
 
@@ -72,34 +79,79 @@ class StructProtocol(Protocol):  # pragma: no cover
     """A generic protocol used by accessors to get and set at positions of an object"""
 
     @abstractmethod
-    def get(self, pos: int) -> Any:
+    def __getitem__(self, pos: int) -> Any:
         ...
 
     @abstractmethod
-    def set(self, pos: int, value: Any) -> None:
+    def __setitem__(self, pos: int, value: Any) -> None:
         ...
 
 
+class IcebergBaseModel(BaseModel):
+    """
+    This class extends the Pydantic BaseModel to set default values by overriding them.
+
+    This is because we always want to set by_alias to True. In Python, the dash can't
+    be used in variable names, and this is used throughout the Iceberg spec.
+
+    The same goes for exclude_none, if a field is None we want to omit it from
+    serialization, for example, the doc attribute on the NestedField object.
+    Default non-null values will be serialized.
+
+    This is recommended by Pydantic:
+    https://pydantic-docs.helpmanual.io/usage/model_config/#change-behaviour-globally
+    """
+
+    class Config:
+        keep_untouched = (cached_property,)
+        allow_population_by_field_name = True
+        frozen = True
+
+    def _exclude_private_properties(self, exclude: Optional[Set[str]] = None) -> Set[str]:
+        # A small trick to exclude private properties. Properties are serialized by pydantic,
+        # regardless if they start with an underscore.
+        # This will look at the dict, and find the fields and exclude them
+        return set.union(
+            {field for field in self.__dict__ if field.startswith("_") and not field == "__root__"}, exclude or set()
+        )
+
+    def dict(self, exclude_none: bool = True, exclude: Optional[Set[str]] = None, **kwargs: Any) -> Dict[str, Any]:
+        return super().dict(exclude_none=exclude_none, exclude=self._exclude_private_properties(exclude), **kwargs)
+
+    def json(self, exclude_none: bool = True, exclude: Optional[Set[str]] = None, by_alias: bool = True, **kwargs: Any) -> str:
+        return super().json(
+            exclude_none=exclude_none, exclude=self._exclude_private_properties(exclude), by_alias=by_alias, **kwargs
+        )
+
+
 class Record(StructProtocol):
-    _data: List[Union[Any, StructProtocol]]
+    _position_to_field_name: Dict[int, str]
+
+    def __init__(self, *data: Any, struct: Optional[StructType] = None, **named_data: Any) -> None:
+        if struct is not None:
+            self._position_to_field_name = {idx: field.name for idx, field in enumerate(struct.fields)}
+        elif named_data:
+            # Order of named_data is preserved (PEP 468) so this can be used to generate the position dict
+            self._position_to_field_name = {idx: name for idx, name in enumerate(named_data.keys())}
+        else:
+            self._position_to_field_name = {idx: f"field{idx + 1}" for idx in range(len(data))}
 
-    @staticmethod
-    def of(num_fields: int) -> Record:
-        return Record(*([None] * num_fields))
+        for idx, d in enumerate(data):
+            self[idx] = d
 
-    def __init__(self, *data: Union[Any, StructProtocol]) -> None:
-        self._data = list(data)
+        for field_name, d in named_data.items():
+            self.__setattr__(field_name, d)
 
-    def set(self, pos: int, value: Any) -> None:
-        print(f"set({pos}, {repr(value)})")
-        self._data[pos] = value
+    def __setitem__(self, pos: int, value: Any) -> None:
+        self.__setattr__(self._position_to_field_name[pos], value)
 
-    def get(self, pos: int) -> Any:
-        return self._data[pos]
+    def __getitem__(self, pos: int) -> Any:
+        return self.__getattribute__(self._position_to_field_name[pos])
 
     def __eq__(self, other: Any) -> bool:
-        # For testing
-        return True if isinstance(other, Record) and other._data == self._data else False
+        if not isinstance(other, Record):
+            return False
+        return self.__dict__ == other.__dict__
 
     def __repr__(self) -> str:
-        return f"{self.__class__.__name__}[" + ", ".join([repr(e) for e in self._data]) + "]"
+        return f"{self.__class__.__name__}[{', '.join(f'{key}={repr(value)}' for key, value in self.__dict__.items() if not key.startswith('_'))}]"
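
Record resolves positions to attribute names from an explicit StructType, from
keyword order (PEP 468), or from generated field{n} names. A minimal sketch of the
three construction paths:

    from pyiceberg.typedef import Record
    from pyiceberg.types import IntegerType, NestedField, StringType, StructType

    # Positional-only data gets generated names: field1, field2, ...
    r1 = Record(1, "a")
    assert r1[0] == 1 and r1.field1 == 1

    # Keyword arguments keep their order and become the field names.
    r2 = Record(id=1, name="a")
    assert r2[1] == "a" and r2.name == "a"

    # A StructType supplies the position-to-name mapping explicitly.
    struct = StructType(
        NestedField(1, "id", IntegerType(), required=True),
        NestedField(2, "name", StringType(), required=False),
    )
    r3 = Record(1, "a", struct=struct)
    assert r3.id == 1 and r3[1] == "a"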
diff --git a/python/pyiceberg/types.py b/python/pyiceberg/types.py
index 6a736ed19d..e11f6138c9 100644
--- a/python/pyiceberg/types.py
+++ b/python/pyiceberg/types.py
@@ -45,7 +45,7 @@ from typing import (
 from pydantic import Field, PrivateAttr
 from pydantic.typing import AnyCallable
 
-from pyiceberg.utils.iceberg_base_model import IcebergBaseModel
+from pyiceberg.typedef import IcebergBaseModel
 from pyiceberg.utils.parsing import ParseNumberFromBrackets
 from pyiceberg.utils.singleton import Singleton
 
@@ -280,6 +280,9 @@ class StructType(IcebergType):
     def __repr__(self) -> str:
         return f"StructType(fields=({', '.join(map(repr, self.fields))},))"
 
+    def __len__(self) -> int:
+        return len(self.fields)
+
 
 class ListType(IcebergType):
     """A list type in Iceberg
diff --git a/python/pyiceberg/utils/iceberg_base_model.py b/python/pyiceberg/utils/iceberg_base_model.py
deleted file mode 100644
index 8c5bc94f68..0000000000
--- a/python/pyiceberg/utils/iceberg_base_model.py
+++ /dev/null
@@ -1,62 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from functools import cached_property
-from typing import (
-    Any,
-    Dict,
-    Optional,
-    Set,
-)
-
-from pydantic import BaseModel
-
-
-class IcebergBaseModel(BaseModel):
-    """
-    This class extends the Pydantic BaseModel to set default values by overriding them.
-
-    This is because we always want to set by_alias to True. In Python, the dash can't
-    be used in variable names, and this is used throughout the Iceberg spec.
-
-    The same goes for exclude_none, if a field is None we want to omit it from
-    serialization, for example, the doc attribute on the NestedField object.
-    Default non-null values will be serialized.
-
-    This is recommended by Pydantic:
-    https://pydantic-docs.helpmanual.io/usage/model_config/#change-behaviour-globally
-    """
-
-    class Config:
-        keep_untouched = (cached_property,)
-        allow_population_by_field_name = True
-        frozen = True
-
-    def _exclude_private_properties(self, exclude: Optional[Set[str]] = None) -> Set[str]:
-        # A small trick to exclude private properties. Properties are serialized by pydantic,
-        # regardless if they start with an underscore.
-        # This will look at the dict, and find the fields and exclude them
-        return set.union(
-            {field for field in self.__dict__ if field.startswith("_") and not field == "__root__"}, exclude or set()
-        )
-
-    def dict(self, exclude_none: bool = True, exclude: Optional[Set[str]] = None, **kwargs: Any) -> Dict[str, Any]:
-        return super().dict(exclude_none=exclude_none, exclude=self._exclude_private_properties(exclude), **kwargs)
-
-    def json(self, exclude_none: bool = True, exclude: Optional[Set[str]] = None, by_alias: bool = True, **kwargs: Any) -> str:
-        return super().json(
-            exclude_none=exclude_none, exclude=self._exclude_private_properties(exclude), by_alias=by_alias, **kwargs
-        )
diff --git a/python/tests/avro/test_file.py b/python/tests/avro/test_file.py
index fc46963375..cdb973a395 100644
--- a/python/tests/avro/test_file.py
+++ b/python/tests/avro/test_file.py
@@ -17,21 +17,30 @@
 import pytest
 
 from pyiceberg.avro.codecs import DeflateCodec
-from pyiceberg.avro.file import AvroFileHeader
+from pyiceberg.avro.file import META_SCHEMA, AvroFileHeader
 
 
 def get_deflate_compressor() -> None:
-    header = AvroFileHeader(bytes(0), {"avro.codec": "deflate"}, bytes(16))
+    header = AvroFileHeader(struct=META_SCHEMA)
+    header[0] = bytes(0)
+    header[1] = {"avro.codec": "deflate"}
+    header[2] = bytes(16)
     assert header.compression_codec() == DeflateCodec
 
 
 def get_null_compressor() -> None:
-    header = AvroFileHeader(bytes(0), {"avro.codec": "null"}, bytes(16))
+    header = AvroFileHeader(struct=META_SCHEMA)
+    header[0] = bytes(0)
+    header[1] = {"avro.codec": "null"}
+    header[2] = bytes(16)
     assert header.compression_codec() is None
 
 
 def test_unknown_codec() -> None:
-    header = AvroFileHeader(bytes(0), {"avro.codec": "unknown"}, bytes(16))
+    header = AvroFileHeader(struct=META_SCHEMA)
+    header[0] = bytes(0)
+    header[1] = {"avro.codec": "unknown"}
+    header[2] = bytes(16)
 
     with pytest.raises(ValueError) as exc_info:
         header.compression_codec()
@@ -40,7 +49,10 @@ def test_unknown_codec() -> None:
 
 
 def test_missing_schema() -> None:
-    header = AvroFileHeader(bytes(0), {}, bytes(16))
+    header = AvroFileHeader(struct=META_SCHEMA)
+    header[0] = bytes(0)
+    header[1] = {}
+    header[2] = bytes(16)
 
     with pytest.raises(ValueError) as exc_info:
         header.get_schema()
diff --git a/python/tests/avro/test_reader.py b/python/tests/avro/test_reader.py
index f8ec3326ca..b8736dd69b 100644
--- a/python/tests/avro/test_reader.py
+++ b/python/tests/avro/test_reader.py
@@ -19,6 +19,7 @@ import json
 
 import pytest
 
+from pyiceberg.avro.decoder import BinaryDecoder
 from pyiceberg.avro.file import AvroFile
 from pyiceberg.avro.reader import (
     BinaryReader,
@@ -30,14 +31,16 @@ from pyiceberg.avro.reader import (
     FloatReader,
     IntegerReader,
     StringReader,
+    StructReader,
     TimeReader,
     TimestampReader,
     TimestamptzReader,
     UUIDReader,
 )
 from pyiceberg.avro.resolver import construct_reader
+from pyiceberg.io.memory import MemoryInputStream
 from pyiceberg.io.pyarrow import PyArrowFileIO
-from pyiceberg.manifest import _convert_pos_to_dict
+from pyiceberg.manifest import MANIFEST_ENTRY_SCHEMA, DataFile, ManifestEntry
 from pyiceberg.schema import Schema
 from pyiceberg.typedef import Record
 from pyiceberg.types import (
@@ -49,9 +52,7 @@ from pyiceberg.types import (
     FixedType,
     FloatType,
     IntegerType,
-    ListType,
     LongType,
-    MapType,
     NestedField,
     PrimitiveType,
     StringType,
@@ -64,7 +65,11 @@ from pyiceberg.types import (
 
 
 def test_read_header(generated_manifest_entry_file: str, iceberg_manifest_entry_schema: Schema) -> None:
-    with AvroFile(PyArrowFileIO().new_input(generated_manifest_entry_file)) as reader:
+    with AvroFile[ManifestEntry](
+        PyArrowFileIO().new_input(generated_manifest_entry_file),
+        MANIFEST_ENTRY_SCHEMA,
+        {-1: ManifestEntry, 2: DataFile},
+    ) as reader:
         header = reader.header
 
     assert header.magic == b"Obj\x01"
@@ -264,187 +269,6 @@ def test_read_header(generated_manifest_entry_file: str, iceberg_manifest_entry_
     assert header.get_schema() == iceberg_manifest_entry_schema
 
 
-def test_read_manifest_entry_file(generated_manifest_entry_file: str) -> None:
-    with AvroFile(PyArrowFileIO().new_input(generated_manifest_entry_file)) as reader:
-        # Consume the generator
-        records = list(reader)
-
-    assert len(records) == 2, f"Expected 2 records, got {len(records)}"
-    assert records[0] == Record(
-        1,
-        8744736658442914487,
-        Record(
-            "/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet",
-            "PARQUET",
-            Record(1, 1925),
-            19513,
-            388872,
-            67108864,
-            {
-                1: 53,
-                2: 98153,
-                3: 98693,
-                4: 53,
-                5: 53,
-                6: 53,
-                7: 17425,
-                8: 18528,
-                9: 53,
-                10: 44788,
-                11: 35571,
-                12: 53,
-                13: 1243,
-                14: 2355,
-                15: 12750,
-                16: 4029,
-                17: 110,
-                18: 47194,
-                19: 2948,
-            },
-            {
-                1: 19513,
-                2: 19513,
-                3: 19513,
-                4: 19513,
-                5: 19513,
-                6: 19513,
-                7: 19513,
-                8: 19513,
-                9: 19513,
-                10: 19513,
-                11: 19513,
-                12: 19513,
-                13: 19513,
-                14: 19513,
-                15: 19513,
-                16: 19513,
-                17: 19513,
-                18: 19513,
-                19: 19513,
-            },
-            {
-                1: 19513,
-                2: 0,
-                3: 0,
-                4: 19513,
-                5: 19513,
-                6: 19513,
-                7: 0,
-                8: 0,
-                9: 19513,
-                10: 0,
-                11: 0,
-                12: 19513,
-                13: 0,
-                14: 0,
-                15: 0,
-                16: 0,
-                17: 0,
-                18: 0,
-                19: 0,
-            },
-            {16: 0, 17: 0, 18: 0, 19: 0, 10: 0, 11: 0, 12: 0, 13: 0, 14: 0, 15: 0},
-            {
-                2: b"2020-04-01 00:00",
-                3: b"2020-04-01 00:12",
-                7: b"\x03\x00\x00\x00",
-                8: b"\x01\x00\x00\x00",
-                10: b"\xf6(\\\x8f\xc2\x05S\xc0",
-                11: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                13: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                14: b"\x00\x00\x00\x00\x00\x00\xe0\xbf",
-                15: b")\\\x8f\xc2\xf5(\x08\xc0",
-                16: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                17: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                18: b"\xf6(\\\x8f\xc2\xc5S\xc0",
-                19: b"\x00\x00\x00\x00\x00\x00\x04\xc0",
-            },
-            {
-                2: b"2020-04-30 23:5:",
-                3: b"2020-05-01 00:41",
-                7: b"\t\x01\x00\x00",
-                8: b"\t\x01\x00\x00",
-                10: b"\xcd\xcc\xcc\xcc\xcc,_@",
-                11: b"\x1f\x85\xebQ\\\xe2\xfe@",
-                13: b"\x00\x00\x00\x00\x00\x00\x12@",
-                14: b"\x00\x00\x00\x00\x00\x00\xe0?",
-                15: b"q=\n\xd7\xa3\xf01@",
-                16: b"\x00\x00\x00\x00\x00`B@",
-                17: b"333333\xd3?",
-                18: b"\x00\x00\x00\x00\x00\x18b@",
-                19: b"\x00\x00\x00\x00\x00\x00\x04@",
-            },
-            None,
-            [4],
-            0,
-        ),
-    )
-
-
-def test_read_manifest_file_file(generated_manifest_file_file: str) -> None:
-    with AvroFile(PyArrowFileIO().new_input(generated_manifest_file_file)) as reader:
-        # Consume the generator
-        records = list(reader)
-
-    assert len(records) == 1, f"Expected 1 records, got {len(records)}"
-    actual = records[0]
-    expected = Record(
-        actual.get(0),
-        7989,
-        0,
-        9182715666859759686,
-        3,
-        0,
-        0,
-        [Record(True, False, b"\x01\x00\x00\x00", b"\x02\x00\x00\x00")],
-        237993,
-        0,
-        0,
-    )
-    assert actual == expected
-
-
-def test_null_list_convert_pos_to_dict() -> None:
-    data = _convert_pos_to_dict(
-        Schema(
-            NestedField(name="field", field_id=1, field_type=ListType(element_id=2, element=StringType(), element_required=False))
-        ),
-        Record(None),
-    )
-    assert data["field"] is None
-
-
-def test_null_dict_convert_pos_to_dict() -> None:
-    data = _convert_pos_to_dict(
-        Schema(
-            NestedField(
-                name="field",
-                field_id=1,
-                field_type=MapType(key_id=2, key_type=StringType(), value_id=3, value_type=StringType(), value_required=False),
-            )
-        ),
-        Record(None),
-    )
-    assert data["field"] is None
-
-
-def test_null_struct_convert_pos_to_dict() -> None:
-    data = _convert_pos_to_dict(
-        Schema(
-            NestedField(
-                name="field",
-                field_id=1,
-                field_type=StructType(
-                    NestedField(2, "required_field", StringType(), True), NestedField(3, "optional_field", IntegerType())
-                ),
-                required=False,
-            )
-        ),
-        Record(None),
-    )
-    assert data["field"] is None
-
-
 def test_fixed_reader() -> None:
     assert construct_reader(FixedType(22)) == FixedReader(22)
 
@@ -509,3 +333,51 @@ def test_unknown_type() -> None:
 
 def test_uuid_reader() -> None:
     assert construct_reader(UUIDType()) == UUIDReader()
+
+
+def test_read_struct() -> None:
+    mis = MemoryInputStream(b"\x18")
+    decoder = BinaryDecoder(mis)
+
+    struct = StructType(NestedField(1, "id", IntegerType(), required=True))
+    result = StructReader(((0, IntegerReader()),), Record, struct).read(decoder)
+    assert repr(result) == "Record[id=12]"
+
+
+def test_read_struct_lambda() -> None:
+    mis = MemoryInputStream(b"\x18")
+    decoder = BinaryDecoder(mis)
+
+    struct = StructType(NestedField(1, "id", IntegerType(), required=True))
+    # You can also pass in an arbitrary function that returns a struct
+    result = StructReader(
+        ((0, IntegerReader()),), lambda struct: Record(struct=struct), struct  # pylint: disable=unnecessary-lambda
+    ).read(decoder)
+    assert repr(result) == "Record[id=12]"
+
+
+def test_read_not_struct_type() -> None:
+    mis = MemoryInputStream(b"\x18")
+    decoder = BinaryDecoder(mis)
+
+    struct = StructType(NestedField(1, "id", IntegerType(), required=True))
+    with pytest.raises(ValueError) as exc_info:
+        _ = StructReader(((0, IntegerReader()),), str, struct).read(decoder)  # type: ignore
+
+    assert "Incompatible with StructProtocol: <class 'str'>" in str(exc_info.value)
+
+
+def test_read_struct_exception_handling() -> None:
+    mis = MemoryInputStream(b"\x18")
+    decoder = BinaryDecoder(mis)
+
+    def raise_err(struct: StructType) -> None:
+        raise TypeError("boom")
+
+    struct = StructType(NestedField(1, "id", IntegerType(), required=True))
+    # Unlike the lambda test above, this callable raises instead of returning a struct
+
+    with pytest.raises(ValueError) as exc_info:
+        _ = StructReader(((0, IntegerReader()),), raise_err, struct).read(decoder)  # type: ignore
+
+    assert "Unable to initialize struct:" in str(exc_info.value)
diff --git a/python/tests/avro/test_resolver.py b/python/tests/avro/test_resolver.py
index c36b76922a..42e9146c48 100644
--- a/python/tests/avro/test_resolver.py
+++ b/python/tests/avro/test_resolver.py
@@ -14,8 +14,14 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+
+from tempfile import TemporaryDirectory
+from typing import Optional
+
 import pytest
+from pydantic import Field
 
+from pyiceberg.avro.file import AvroFile
 from pyiceberg.avro.reader import (
     DecimalReader,
     DoubleReader,
@@ -26,7 +32,9 @@ from pyiceberg.avro.reader import (
     StructReader,
 )
 from pyiceberg.avro.resolver import ResolveError, resolve
+from pyiceberg.io.pyarrow import PyArrowFileIO
 from pyiceberg.schema import Schema
+from pyiceberg.typedef import Record
 from pyiceberg.types import (
     BinaryType,
     DecimalType,
@@ -57,14 +65,16 @@ def test_resolver() -> None:
         NestedField(6, "preferences", MapType(7, StringType(), 8, StringType())),
         schema_id=1,
     )
+
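+    # Pulled out so the expected StructReader below can reference the same struct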
+    location_struct = StructType(
+        NestedField(4, "lat", DoubleType()),
+        NestedField(5, "long", DoubleType()),
+    )
     read_schema = Schema(
         NestedField(
             3,
             "location",
-            StructType(
-                NestedField(4, "lat", DoubleType()),
-                NestedField(5, "long", DoubleType()),
-            ),
+            location_struct,
         ),
         NestedField(1, "id", LongType()),
         NestedField(6, "preferences", MapType(7, StringType(), 8, StringType())),
@@ -82,11 +92,15 @@ def test_resolver() -> None:
                     (
                         (0, DoubleReader()),
                         (1, DoubleReader()),
-                    )
+                    ),
+                    Record,
+                    location_struct,
                 ),
             ),
             (2, MapReader(StringReader(), StringReader())),
-        )
+        ),
+        Record,
+        read_schema.as_struct(),
     )
 
 
@@ -223,3 +237,46 @@ def test_resolve_decimal_to_decimal_reduce_precision() -> None:
         _ = resolve(DecimalType(19, 25), DecimalType(10, 25)) == DecimalReader(22, 25)
 
     assert "Cannot reduce precision from decimal(19, 25) to decimal(10, 25)" in str(exc_info.value)
+
+
+def test_column_assignment() -> None:
+    int_schema = {
+        "type": "record",
+        "name": "ints",
+        "fields": [
+            {"name": "a", "type": "int", "field-id": 1},
+            {"name": "b", "type": "int", "field-id": 2},
+            {"name": "c", "type": "int", "field-id": 3},
+        ],
+    }
+
+    from fastavro import parse_schema, writer
+
+    parsed_schema = parse_schema(int_schema)
+
+    int_records = [
+        {
+            "a": 1,
+            "b": 2,
+            "c": 3,
+        }
+    ]
+
+    with TemporaryDirectory() as tmpdir:
+        tmp_avro_file = tmpdir + "/manifest.avro"
+        with open(tmp_avro_file, "wb") as out:
+            writer(out, parsed_schema, int_records)
+
+        class Ints(Record):
+            c: int = Field()
+            d: Optional[int] = Field()
+
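+        # Resolution is by field-id: id 3 matches the writer's column "c", and id 4
+        # is absent from the file, so the optional field "d" is filled with None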
+        MANIFEST_ENTRY_SCHEMA = Schema(
+            NestedField(3, "c", IntegerType(), required=True),
+            NestedField(4, "d", IntegerType(), required=False),
+        )
+
+        with AvroFile[Ints](PyArrowFileIO().new_input(tmp_avro_file), MANIFEST_ENTRY_SCHEMA, {-1: Ints}) as reader:
+            records = list(reader)
+
+    assert repr(records) == "[Ints[c=3, d=None]]"
diff --git a/python/tests/expressions/test_evaluator.py b/python/tests/expressions/test_evaluator.py
index 95766b0024..b57ede62f2 100644
--- a/python/tests/expressions/test_evaluator.py
+++ b/python/tests/expressions/test_evaluator.py
@@ -14,6 +14,8 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+from typing import Optional
+
 from pyiceberg.expressions import (
     AlwaysFalse,
     AlwaysTrue,
@@ -52,6 +54,20 @@ FLOAT_SCHEMA = Schema(
 )
 
 
+def _record_simple(id: int, data: Optional[str]) -> Record:  # pylint: disable=redefined-builtin
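+    # Binds the record to SIMPLE_SCHEMA's struct so positional values pick up field names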
+    r = Record(struct=SIMPLE_SCHEMA.as_struct())
+    r[0] = id
+    r[1] = data
+    return r
+
+
+def _record_float(id: float, f: float) -> Record:  # pylint: disable=redefined-builtin
+    r = Record(struct=FLOAT_SCHEMA.as_struct())
+    r[0] = id
+    r[1] = f
+    return r
+
+
 def test_true() -> None:
     evaluate = expression_evaluator(SIMPLE_SCHEMA, AlwaysTrue(), case_sensitive=True)
     assert evaluate(Record(1, "a"))
@@ -131,16 +147,16 @@ def test_not_null() -> None:
 
 def test_is_nan() -> None:
     evaluate = expression_evaluator(FLOAT_SCHEMA, IsNaN("f"), case_sensitive=True)
-    assert not evaluate(Record(2, 0.0))
-    assert not evaluate(Record(3, float("infinity")))
-    assert evaluate(Record(4, float("nan")))
+    assert not evaluate(_record_float(2, f=0.0))
+    assert not evaluate(_record_float(3, f=float("infinity")))
+    assert evaluate(_record_float(4, f=float("nan")))
 
 
 def test_not_nan() -> None:
     evaluate = expression_evaluator(FLOAT_SCHEMA, NotNaN("f"), case_sensitive=True)
-    assert evaluate(Record(2, 0.0))
-    assert evaluate(Record(3, float("infinity")))
-    assert not evaluate(Record(4, float("nan")))
+    assert evaluate(_record_float(2, f=0.0))
+    assert evaluate(_record_float(3, f=float("infinity")))
+    assert not evaluate(_record_float(4, f=float("nan")))
 
 
 def test_not() -> None:
diff --git a/python/tests/expressions/test_expressions.py b/python/tests/expressions/test_expressions.py
index 7b26d45a28..5b82c14744 100644
--- a/python/tests/expressions/test_expressions.py
+++ b/python/tests/expressions/test_expressions.py
@@ -63,6 +63,9 @@ from pyiceberg.expressions.visitors import _from_byte_buffer
 from pyiceberg.schema import Accessor, Schema
 from pyiceberg.typedef import Record
 from pyiceberg.types import (
+    BinaryType,
+    BooleanType,
+    DecimalType,
     DoubleType,
     FloatType,
     IntegerType,
@@ -70,6 +73,8 @@ from pyiceberg.types import (
     LongType,
     NestedField,
     StringType,
+    StructType,
+    UUIDType,
 )
 from pyiceberg.utils.singleton import Singleton
 
@@ -605,22 +610,37 @@ def test_invert_always() -> None:
 def test_accessor_base_class() -> None:
     """Test retrieving a value at a position of a container using an accessor"""
 
-    struct = Record(*([None] * 12))
+    struct = Record(
+        struct=StructType(
+            NestedField(1, "a", StringType()),
+            NestedField(2, "b", StringType()),
+            NestedField(3, "c", StringType()),
+            NestedField(4, "d", IntegerType()),
+            NestedField(5, "e", IntegerType()),
+            NestedField(6, "f", IntegerType()),
+            NestedField(7, "g", FloatType()),
+            NestedField(8, "h", DecimalType(8, 4)),
+            NestedField(9, "i", UUIDType()),
+            NestedField(10, "j", BooleanType()),
+            NestedField(11, "k", BooleanType()),
+            NestedField(12, "l", BinaryType()),
+        )
+    )
 
     uuid_value = uuid.uuid4()
 
-    struct.set(0, "foo")
-    struct.set(1, "bar")
-    struct.set(2, "baz")
-    struct.set(3, 1)
-    struct.set(4, 2)
-    struct.set(5, 3)
-    struct.set(6, 1.234)
-    struct.set(7, Decimal("1.234"))
-    struct.set(8, uuid_value)
-    struct.set(9, True)
-    struct.set(10, False)
-    struct.set(11, b"\x19\x04\x9e?")
+    struct[0] = "foo"
+    struct[1] = "bar"
+    struct[2] = "baz"
+    struct[3] = 1
+    struct[4] = 2
+    struct[5] = 3
+    struct[6] = 1.234
+    struct[7] = Decimal("1.234")
+    struct[8] = uuid_value
+    struct[9] = True
+    struct[10] = False
+    struct[11] = b"\x19\x04\x9e?"
 
     assert Accessor(position=0).get(struct) == "foo"
     assert Accessor(position=1).get(struct) == "bar"
@@ -902,15 +922,15 @@ def test_less_than_or_equal() -> None:
 
 def test_bound_reference_eval(table_schema_simple: Schema) -> None:
     """Test creating a BoundReference and evaluating it on a StructProtocol"""
-    struct = Record(None, None, None, None)
+    struct = Record(struct=table_schema_simple.as_struct())
 
-    struct.set(pos=1, value="foovalue")
-    struct.set(pos=2, value=123)
-    struct.set(pos=3, value=True)
+    struct[0] = "foovalue"
+    struct[1] = 123
+    struct[2] = True
 
-    position1_accessor = Accessor(position=1)
-    position2_accessor = Accessor(position=2)
-    position3_accessor = Accessor(position=3)
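+    # Accessor positions are 0-based; the field ids looked up below stay 1-based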
+    position1_accessor = Accessor(position=0)
+    position2_accessor = Accessor(position=1)
+    position3_accessor = Accessor(position=2)
 
     field1 = table_schema_simple.find_field(1)
     field2 = table_schema_simple.find_field(2)
diff --git a/python/tests/expressions/test_visitors.py b/python/tests/expressions/test_visitors.py
index e79c353f81..13e4d01eea 100644
--- a/python/tests/expressions/test_visitors.py
+++ b/python/tests/expressions/test_visitors.py
@@ -786,12 +786,8 @@ def _to_byte_buffer(field_type: IcebergType, val: Any) -> bytes:
 
 
 def _to_manifest_file(*partitions: PartitionFieldSummary) -> ManifestFile:
-    return ManifestFile(
-        manifest_path="",
-        manifest_length=0,
-        partition_spec_id=0,
-        partitions=partitions,
-    )
+    """Helper to create a ManifestFile"""
+    return ManifestFile(manifest_path="", manifest_length=0, partition_spec_id=0, partitions=partitions)
 
 
 INT_MIN_VALUE = 30
diff --git a/python/tests/table/test_snapshots.py b/python/tests/table/test_snapshots.py
index 16ba83153e..b119ae9945 100644
--- a/python/tests/table/test_snapshots.py
+++ b/python/tests/table/test_snapshots.py
@@ -17,8 +17,6 @@
 # pylint:disable=redefined-outer-name,eval-used
 import pytest
 
-from pyiceberg.io.pyarrow import PyArrowFileIO
-from pyiceberg.manifest import ManifestContent, ManifestFile, PartitionFieldSummary
 from pyiceberg.table.snapshots import Operation, Snapshot, Summary
 
 
@@ -121,40 +119,3 @@ def test_snapshot_with_properties_repr(snapshot_with_properties: Snapshot) -> No
         == """Snapshot(snapshot_id=25, parent_snapshot_id=19, sequence_number=200, timestamp_ms=1602638573590, manifest_list='s3:/a/b/c.avro', summary=Summary(Operation.APPEND, **{'foo': 'bar'}), schema_id=3)"""
     )
     assert snapshot_with_properties == eval(repr(snapshot_with_properties))
-
-
-def test_fetch_manifest_list(generated_manifest_file_file: str) -> None:
-    snapshot = Snapshot(
-        snapshot_id=25,
-        parent_snapshot_id=19,
-        sequence_number=200,
-        timestamp_ms=1602638573590,
-        manifest_list=generated_manifest_file_file,
-        summary=Summary(Operation.APPEND),
-        schema_id=3,
-    )
-    io = PyArrowFileIO()
-    actual = snapshot.manifests(io)
-    assert actual == [
-        ManifestFile(
-            manifest_path=actual[0].manifest_path,  # Is a temp path that changes every time
-            manifest_length=7989,
-            partition_spec_id=0,
-            content=ManifestContent.DATA,
-            sequence_number=0,
-            min_sequence_number=0,
-            added_snapshot_id=9182715666859759686,
-            added_data_files_count=3,
-            existing_data_files_count=0,
-            deleted_data_files_count=0,
-            added_rows_count=237993,
-            existing_rows_counts=None,
-            deleted_rows_count=0,
-            partitions=[
-                PartitionFieldSummary(
-                    contains_null=True, contains_nan=False, lower_bound=b"\x01\x00\x00\x00", upper_bound=b"\x02\x00\x00\x00"
-                )
-            ],
-            key_metadata=None,
-        )
-    ]
diff --git a/python/tests/test_schema.py b/python/tests/test_schema.py
index a6fba2cff0..4a59631acd 100644
--- a/python/tests/test_schema.py
+++ b/python/tests/test_schema.py
@@ -389,10 +389,10 @@ def test_build_position_accessors_with_struct(table_schema_nested: Schema) -> No
         def __init__(self, pos: Dict[int, Any] = EMPTY_DICT):
             self._pos: Dict[int, Any] = pos
 
-        def set(self, pos: int, value: Any) -> None:
+        def __setitem__(self, pos: int, value: Any) -> None:
             pass
 
-        def get(self, pos: int) -> Any:
+        def __getitem__(self, pos: int) -> Any:
             return self._pos[pos]
 
     accessors = build_position_accessors(table_schema_nested)
diff --git a/python/tests/test_transforms.py b/python/tests/test_transforms.py
index 72776b8574..8ba80e396b 100644
--- a/python/tests/test_transforms.py
+++ b/python/tests/test_transforms.py
@@ -62,6 +62,7 @@ from pyiceberg.transforms import (
     VoidTransform,
     YearTransform,
 )
+from pyiceberg.typedef import IcebergBaseModel
 from pyiceberg.types import (
     BinaryType,
     BooleanType,
@@ -87,7 +88,6 @@ from pyiceberg.utils.datetime import (
     timestamp_to_micros,
     timestamptz_to_micros,
 )
-from pyiceberg.utils.iceberg_base_model import IcebergBaseModel
 
 
 @pytest.mark.parametrize(
diff --git a/python/tests/test_typedef.py b/python/tests/test_typedef.py
index 628e674443..43388addca 100644
--- a/python/tests/test_typedef.py
+++ b/python/tests/test_typedef.py
@@ -16,7 +16,14 @@
 # under the License.
 import pytest
 
+from pyiceberg.schema import Schema
 from pyiceberg.typedef import FrozenDict, KeyDefaultDict, Record
+from pyiceberg.types import (
+    IntegerType,
+    NestedField,
+    StringType,
+    StructType,
+)
 
 
 def test_setitem_frozendict() -> None:
@@ -39,6 +46,44 @@ def test_keydefaultdict() -> None:
     assert defaultdict[22] == 1
 
 
-def test_record_repr() -> None:
-    r = Record(1, "vo", True)
-    assert repr(r) == "Record[1, 'vo', True]"
+def test_record_repr(table_schema_simple: Schema) -> None:
+    r = Record("vo", 1, True, struct=table_schema_simple.as_struct())
+    assert repr(r) == "Record[foo='vo', bar=1, baz=True]"
+
+
+def test_named_record() -> None:
+    r = Record(struct=StructType(NestedField(0, "id", IntegerType()), NestedField(1, "name", StringType())))
+
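+    # Attribute access fails until a value has been set at the corresponding position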
+    with pytest.raises(AttributeError):
+        assert r.id is None  # type: ignore
+
+    with pytest.raises(AttributeError):
+        assert r.name is None  # type: ignore
+
+    r[0] = 123
+    r[1] = "abc"
+
+    assert r[0] == 123
+    assert r[1] == "abc"
+
+    assert r.id == 123  # type: ignore
+    assert r.name == "abc"  # type: ignore
+
+
+def test_record_positional_args() -> None:
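+    # Without a struct or keyword names, fields get the generated names field1, field2, ...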
+    r = Record(1, "a", True)
+    assert repr(r) == "Record[field1=1, field2='a', field3=True]"
+
+
+def test_record_named_args() -> None:
+    r = Record(foo=1, bar="a", baz=True)
+
+    assert r.foo == 1  # type: ignore
+    assert r.bar == "a"  # type: ignore
+    assert r.baz is True  # type: ignore
+
+    assert r[0] == 1
+    assert r[1] == "a"
+    assert r[2] is True
+
+    assert repr(r) == "Record[foo=1, bar='a', baz=True]"
diff --git a/python/tests/test_types.py b/python/tests/test_types.py
index cda1309ede..e2c0272b45 100644
--- a/python/tests/test_types.py
+++ b/python/tests/test_types.py
@@ -20,6 +20,7 @@ from typing import Type
 import pytest
 from pydantic import ValidationError
 
+from pyiceberg.typedef import IcebergBaseModel
 from pyiceberg.types import (
     BinaryType,
     BooleanType,
@@ -42,7 +43,6 @@ from pyiceberg.types import (
     TimeType,
     UUIDType,
 )
-from pyiceberg.utils.iceberg_base_model import IcebergBaseModel
 
 non_parameterized_types = [
     (1, BooleanType),
diff --git a/python/tests/utils/test_manifest.py b/python/tests/utils/test_manifest.py
index e78d0db237..548d94cb8f 100644
--- a/python/tests/utils/test_manifest.py
+++ b/python/tests/utils/test_manifest.py
@@ -19,12 +19,8 @@ from pyiceberg.io import load_file_io
 from pyiceberg.io.pyarrow import PyArrowFileIO
 from pyiceberg.manifest import (
     DataFile,
-    DataFileContent,
     FileFormat,
-    ManifestContent,
-    ManifestEntry,
     ManifestEntryStatus,
-    ManifestFile,
     PartitionFieldSummary,
     read_manifest_entry,
     read_manifest_list,
@@ -35,513 +31,152 @@ from pyiceberg.table.snapshots import Operation, Summary
 
 def test_read_manifest_entry(generated_manifest_entry_file: str) -> None:
     input_file = PyArrowFileIO().new_input(location=generated_manifest_entry_file)
+    manifest_entries = list(read_manifest_entry(input_file))
+    manifest_entry = manifest_entries[0]
+
+    assert manifest_entry.status == ManifestEntryStatus.ADDED
+    assert manifest_entry.snapshot_id == 8744736658442914487
+    assert manifest_entry.sequence_number is None
+    assert isinstance(manifest_entry.data_file, DataFile)
+
+    data_file = manifest_entry.data_file
+
+    assert data_file.content is None
     assert (
-        list(read_manifest_entry(input_file))
-        == [
-            ManifestEntry(
-                status=ManifestEntryStatus.ADDED,
-                snapshot_id=8744736658442914487,
-                sequence_number=None,
-                data_file=DataFile(
-                    content=DataFileContent.DATA,
-                    file_path="/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet",
-                    file_format=FileFormat.PARQUET,
-                    partition={"VendorID": 1, "tpep_pickup_datetime": 1925},
-                    record_count=19513,
-                    file_size_in_bytes=388872,
-                    block_size_in_bytes=67108864,
-                    column_sizes={
-                        1: 53,
-                        2: 98153,
-                        3: 98693,
-                        4: 53,
-                        5: 53,
-                        6: 53,
-                        7: 17425,
-                        8: 18528,
-                        9: 53,
-                        10: 44788,
-                        11: 35571,
-                        12: 53,
-                        13: 1243,
-                        14: 2355,
-                        15: 12750,
-                        16: 4029,
-                        17: 110,
-                        18: 47194,
-                        19: 2948,
-                    },
-                    value_counts={
-                        1: 19513,
-                        2: 19513,
-                        3: 19513,
-                        4: 19513,
-                        5: 19513,
-                        6: 19513,
-                        7: 19513,
-                        8: 19513,
-                        9: 19513,
-                        10: 19513,
-                        11: 19513,
-                        12: 19513,
-                        13: 19513,
-                        14: 19513,
-                        15: 19513,
-                        16: 19513,
-                        17: 19513,
-                        18: 19513,
-                        19: 19513,
-                    },
-                    null_value_counts={
-                        1: 19513,
-                        2: 0,
-                        3: 0,
-                        4: 19513,
-                        5: 19513,
-                        6: 19513,
-                        7: 0,
-                        8: 0,
-                        9: 19513,
-                        10: 0,
-                        11: 0,
-                        12: 19513,
-                        13: 0,
-                        14: 0,
-                        15: 0,
-                        16: 0,
-                        17: 0,
-                        18: 0,
-                        19: 0,
-                    },
-                    nan_value_counts={16: 0, 17: 0, 18: 0, 19: 0, 10: 0, 11: 0, 12: 0, 13: 0, 14: 0, 15: 0},
-                    distinct_counts=None,
-                    lower_bounds={
-                        2: b"2020-04-01 00:00",
-                        3: b"2020-04-01 00:12",
-                        7: b"\x03\x00\x00\x00",
-                        8: b"\x01\x00\x00\x00",
-                        10: b"\xf6(\\\x8f\xc2\x05S\xc0",
-                        11: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                        13: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                        14: b"\x00\x00\x00\x00\x00\x00\xe0\xbf",
-                        15: b")\\\x8f\xc2\xf5(\x08\xc0",
-                        16: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                        17: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                        18: b"\xf6(\\\x8f\xc2\xc5S\xc0",
-                        19: b"\x00\x00\x00\x00\x00\x00\x04\xc0",
-                    },
-                    upper_bounds={
-                        2: b"2020-04-30 23:5:",
-                        3: b"2020-05-01 00:41",
-                        7: b"\t\x01\x00\x00",
-                        8: b"\t\x01\x00\x00",
-                        10: b"\xcd\xcc\xcc\xcc\xcc,_@",
-                        11: b"\x1f\x85\xebQ\\\xe2\xfe@",
-                        13: b"\x00\x00\x00\x00\x00\x00\x12@",
-                        14: b"\x00\x00\x00\x00\x00\x00\xe0?",
-                        15: b"q=\n\xd7\xa3\xf01@",
-                        16: b"\x00\x00\x00\x00\x00`B@",
-                        17: b"333333\xd3?",
-                        18: b"\x00\x00\x00\x00\x00\x18b@",
-                        19: b"\x00\x00\x00\x00\x00\x00\x04@",
-                    },
-                    key_metadata=None,
-                    split_offsets=[4],
-                    equality_ids=None,
-                    sort_order_id=0,
-                ),
-            ),
-            ManifestEntry(
-                status=ManifestEntryStatus.ADDED,
-                snapshot_id=8744736658442914487,
-                sequence_number=None,
-                data_file=DataFile(
-                    content=DataFileContent.DATA,
-                    file_path="/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=1/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00002.parquet",
-                    file_format=FileFormat.PARQUET,
-                    partition={"VendorID": 1, "tpep_pickup_datetime": 1925},
-                    record_count=95050,
-                    file_size_in_bytes=1265950,
-                    block_size_in_bytes=67108864,
-                    column_sizes={
-                        1: 318,
-                        2: 329806,
-                        3: 331632,
-                        4: 15343,
-                        5: 2351,
-                        6: 3389,
-                        7: 71269,
-                        8: 76429,
-                        9: 16383,
-                        10: 86992,
-                        11: 89608,
-                        12: 265,
-                        13: 19377,
-                        14: 1692,
-                        15: 76162,
-                        16: 4354,
-                        17: 759,
-                        18: 120650,
-                        19: 11804,
-                    },
-                    value_counts={
-                        1: 95050,
-                        2: 95050,
-                        3: 95050,
-                        4: 95050,
-                        5: 95050,
-                        6: 95050,
-                        7: 95050,
-                        8: 95050,
-                        9: 95050,
-                        10: 95050,
-                        11: 95050,
-                        12: 95050,
-                        13: 95050,
-                        14: 95050,
-                        15: 95050,
-                        16: 95050,
-                        17: 95050,
-                        18: 95050,
-                        19: 95050,
-                    },
-                    null_value_counts={
-                        1: 0,
-                        2: 0,
-                        3: 0,
-                        4: 0,
-                        5: 0,
-                        6: 0,
-                        7: 0,
-                        8: 0,
-                        9: 0,
-                        10: 0,
-                        11: 0,
-                        12: 95050,
-                        13: 0,
-                        14: 0,
-                        15: 0,
-                        16: 0,
-                        17: 0,
-                        18: 0,
-                        19: 0,
-                    },
-                    nan_value_counts={16: 0, 17: 0, 18: 0, 19: 0, 10: 0, 11: 0, 12: 0, 13: 0, 14: 0, 15: 0},
-                    distinct_counts=None,
-                    lower_bounds={
-                        1: b"\x01\x00\x00\x00",
-                        2: b"2020-04-01 00:00",
-                        3: b"2020-04-01 00:03",
-                        4: b"\x00\x00\x00\x00",
-                        5: b"\x01\x00\x00\x00",
-                        6: b"N",
-                        7: b"\x01\x00\x00\x00",
-                        8: b"\x01\x00\x00\x00",
-                        9: b"\x01\x00\x00\x00",
-                        10: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                        11: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                        13: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                        14: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                        15: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                        16: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                        17: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                        18: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                        19: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                    },
-                    upper_bounds={
-                        1: b"\x01\x00\x00\x00",
-                        2: b"2020-04-30 23:5:",
-                        3: b"2020-05-01 00:1:",
-                        4: b"\x06\x00\x00\x00",
-                        5: b"c\x00\x00\x00",
-                        6: b"Y",
-                        7: b"\t\x01\x00\x00",
-                        8: b"\t\x01\x00\x00",
-                        9: b"\x04\x00\x00\x00",
-                        10: b"\\\x8f\xc2\xf5(8\x8c@",
-                        11: b"\xcd\xcc\xcc\xcc\xcc,f@",
-                        13: b"\x00\x00\x00\x00\x00\x00\x1c@",
-                        14: b"\x9a\x99\x99\x99\x99\x99\xf1?",
-                        15: b"\x00\x00\x00\x00\x00\x00Y@",
-                        16: b"\x00\x00\x00\x00\x00\xb0X@",
-                        17: b"333333\xd3?",
-                        18: b"\xc3\xf5(\\\x8f:\x8c@",
-                        19: b"\x00\x00\x00\x00\x00\x00\x04@",
-                    },
-                    key_metadata=None,
-                    split_offsets=[4],
-                    equality_ids=None,
-                    sort_order_id=0,
-                ),
-            ),
-        ]
-        != [
-            ManifestEntry(
-                status=ManifestEntryStatus.ADDED,
-                snapshot_id=8744736658442914487,
-                sequence_number=None,
-                data_file=DataFile(
-                    content=DataFileContent.DATA,
-                    file_path="/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet",
-                    file_format=FileFormat.PARQUET,
-                    partition={"VendorID": 1, "tpep_pickup_datetime": None},
-                    record_count=19513,
-                    file_size_in_bytes=388872,
-                    block_size_in_bytes=67108864,
-                    column_sizes={
-                        1: 53,
-                        2: 98153,
-                        3: 98693,
-                        4: 53,
-                        5: 53,
-                        6: 53,
-                        7: 17425,
-                        8: 18528,
-                        9: 53,
-                        10: 44788,
-                        11: 35571,
-                        12: 53,
-                        13: 1243,
-                        14: 2355,
-                        15: 12750,
-                        16: 4029,
-                        17: 110,
-                        18: 47194,
-                        19: 2948,
-                    },
-                    value_counts={
-                        1: 19513,
-                        2: 19513,
-                        3: 19513,
-                        4: 19513,
-                        5: 19513,
-                        6: 19513,
-                        7: 19513,
-                        8: 19513,
-                        9: 19513,
-                        10: 19513,
-                        11: 19513,
-                        12: 19513,
-                        13: 19513,
-                        14: 19513,
-                        15: 19513,
-                        16: 19513,
-                        17: 19513,
-                        18: 19513,
-                        19: 19513,
-                    },
-                    null_value_counts={
-                        1: 19513,
-                        2: 0,
-                        3: 0,
-                        4: 19513,
-                        5: 19513,
-                        6: 19513,
-                        7: 0,
-                        8: 0,
-                        9: 19513,
-                        10: 0,
-                        11: 0,
-                        12: 19513,
-                        13: 0,
-                        14: 0,
-                        15: 0,
-                        16: 0,
-                        17: 0,
-                        18: 0,
-                        19: 0,
-                    },
-                    nan_value_counts={16: 0, 17: 0, 18: 0, 19: 0, 10: 0, 11: 0, 12: 0, 13: 0, 14: 0, 15: 0},
-                    distinct_counts=None,
-                    lower_bounds={
-                        2: b"2020-04-01 00:00",
-                        3: b"2020-04-01 00:12",
-                        7: b"\x03\x00\x00\x00",
-                        8: b"\x01\x00\x00\x00",
-                        10: b"\xf6(\\\x8f\xc2\x05S\xc0",
-                        11: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                        13: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                        14: b"\x00\x00\x00\x00\x00\x00\xe0\xbf",
-                        15: b")\\\x8f\xc2\xf5(\x08\xc0",
-                        16: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                        17: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                        18: b"\xf6(\\\x8f\xc2\xc5S\xc0",
-                        19: b"\x00\x00\x00\x00\x00\x00\x04\xc0",
-                    },
-                    upper_bounds={
-                        2: b"2020-04-30 23:5:",
-                        3: b"2020-05-01 00:41",
-                        7: b"\t\x01\x00\x00",
-                        8: b"\t\x01\x00\x00",
-                        10: b"\xcd\xcc\xcc\xcc\xcc,_@",
-                        11: b"\x1f\x85\xebQ\\\xe2\xfe@",
-                        13: b"\x00\x00\x00\x00\x00\x00\x12@",
-                        14: b"\x00\x00\x00\x00\x00\x00\xe0?",
-                        15: b"q=\n\xd7\xa3\xf01@",
-                        16: b"\x00\x00\x00\x00\x00`B@",
-                        17: b"333333\xd3?",
-                        18: b"\x00\x00\x00\x00\x00\x18b@",
-                        19: b"\x00\x00\x00\x00\x00\x00\x04@",
-                    },
-                    key_metadata=None,
-                    split_offsets=[4],
-                    equality_ids=None,
-                    sort_order_id=0,
-                ),
-            ),
-            ManifestEntry(
-                status=ManifestEntryStatus.ADDED,
-                snapshot_id=8744736658442914487,
-                sequence_number=None,
-                data_file=DataFile(
-                    content=DataFileContent.DATA,
-                    file_path="/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=1/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00002.parquet",
-                    file_format=FileFormat.PARQUET,
-                    partition={"VendorID": 1, "tpep_pickup_datetime": 1925},
-                    record_count=95050,
-                    file_size_in_bytes=1265950,
-                    block_size_in_bytes=67108864,
-                    column_sizes={
-                        1: 318,
-                        2: 329806,
-                        3: 331632,
-                        4: 15343,
-                        5: 2351,
-                        6: 3389,
-                        7: 71269,
-                        8: 76429,
-                        9: 16383,
-                        10: 86992,
-                        11: 89608,
-                        12: 265,
-                        13: 19377,
-                        14: 1692,
-                        15: 76162,
-                        16: 4354,
-                        17: 759,
-                        18: 120650,
-                        19: 11804,
-                    },
-                    value_counts={
-                        1: 95050,
-                        2: 95050,
-                        3: 95050,
-                        4: 95050,
-                        5: 95050,
-                        6: 95050,
-                        7: 95050,
-                        8: 95050,
-                        9: 95050,
-                        10: 95050,
-                        11: 95050,
-                        12: 95050,
-                        13: 95050,
-                        14: 95050,
-                        15: 95050,
-                        16: 95050,
-                        17: 95050,
-                        18: 95050,
-                        19: 95050,
-                    },
-                    null_value_counts={
-                        1: 0,
-                        2: 0,
-                        3: 0,
-                        4: 0,
-                        5: 0,
-                        6: 0,
-                        7: 0,
-                        8: 0,
-                        9: 0,
-                        10: 0,
-                        11: 0,
-                        12: 95050,
-                        13: 0,
-                        14: 0,
-                        15: 0,
-                        16: 0,
-                        17: 0,
-                        18: 0,
-                        19: 0,
-                    },
-                    nan_value_counts={16: 0, 17: 0, 18: 0, 19: 0, 10: 0, 11: 0, 12: 0, 13: 0, 14: 0, 15: 0},
-                    distinct_counts=None,
-                    lower_bounds={
-                        1: b"\x01\x00\x00\x00",
-                        2: b"2020-04-01 00:00",
-                        3: b"2020-04-01 00:03",
-                        4: b"\x00\x00\x00\x00",
-                        5: b"\x01\x00\x00\x00",
-                        6: b"N",
-                        7: b"\x01\x00\x00\x00",
-                        8: b"\x01\x00\x00\x00",
-                        9: b"\x01\x00\x00\x00",
-                        10: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                        11: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                        13: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                        14: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                        15: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                        16: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                        17: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                        18: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                        19: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                    },
-                    upper_bounds={
-                        1: b"\x01\x00\x00\x00",
-                        2: b"2020-04-30 23:5:",
-                        3: b"2020-05-01 00:1:",
-                        4: b"\x06\x00\x00\x00",
-                        5: b"c\x00\x00\x00",
-                        6: b"Y",
-                        7: b"\t\x01\x00\x00",
-                        8: b"\t\x01\x00\x00",
-                        9: b"\x04\x00\x00\x00",
-                        10: b"\\\x8f\xc2\xf5(8\x8c@",
-                        11: b"\xcd\xcc\xcc\xcc\xcc,f@",
-                        13: b"\x00\x00\x00\x00\x00\x00\x1c@",
-                        14: b"\x9a\x99\x99\x99\x99\x99\xf1?",
-                        15: b"\x00\x00\x00\x00\x00\x00Y@",
-                        16: b"\x00\x00\x00\x00\x00\xb0X@",
-                        17: b"333333\xd3?",
-                        18: b"\xc3\xf5(\\\x8f:\x8c@",
-                        19: b"\x00\x00\x00\x00\x00\x00\x04@",
-                    },
-                    key_metadata=None,
-                    split_offsets=[4],
-                    equality_ids=None,
-                    sort_order_id=0,
-                ),
-            ),
-        ]
+        data_file.file_path
+        == "/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet"
     )
+    assert data_file.file_format == FileFormat.PARQUET
+    assert repr(data_file.partition) == "Record[VendorID=1, tpep_pickup_datetime=1925]"
+    assert data_file.record_count == 19513
+    assert data_file.file_size_in_bytes == 388872
+    assert data_file.column_sizes == {
+        1: 53,
+        2: 98153,
+        3: 98693,
+        4: 53,
+        5: 53,
+        6: 53,
+        7: 17425,
+        8: 18528,
+        9: 53,
+        10: 44788,
+        11: 35571,
+        12: 53,
+        13: 1243,
+        14: 2355,
+        15: 12750,
+        16: 4029,
+        17: 110,
+        18: 47194,
+        19: 2948,
+    }
+    assert data_file.value_counts == {
+        1: 19513,
+        2: 19513,
+        3: 19513,
+        4: 19513,
+        5: 19513,
+        6: 19513,
+        7: 19513,
+        8: 19513,
+        9: 19513,
+        10: 19513,
+        11: 19513,
+        12: 19513,
+        13: 19513,
+        14: 19513,
+        15: 19513,
+        16: 19513,
+        17: 19513,
+        18: 19513,
+        19: 19513,
+    }
+    assert data_file.null_value_counts == {
+        1: 19513,
+        2: 0,
+        3: 0,
+        4: 19513,
+        5: 19513,
+        6: 19513,
+        7: 0,
+        8: 0,
+        9: 19513,
+        10: 0,
+        11: 0,
+        12: 19513,
+        13: 0,
+        14: 0,
+        15: 0,
+        16: 0,
+        17: 0,
+        18: 0,
+        19: 0,
+    }
+    assert data_file.nan_value_counts == {16: 0, 17: 0, 18: 0, 19: 0, 10: 0, 11: 0, 12: 0, 13: 0, 14: 0, 15: 0}
+    assert data_file.lower_bounds == {
+        2: b"2020-04-01 00:00",
+        3: b"2020-04-01 00:12",
+        7: b"\x03\x00\x00\x00",
+        8: b"\x01\x00\x00\x00",
+        10: b"\xf6(\\\x8f\xc2\x05S\xc0",
+        11: b"\x00\x00\x00\x00\x00\x00\x00\x00",
+        13: b"\x00\x00\x00\x00\x00\x00\x00\x00",
+        14: b"\x00\x00\x00\x00\x00\x00\xe0\xbf",
+        15: b")\\\x8f\xc2\xf5(\x08\xc0",
+        16: b"\x00\x00\x00\x00\x00\x00\x00\x00",
+        17: b"\x00\x00\x00\x00\x00\x00\x00\x00",
+        18: b"\xf6(\\\x8f\xc2\xc5S\xc0",
+        19: b"\x00\x00\x00\x00\x00\x00\x04\xc0",
+    }
+    assert data_file.upper_bounds == {
+        2: b"2020-04-30 23:5:",
+        3: b"2020-05-01 00:41",
+        7: b"\t\x01\x00\x00",
+        8: b"\t\x01\x00\x00",
+        10: b"\xcd\xcc\xcc\xcc\xcc,_@",
+        11: b"\x1f\x85\xebQ\\\xe2\xfe@",
+        13: b"\x00\x00\x00\x00\x00\x00\x12@",
+        14: b"\x00\x00\x00\x00\x00\x00\xe0?",
+        15: b"q=\n\xd7\xa3\xf01@",
+        16: b"\x00\x00\x00\x00\x00`B@",
+        17: b"333333\xd3?",
+        18: b"\x00\x00\x00\x00\x00\x18b@",
+        19: b"\x00\x00\x00\x00\x00\x00\x04@",
+    }
+    assert data_file.key_metadata is None
+    assert data_file.split_offsets == [4]
+    assert data_file.equality_ids is None
+    assert data_file.sort_order_id == 0
 
 
 def test_read_manifest_list(generated_manifest_file_file: str) -> None:
     input_file = PyArrowFileIO().new_input(generated_manifest_file_file)
-    actual = list(read_manifest_list(input_file))
-    expected = [
-        ManifestFile(
-            manifest_path=actual[0].manifest_path,
-            manifest_length=7989,
-            partition_spec_id=0,
-            added_snapshot_id=9182715666859759686,
-            added_data_files_count=3,
-            existing_data_files_count=0,
-            deleted_data_files_count=0,
-            partitions=[
-                PartitionFieldSummary(
-                    contains_null=True, contains_nan=False, lower_bound=b"\x01\x00\x00\x00", upper_bound=b"\x02\x00\x00\x00"
-                )
-            ],
-            added_rows_count=237993,
-            existing_rows_counts=None,
-            deleted_rows_count=0,
-        )
-    ]
-    assert actual == expected
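+    # read_manifest_list yields ManifestFile entries; this fixture contains exactly one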
+    manifest_list = list(read_manifest_list(input_file))[0]
+
+    assert manifest_list.manifest_length == 7989
+    assert manifest_list.partition_spec_id == 0
+    assert manifest_list.added_snapshot_id == 9182715666859759686
+    assert manifest_list.added_files_count == 3
+    assert manifest_list.existing_files_count == 0
+    assert manifest_list.deleted_files_count == 0
+
+    assert isinstance(manifest_list.partitions, list)
+
+    partitions_summary = manifest_list.partitions[0]
+    assert isinstance(partitions_summary, PartitionFieldSummary)
 
+    assert partitions_summary.contains_null is True
+    assert partitions_summary.contains_nan is False
+    assert partitions_summary.lower_bound == b"\x01\x00\x00\x00"
+    assert partitions_summary.upper_bound == b"\x02\x00\x00\x00"
 
-def test_read_manifest(generated_manifest_file_file: str, generated_manifest_entry_file: str) -> None:
+    assert manifest_list.added_rows_count == 237993
+    assert manifest_list.existing_rows_count == 0
+    assert manifest_list.deleted_rows_count == 0
+
+
+def test_read_manifest(generated_manifest_file_file: str) -> None:
     io = load_file_io({})
 
     snapshot = Snapshot(
@@ -552,269 +187,29 @@ def test_read_manifest(generated_manifest_file_file: str, generated_manifest_ent
         summary=Summary(Operation.APPEND),
         schema_id=3,
     )
-    manifest_list = snapshot.manifests(io)
+    manifest_list = snapshot.manifests(io)[0]
+
+    assert manifest_list.manifest_length == 7989
+    assert manifest_list.partition_spec_id == 0
+    assert manifest_list.content is None
+    assert manifest_list.sequence_number is None
+    assert manifest_list.min_sequence_number is None
+    assert manifest_list.added_snapshot_id == 9182715666859759686
+    assert manifest_list.added_files_count == 3
+    assert manifest_list.existing_files_count == 0
+    assert manifest_list.deleted_files_count == 0
+    assert manifest_list.added_rows_count == 237993
+    assert manifest_list.existing_rows_count == 0
+    assert manifest_list.deleted_rows_count == 0
+    assert manifest_list.key_metadata is None
+
+    assert isinstance(manifest_list.partitions, list)
+
+    partition = manifest_list.partitions[0]
 
-    assert manifest_list == [
-        ManifestFile(
-            manifest_path=generated_manifest_entry_file,
-            manifest_length=7989,
-            partition_spec_id=0,
-            content=ManifestContent.DATA,
-            sequence_number=0,
-            min_sequence_number=0,
-            added_snapshot_id=9182715666859759686,
-            added_data_files_count=3,
-            existing_data_files_count=0,
-            deleted_data_files_count=0,
-            added_rows_count=237993,
-            existing_rows_counts=None,
-            deleted_rows_count=0,
-            partitions=[
-                PartitionFieldSummary(
-                    contains_null=True, contains_nan=False, lower_bound=b"\x01\x00\x00\x00", upper_bound=b"\x02\x00\x00\x00"
-                )
-            ],
-            key_metadata=None,
-        )
-    ]
-    actual = manifest_list[0].fetch_manifest_entry(io)
-    expected = [
-        ManifestEntry(
-            status=ManifestEntryStatus.ADDED,
-            snapshot_id=8744736658442914487,
-            sequence_number=None,
-            data_file=DataFile(
-                content=DataFileContent.DATA,
-                file_path="/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet",
-                file_format=FileFormat.PARQUET,
-                partition={"VendorID": 1, "tpep_pickup_datetime": 1925},
-                record_count=19513,
-                file_size_in_bytes=388872,
-                block_size_in_bytes=67108864,
-                column_sizes={
-                    1: 53,
-                    2: 98153,
-                    3: 98693,
-                    4: 53,
-                    5: 53,
-                    6: 53,
-                    7: 17425,
-                    8: 18528,
-                    9: 53,
-                    10: 44788,
-                    11: 35571,
-                    12: 53,
-                    13: 1243,
-                    14: 2355,
-                    15: 12750,
-                    16: 4029,
-                    17: 110,
-                    18: 47194,
-                    19: 2948,
-                },
-                value_counts={
-                    1: 19513,
-                    2: 19513,
-                    3: 19513,
-                    4: 19513,
-                    5: 19513,
-                    6: 19513,
-                    7: 19513,
-                    8: 19513,
-                    9: 19513,
-                    10: 19513,
-                    11: 19513,
-                    12: 19513,
-                    13: 19513,
-                    14: 19513,
-                    15: 19513,
-                    16: 19513,
-                    17: 19513,
-                    18: 19513,
-                    19: 19513,
-                },
-                null_value_counts={
-                    1: 19513,
-                    2: 0,
-                    3: 0,
-                    4: 19513,
-                    5: 19513,
-                    6: 19513,
-                    7: 0,
-                    8: 0,
-                    9: 19513,
-                    10: 0,
-                    11: 0,
-                    12: 19513,
-                    13: 0,
-                    14: 0,
-                    15: 0,
-                    16: 0,
-                    17: 0,
-                    18: 0,
-                    19: 0,
-                },
-                nan_value_counts={16: 0, 17: 0, 18: 0, 19: 0, 10: 0, 11: 0, 12: 0, 13: 0, 14: 0, 15: 0},
-                distinct_counts=None,
-                lower_bounds={
-                    2: b"2020-04-01 00:00",
-                    3: b"2020-04-01 00:12",
-                    7: b"\x03\x00\x00\x00",
-                    8: b"\x01\x00\x00\x00",
-                    10: b"\xf6(\\\x8f\xc2\x05S\xc0",
-                    11: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                    13: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                    14: b"\x00\x00\x00\x00\x00\x00\xe0\xbf",
-                    15: b")\\\x8f\xc2\xf5(\x08\xc0",
-                    16: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                    17: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                    18: b"\xf6(\\\x8f\xc2\xc5S\xc0",
-                    19: b"\x00\x00\x00\x00\x00\x00\x04\xc0",
-                },
-                upper_bounds={
-                    2: b"2020-04-30 23:5:",
-                    3: b"2020-05-01 00:41",
-                    7: b"\t\x01\x00\x00",
-                    8: b"\t\x01\x00\x00",
-                    10: b"\xcd\xcc\xcc\xcc\xcc,_@",
-                    11: b"\x1f\x85\xebQ\\\xe2\xfe@",
-                    13: b"\x00\x00\x00\x00\x00\x00\x12@",
-                    14: b"\x00\x00\x00\x00\x00\x00\xe0?",
-                    15: b"q=\n\xd7\xa3\xf01@",
-                    16: b"\x00\x00\x00\x00\x00`B@",
-                    17: b"333333\xd3?",
-                    18: b"\x00\x00\x00\x00\x00\x18b@",
-                    19: b"\x00\x00\x00\x00\x00\x00\x04@",
-                },
-                key_metadata=None,
-                split_offsets=[4],
-                equality_ids=None,
-                sort_order_id=0,
-            ),
-        ),
-        ManifestEntry(
-            status=ManifestEntryStatus.ADDED,
-            snapshot_id=8744736658442914487,
-            sequence_number=None,
-            data_file=DataFile(
-                content=DataFileContent.DATA,
-                file_path="/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=1/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00002.parquet",
-                file_format=FileFormat.PARQUET,
-                partition={"VendorID": 1, "tpep_pickup_datetime": 1925},
-                record_count=95050,
-                file_size_in_bytes=1265950,
-                block_size_in_bytes=67108864,
-                column_sizes={
-                    1: 318,
-                    2: 329806,
-                    3: 331632,
-                    4: 15343,
-                    5: 2351,
-                    6: 3389,
-                    7: 71269,
-                    8: 76429,
-                    9: 16383,
-                    10: 86992,
-                    11: 89608,
-                    12: 265,
-                    13: 19377,
-                    14: 1692,
-                    15: 76162,
-                    16: 4354,
-                    17: 759,
-                    18: 120650,
-                    19: 11804,
-                },
-                value_counts={
-                    1: 95050,
-                    2: 95050,
-                    3: 95050,
-                    4: 95050,
-                    5: 95050,
-                    6: 95050,
-                    7: 95050,
-                    8: 95050,
-                    9: 95050,
-                    10: 95050,
-                    11: 95050,
-                    12: 95050,
-                    13: 95050,
-                    14: 95050,
-                    15: 95050,
-                    16: 95050,
-                    17: 95050,
-                    18: 95050,
-                    19: 95050,
-                },
-                null_value_counts={
-                    1: 0,
-                    2: 0,
-                    3: 0,
-                    4: 0,
-                    5: 0,
-                    6: 0,
-                    7: 0,
-                    8: 0,
-                    9: 0,
-                    10: 0,
-                    11: 0,
-                    12: 95050,
-                    13: 0,
-                    14: 0,
-                    15: 0,
-                    16: 0,
-                    17: 0,
-                    18: 0,
-                    19: 0,
-                },
-                nan_value_counts={16: 0, 17: 0, 18: 0, 19: 0, 10: 0, 11: 0, 12: 0, 13: 0, 14: 0, 15: 0},
-                distinct_counts=None,
-                lower_bounds={
-                    1: b"\x01\x00\x00\x00",
-                    2: b"2020-04-01 00:00",
-                    3: b"2020-04-01 00:03",
-                    4: b"\x00\x00\x00\x00",
-                    5: b"\x01\x00\x00\x00",
-                    6: b"N",
-                    7: b"\x01\x00\x00\x00",
-                    8: b"\x01\x00\x00\x00",
-                    9: b"\x01\x00\x00\x00",
-                    10: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                    11: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                    13: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                    14: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                    15: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                    16: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                    17: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                    18: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                    19: b"\x00\x00\x00\x00\x00\x00\x00\x00",
-                },
-                upper_bounds={
-                    1: b"\x01\x00\x00\x00",
-                    2: b"2020-04-30 23:5:",
-                    3: b"2020-05-01 00:1:",
-                    4: b"\x06\x00\x00\x00",
-                    5: b"c\x00\x00\x00",
-                    6: b"Y",
-                    7: b"\t\x01\x00\x00",
-                    8: b"\t\x01\x00\x00",
-                    9: b"\x04\x00\x00\x00",
-                    10: b"\\\x8f\xc2\xf5(8\x8c@",
-                    11: b"\xcd\xcc\xcc\xcc\xcc,f@",
-                    13: b"\x00\x00\x00\x00\x00\x00\x1c@",
-                    14: b"\x9a\x99\x99\x99\x99\x99\xf1?",
-                    15: b"\x00\x00\x00\x00\x00\x00Y@",
-                    16: b"\x00\x00\x00\x00\x00\xb0X@",
-                    17: b"333333\xd3?",
-                    18: b"\xc3\xf5(\\\x8f:\x8c@",
-                    19: b"\x00\x00\x00\x00\x00\x00\x04@",
-                },
-                key_metadata=None,
-                split_offsets=[4],
-                equality_ids=None,
-                sort_order_id=0,
-            ),
-        ),
-    ]
+    assert isinstance(partition, PartitionFieldSummary)
 
-    assert actual == expected
+    assert partition.contains_null is True
+    assert partition.contains_nan is False
+    assert partition.lower_bound == b"\x01\x00\x00\x00"
+    assert partition.upper_bound == b"\x02\x00\x00\x00"