You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2023/01/02 10:45:13 UTC

[GitHub] [iceberg] Fokko commented on a diff in pull request #6506: Python: Refactor Avro read path to use a partner visitor

Fokko commented on code in PR #6506:
URL: https://github.com/apache/iceberg/pull/6506#discussion_r1059894457


##########
python/pyiceberg/avro/resolver.py:
##########
@@ -14,123 +14,250 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from functools import singledispatch
+# pylint: disable=arguments-renamed,unused-argument
+from functools import partial
 from typing import (
+    Callable,
+    Dict,
     List,
     Optional,
     Tuple,
     Union,
 )
 
 from pyiceberg.avro.reader import (
-    ConstructReader,
+    BinaryReader,
+    BooleanReader,
+    DateReader,
+    DecimalReader,
+    DoubleReader,
+    FixedReader,
+    FloatReader,
+    IntegerReader,
     ListReader,
     MapReader,
     NoneReader,
     OptionReader,
     Reader,
+    StringReader,
+    StructProtocolReader,
     StructReader,
+    TimeReader,
+    TimestampReader,
+    TimestamptzReader,
+    UUIDReader,
 )
 from pyiceberg.exceptions import ResolveError
-from pyiceberg.schema import Schema, promote, visit
+from pyiceberg.schema import (
+    PartnerAccessor,
+    PrimitiveWithPartnerVisitor,
+    Schema,
+    promote,
+    visit_with_partner,
+)
+from pyiceberg.typedef import EMPTY_DICT, StructProtocol
 from pyiceberg.types import (
+    BinaryType,
+    BooleanType,
+    DateType,
+    DecimalType,
     DoubleType,
+    FixedType,
     FloatType,
     IcebergType,
+    IntegerType,
     ListType,
+    LongType,
     MapType,
+    NestedField,
     PrimitiveType,
+    StringType,
     StructType,
+    TimestampType,
+    TimestamptzType,
+    TimeType,
+    UUIDType,
 )
 
 
-@singledispatch
-def resolve(file_schema: Union[Schema, IcebergType], read_schema: Union[Schema, IcebergType]) -> Reader:
-    """This resolves the file and read schema
+def construct_reader(file_schema: Union[Schema, IcebergType]) -> Reader:
+    """Constructs a reader from a file schema
+
+    Args:
+        file_schema (Schema | IcebergType): The schema of the Avro file
+
+    Raises:
+        NotImplementedError: If attempting to resolve an unrecognized object type

Review Comment:
   Looks like we don't raise this one anymore.



##########
python/pyiceberg/avro/reader.py:
##########
@@ -260,25 +238,43 @@ def skip(self, decoder: BinaryDecoder) -> None:
             return self.option.skip(decoder)
 
 
-@dataclass(frozen=True)
-class StructReader(Reader):
-    fields: Tuple[Tuple[Optional[int], Reader], ...] = dataclassfield()
+class StructProtocolReader(Reader):
+    create_struct: Callable[[], StructProtocol]
+    fields: Tuple[Tuple[Optional[int], Reader], ...]
+
+    def __init__(self, fields: Tuple[Tuple[Optional[int], Reader], ...], create_struct: Callable[[], StructProtocol]):
+        self.create_struct = create_struct
+        self.fields = fields
+
+    def create_or_reuse(self, reuse: Optional[StructProtocol]) -> StructProtocol:
+        if reuse:
+            return reuse
+        else:
+            return self.create_struct()
+
+    def read(self, decoder: BinaryDecoder) -> Any:
+        struct = self.create_or_reuse(None)
 
-    def read(self, decoder: BinaryDecoder) -> Record:
-        result: List[Union[Any, StructProtocol]] = [None] * len(self.fields)
         for (pos, field) in self.fields:
             if pos is not None:
-                result[pos] = field.read(decoder)
+                struct.set(pos, field.read(decoder))  # later: pass reuse in here
             else:
                 field.skip(decoder)
 
-        return Record(*result)
+        return struct
 
     def skip(self, decoder: BinaryDecoder) -> None:
         for _, field in self.fields:
             field.skip(decoder)
 
 
+class StructReader(StructProtocolReader):
+    fields: Tuple[Tuple[Optional[int], Reader], ...]

Review Comment:
   Should we extract the `Tuple[Optional[int], Reader]` into a named type?



##########
python/pyiceberg/avro/resolver.py:
##########
@@ -14,123 +14,250 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from functools import singledispatch
+# pylint: disable=arguments-renamed,unused-argument
+from functools import partial
 from typing import (
+    Callable,
+    Dict,
     List,
     Optional,
     Tuple,
     Union,
 )
 
 from pyiceberg.avro.reader import (
-    ConstructReader,
+    BinaryReader,
+    BooleanReader,
+    DateReader,
+    DecimalReader,
+    DoubleReader,
+    FixedReader,
+    FloatReader,
+    IntegerReader,
     ListReader,
     MapReader,
     NoneReader,
     OptionReader,
     Reader,
+    StringReader,
+    StructProtocolReader,
     StructReader,
+    TimeReader,
+    TimestampReader,
+    TimestamptzReader,
+    UUIDReader,
 )
 from pyiceberg.exceptions import ResolveError
-from pyiceberg.schema import Schema, promote, visit
+from pyiceberg.schema import (
+    PartnerAccessor,
+    PrimitiveWithPartnerVisitor,
+    Schema,
+    promote,
+    visit_with_partner,
+)
+from pyiceberg.typedef import EMPTY_DICT, StructProtocol
 from pyiceberg.types import (
+    BinaryType,
+    BooleanType,
+    DateType,
+    DecimalType,
     DoubleType,
+    FixedType,
     FloatType,
     IcebergType,
+    IntegerType,
     ListType,
+    LongType,
     MapType,
+    NestedField,
     PrimitiveType,
+    StringType,
     StructType,
+    TimestampType,
+    TimestamptzType,
+    TimeType,
+    UUIDType,
 )
 
 
-@singledispatch
-def resolve(file_schema: Union[Schema, IcebergType], read_schema: Union[Schema, IcebergType]) -> Reader:
-    """This resolves the file and read schema
+def construct_reader(file_schema: Union[Schema, IcebergType]) -> Reader:
+    """Constructs a reader from a file schema
+
+    Args:
+        file_schema (Schema | IcebergType): The schema of the Avro file
+
+    Raises:
+        NotImplementedError: If attempting to resolve an unrecognized object type
+    """
+    return resolve(file_schema, file_schema)
+
 
-    The function traverses the schema in post-order fashion
+def resolve(
+    file_schema: Union[Schema, IcebergType],
+    read_schema: Union[Schema, IcebergType],
+    read_types: Dict[int, Callable[[Schema], StructProtocol]] = EMPTY_DICT,
+) -> Reader:
+    """Resolves the file and read schema to produce a reader
 
-     Args:
-         file_schema (Schema | IcebergType): The schema of the Avro file
-         read_schema (Schema | IcebergType): The requested read schema which is equal, subset or superset of the file schema
+    Args:
+        file_schema (Schema | IcebergType): The schema of the Avro file
+        read_schema (Schema | IcebergType): The requested read schema which is equal, subset or superset of the file schema
+        read_types (Dict[int, Callable[[Schema], StructProtocol]]): A dict of types to use for struct data
 
-     Raises:
-         NotImplementedError: If attempting to resolve an unrecognized object type
+    Raises:
+        NotImplementedError: If attempting to resolve an unrecognized object type

Review Comment:
   Looks like we don't raise this one anymore.



##########
python/pyiceberg/avro/resolver.py:
##########
@@ -14,123 +14,250 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from functools import singledispatch
+# pylint: disable=arguments-renamed,unused-argument
+from functools import partial
 from typing import (
+    Callable,
+    Dict,
     List,
     Optional,
     Tuple,
     Union,
 )
 
 from pyiceberg.avro.reader import (
-    ConstructReader,
+    BinaryReader,
+    BooleanReader,
+    DateReader,
+    DecimalReader,
+    DoubleReader,
+    FixedReader,
+    FloatReader,
+    IntegerReader,
     ListReader,
     MapReader,
     NoneReader,
     OptionReader,
     Reader,
+    StringReader,
+    StructProtocolReader,
     StructReader,
+    TimeReader,
+    TimestampReader,
+    TimestamptzReader,
+    UUIDReader,
 )
 from pyiceberg.exceptions import ResolveError
-from pyiceberg.schema import Schema, promote, visit
+from pyiceberg.schema import (
+    PartnerAccessor,
+    PrimitiveWithPartnerVisitor,
+    Schema,
+    promote,
+    visit_with_partner,
+)
+from pyiceberg.typedef import EMPTY_DICT, StructProtocol
 from pyiceberg.types import (
+    BinaryType,
+    BooleanType,
+    DateType,
+    DecimalType,
     DoubleType,
+    FixedType,
     FloatType,
     IcebergType,
+    IntegerType,
     ListType,
+    LongType,
     MapType,
+    NestedField,
     PrimitiveType,
+    StringType,
     StructType,
+    TimestampType,
+    TimestamptzType,
+    TimeType,
+    UUIDType,
 )
 
 
-@singledispatch
-def resolve(file_schema: Union[Schema, IcebergType], read_schema: Union[Schema, IcebergType]) -> Reader:
-    """This resolves the file and read schema
+def construct_reader(file_schema: Union[Schema, IcebergType]) -> Reader:
+    """Constructs a reader from a file schema
+
+    Args:
+        file_schema (Schema | IcebergType): The schema of the Avro file
+
+    Raises:
+        NotImplementedError: If attempting to resolve an unrecognized object type
+    """
+    return resolve(file_schema, file_schema)
+
 
-    The function traverses the schema in post-order fashion
+def resolve(
+    file_schema: Union[Schema, IcebergType],
+    read_schema: Union[Schema, IcebergType],
+    read_types: Dict[int, Callable[[Schema], StructProtocol]] = EMPTY_DICT,
+) -> Reader:
+    """Resolves the file and read schema to produce a reader
 
-     Args:
-         file_schema (Schema | IcebergType): The schema of the Avro file
-         read_schema (Schema | IcebergType): The requested read schema which is equal, subset or superset of the file schema
+    Args:
+        file_schema (Schema | IcebergType): The schema of the Avro file
+        read_schema (Schema | IcebergType): The requested read schema which is equal, subset or superset of the file schema
+        read_types (Dict[int, Callable[[Schema], StructProtocol]]): A dict of types to use for struct data
 
-     Raises:
-         NotImplementedError: If attempting to resolve an unrecognized object type
+    Raises:
+        NotImplementedError: If attempting to resolve an unrecognized object type
     """
-    raise NotImplementedError(f"Cannot resolve non-type: {file_schema}")
+    return visit_with_partner(file_schema, read_schema, SchemaResolver(read_types), SchemaPartnerAccessor())  # type: ignore
+
+
+class SchemaResolver(PrimitiveWithPartnerVisitor[IcebergType, Reader]):
+    read_types: Dict[int, Callable[[Schema], StructProtocol]]
+    field_ids: List[int]
+
+    def before_field(self, field: NestedField, field_partner: Optional[IcebergType]) -> None:
+        self.field_ids.append(field.field_id)
+
+    def after_field(self, field: NestedField, field_partner: Optional[IcebergType]) -> None:
+        self.field_ids.pop()
+
+    def create_struct_reader(self, read_schema: StructType, field_readers: Tuple[Tuple[Optional[int], Reader], ...]) -> Reader:
+        current_field_id = self.field_ids[-1] if self.field_ids else -1
+        if constructor := self.read_types.get(current_field_id):
+            return StructProtocolReader(field_readers, partial(constructor, read_schema))
+
+        return StructReader(field_readers)
+
+    def __init__(self, read_types: Dict[int, Callable[[Schema], StructProtocol]]):
+        self.read_types = read_types
+        self.field_ids = []
+
+    def schema(self, schema: Schema, expected_schema: Optional[IcebergType], result: Reader) -> Reader:
+        return result
+
+    def struct(self, struct: StructType, expected_struct: Optional[IcebergType], field_readers: List[Reader]) -> Reader:
+        if not expected_struct:
+            # no values are expected so the reader will only be used for skipping
+            return StructReader(tuple(enumerate(field_readers)))
+
+        if not isinstance(expected_struct, StructType):
+            raise ResolveError(f"File/read schema are not aligned for struct, got {expected_struct}")
+
+        results: List[Tuple[Optional[int], Reader]] = []
+        expected_positions: Dict[int, int] = {field.field_id: pos for pos, field in enumerate(expected_struct.fields)}
+
+        # first, add readers for the file fields that must be in order
+        for field, result_reader in zip(struct.fields, field_readers):
+            read_pos = expected_positions.get(field.field_id)
+            results.append((read_pos, result_reader))
+
+        file_fields = {field.field_id: field for field in struct.fields}
+        for pos, read_field in enumerate(expected_struct.fields):
+            if read_field.field_id not in file_fields:
+                if read_field.required:
+                    raise ResolveError(f"{read_field} is non-optional, and not part of the file schema")
+                # Just set the new field to None
+                results.append((pos, NoneReader()))
+
+        return self.create_struct_reader(expected_struct, tuple(results))
+
+    def field(self, field: NestedField, expected_field: Optional[IcebergType], field_reader: Reader) -> Reader:
+        return field_reader if field.required else OptionReader(field_reader)
+
+    def list(self, list_type: ListType, expected_list: Optional[IcebergType], element_reader: Reader) -> Reader:
+        if expected_list and not isinstance(expected_list, ListType):
+            raise ResolveError(f"File/read schema are not aligned for list, got {expected_list}")
+
+        return ListReader(element_reader if list_type.element_required else OptionReader(element_reader))
+
+    def map(self, map_type: MapType, expected_map: Optional[IcebergType], key_reader: Reader, value_reader: Reader) -> Reader:
+        if expected_map and not isinstance(expected_map, MapType):
+            raise ResolveError(f"File/read schema are not aligned for map, got {expected_map}")
 
+        return MapReader(key_reader, value_reader if map_type.value_required else OptionReader(value_reader))
 
-@resolve.register(Schema)
-def _(file_schema: Schema, read_schema: Schema) -> Reader:
-    """Visit a Schema and starts resolving it by converting it to a struct"""
-    return resolve(file_schema.as_struct(), read_schema.as_struct())
+    def primitive(self, primitive: PrimitiveType, expected_primitive: Optional[IcebergType]) -> Reader:
+        if expected_primitive is not None:
+            if not isinstance(expected_primitive, PrimitiveType):
+                raise ResolveError(f"File/read schema are not aligned for {primitive}, got {expected_primitive}")
 
+            # ensure that the type can be projected to the expected
+            if primitive != expected_primitive:
+                promote(primitive, expected_primitive)
 
-@resolve.register(StructType)
-def _(file_struct: StructType, read_struct: IcebergType) -> Reader:
-    """Iterates over the file schema, and checks if the field is in the read schema"""
+        return super().primitive(primitive, expected_primitive)
 
-    if not isinstance(read_struct, StructType):
-        raise ResolveError(f"File/read schema are not aligned for {file_struct}, got {read_struct}")
+    def visit_boolean(self, boolean_type: BooleanType, partner: Optional[IcebergType]) -> Reader:
+        return BooleanReader()
 
-    results: List[Tuple[Optional[int], Reader]] = []
-    read_fields = {field.field_id: (pos, field) for pos, field in enumerate(read_struct.fields)}
+    def visit_integer(self, integer_type: IntegerType, partner: Optional[IcebergType]) -> Reader:
+        return IntegerReader()
 
-    for file_field in file_struct.fields:
-        if file_field.field_id in read_fields:
-            read_pos, read_field = read_fields[file_field.field_id]
-            result_reader = resolve(file_field.field_type, read_field.field_type)
+    def visit_long(self, long_type: LongType, partner: Optional[IcebergType]) -> Reader:
+        return IntegerReader()
+
+    def visit_float(self, float_type: FloatType, partner: Optional[IcebergType]) -> Reader:
+        return FloatReader()
+
+    def visit_double(self, double_type: DoubleType, partner: Optional[IcebergType]) -> Reader:
+        return DoubleReader()
+
+    def visit_decimal(self, decimal_type: DecimalType, partner: Optional[IcebergType]) -> Reader:
+        return DecimalReader(decimal_type.precision, decimal_type.scale)
+
+    def visit_date(self, date_type: DateType, partner: Optional[IcebergType]) -> Reader:
+        return DateReader()
+
+    def visit_time(self, time_type: TimeType, partner: Optional[IcebergType]) -> Reader:
+        return TimeReader()
+
+    def visit_timestamp(self, timestamp_type: TimestampType, partner: Optional[IcebergType]) -> Reader:
+        return TimestampReader()
+
+    def visit_timestampz(self, timestamptz_type: TimestamptzType, partner: Optional[IcebergType]) -> Reader:
+        return TimestamptzReader()
+
+    def visit_string(self, string_type: StringType, partner: Optional[IcebergType]) -> Reader:
+        return StringReader()
+
+    def visit_uuid(self, uuid_type: UUIDType, partner: Optional[IcebergType]) -> Reader:
+        return UUIDReader()
+
+    def visit_fixed(self, fixed_type: FixedType, partner: Optional[IcebergType]) -> Reader:
+        return FixedReader(len(fixed_type))
+
+    def visit_binary(self, binary_type: BinaryType, partner: Optional[IcebergType]) -> Reader:
+        return BinaryReader()
+
+
+class SchemaPartnerAccessor(PartnerAccessor[IcebergType]):
+    def schema_partner(self, partner: Optional[IcebergType]) -> Optional[IcebergType]:
+        if isinstance(partner, Schema):
+            return partner.as_struct()
+
+        raise ResolveError(f"File/read schema are not aligned for schema, got {partner}")
+
+    def field_partner(self, partner: Optional[IcebergType], field_id: int, field_name: str) -> Optional[IcebergType]:
+        if isinstance(partner, StructType):
+            field = partner.field(field_id)
         else:
-            read_pos = None
-            result_reader = visit(file_field.field_type, ConstructReader())
-        result_reader = result_reader if file_field.required else OptionReader(result_reader)
-        results.append((read_pos, result_reader))
-
-    file_fields = {field.field_id: field for field in file_struct.fields}
-    for pos, read_field in enumerate(read_struct.fields):
-        if read_field.field_id not in file_fields:
-            if read_field.required:
-                raise ResolveError(f"{read_field} is non-optional, and not part of the file schema")
-            # Just set the new field to None
-            results.append((pos, NoneReader()))
-
-    return StructReader(tuple(results))
-
-
-@resolve.register(ListType)
-def _(file_list: ListType, read_list: IcebergType) -> Reader:
-    if not isinstance(read_list, ListType):
-        raise ResolveError(f"File/read schema are not aligned for {file_list}, got {read_list}")
-    element_reader = resolve(file_list.element_type, read_list.element_type)
-    return ListReader(element_reader)
-
-
-@resolve.register(MapType)
-def _(file_map: MapType, read_map: IcebergType) -> Reader:
-    if not isinstance(read_map, MapType):
-        raise ResolveError(f"File/read schema are not aligned for {file_map}, got {read_map}")
-    key_reader = resolve(file_map.key_type, read_map.key_type)
-    value_reader = resolve(file_map.value_type, read_map.value_type)
-
-    return MapReader(key_reader, value_reader)
-
-
-@resolve.register(FloatType)
-def _(file_type: PrimitiveType, read_type: IcebergType) -> Reader:
-    """This is a special case, when we need to adhere to the bytes written"""
-    if isinstance(read_type, DoubleType):
-        return visit(file_type, ConstructReader())
-    else:
-        raise ResolveError(f"Cannot promote an float to {read_type}")
-
-
-@resolve.register(PrimitiveType)
-def _(file_type: PrimitiveType, read_type: IcebergType) -> Reader:
-    """Converting the primitive type into an actual reader that will decode the physical data"""
-    if not isinstance(read_type, PrimitiveType):
-        raise ResolveError(f"Cannot promote {file_type} to {read_type}")
-
-    # In the case of a promotion, we want to check if it is valid
-    if file_type != read_type:
-        read_type = promote(file_type, read_type)
-    return visit(read_type, ConstructReader())
+            raise ResolveError(f"File/read schema are not aligned for struct, got {partner}")
+
+        return field.field_type if field else None
+
+    def list_element_partner(self, partner_list: Optional[IcebergType]) -> Optional[IcebergType]:
+        if isinstance(partner_list, ListType):
+            return partner_list.element_type
+
+        raise ResolveError(f"File/read schema are not aligned for list, got {partner_list}")
+
+    def map_key_partner(self, partner_map: Optional[IcebergType]) -> Optional[IcebergType]:
+        if isinstance(partner_map, MapType):
+            return partner_map.key_type
+
+        raise ResolveError(f"File/read schema are not aligned for map, got {partner_map}")

Review Comment:
   ```suggestion
           raise ResolveError(f"File/read schema are not aligned for map key, got {partner_map}")
   ```



##########
python/pyiceberg/avro/resolver.py:
##########
@@ -14,123 +14,250 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from functools import singledispatch
+# pylint: disable=arguments-renamed,unused-argument
+from functools import partial
 from typing import (
+    Callable,
+    Dict,
     List,
     Optional,
     Tuple,
     Union,
 )
 
 from pyiceberg.avro.reader import (
-    ConstructReader,
+    BinaryReader,
+    BooleanReader,
+    DateReader,
+    DecimalReader,
+    DoubleReader,
+    FixedReader,
+    FloatReader,
+    IntegerReader,
     ListReader,
     MapReader,
     NoneReader,
     OptionReader,
     Reader,
+    StringReader,
+    StructProtocolReader,
     StructReader,
+    TimeReader,
+    TimestampReader,
+    TimestamptzReader,
+    UUIDReader,
 )
 from pyiceberg.exceptions import ResolveError
-from pyiceberg.schema import Schema, promote, visit
+from pyiceberg.schema import (
+    PartnerAccessor,
+    PrimitiveWithPartnerVisitor,
+    Schema,
+    promote,
+    visit_with_partner,
+)
+from pyiceberg.typedef import EMPTY_DICT, StructProtocol
 from pyiceberg.types import (
+    BinaryType,
+    BooleanType,
+    DateType,
+    DecimalType,
     DoubleType,
+    FixedType,
     FloatType,
     IcebergType,
+    IntegerType,
     ListType,
+    LongType,
     MapType,
+    NestedField,
     PrimitiveType,
+    StringType,
     StructType,
+    TimestampType,
+    TimestamptzType,
+    TimeType,
+    UUIDType,
 )
 
 
-@singledispatch
-def resolve(file_schema: Union[Schema, IcebergType], read_schema: Union[Schema, IcebergType]) -> Reader:
-    """This resolves the file and read schema
+def construct_reader(file_schema: Union[Schema, IcebergType]) -> Reader:
+    """Constructs a reader from a file schema
+
+    Args:
+        file_schema (Schema | IcebergType): The schema of the Avro file
+
+    Raises:
+        NotImplementedError: If attempting to resolve an unrecognized object type
+    """
+    return resolve(file_schema, file_schema)
+
 
-    The function traverses the schema in post-order fashion
+def resolve(
+    file_schema: Union[Schema, IcebergType],
+    read_schema: Union[Schema, IcebergType],
+    read_types: Dict[int, Callable[[Schema], StructProtocol]] = EMPTY_DICT,
+) -> Reader:
+    """Resolves the file and read schema to produce a reader
 
-     Args:
-         file_schema (Schema | IcebergType): The schema of the Avro file
-         read_schema (Schema | IcebergType): The requested read schema which is equal, subset or superset of the file schema
+    Args:
+        file_schema (Schema | IcebergType): The schema of the Avro file
+        read_schema (Schema | IcebergType): The requested read schema which is equal, subset or superset of the file schema
+        read_types (Dict[int, Callable[[Schema], StructProtocol]]): A dict of types to use for struct data
 
-     Raises:
-         NotImplementedError: If attempting to resolve an unrecognized object type
+    Raises:
+        NotImplementedError: If attempting to resolve an unrecognized object type
     """
-    raise NotImplementedError(f"Cannot resolve non-type: {file_schema}")
+    return visit_with_partner(file_schema, read_schema, SchemaResolver(read_types), SchemaPartnerAccessor())  # type: ignore
+
+
+class SchemaResolver(PrimitiveWithPartnerVisitor[IcebergType, Reader]):
+    read_types: Dict[int, Callable[[Schema], StructProtocol]]
+    field_ids: List[int]
+
+    def before_field(self, field: NestedField, field_partner: Optional[IcebergType]) -> None:
+        self.field_ids.append(field.field_id)
+
+    def after_field(self, field: NestedField, field_partner: Optional[IcebergType]) -> None:
+        self.field_ids.pop()
+
+    def create_struct_reader(self, read_schema: StructType, field_readers: Tuple[Tuple[Optional[int], Reader], ...]) -> Reader:
+        current_field_id = self.field_ids[-1] if self.field_ids else -1
+        if constructor := self.read_types.get(current_field_id):
+            return StructProtocolReader(field_readers, partial(constructor, read_schema))
+
+        return StructReader(field_readers)
+
+    def __init__(self, read_types: Dict[int, Callable[[Schema], StructProtocol]]):
+        self.read_types = read_types
+        self.field_ids = []
+
+    def schema(self, schema: Schema, expected_schema: Optional[IcebergType], result: Reader) -> Reader:
+        return result
+
+    def struct(self, struct: StructType, expected_struct: Optional[IcebergType], field_readers: List[Reader]) -> Reader:
+        if not expected_struct:
+            # no values are expected so the reader will only be used for skipping
+            return StructReader(tuple(enumerate(field_readers)))
+
+        if not isinstance(expected_struct, StructType):
+            raise ResolveError(f"File/read schema are not aligned for struct, got {expected_struct}")
+
+        results: List[Tuple[Optional[int], Reader]] = []
+        expected_positions: Dict[int, int] = {field.field_id: pos for pos, field in enumerate(expected_struct.fields)}
+
+        # first, add readers for the file fields that must be in order
+        for field, result_reader in zip(struct.fields, field_readers):
+            read_pos = expected_positions.get(field.field_id)
+            results.append((read_pos, result_reader))
+
+        file_fields = {field.field_id: field for field in struct.fields}
+        for pos, read_field in enumerate(expected_struct.fields):
+            if read_field.field_id not in file_fields:
+                if read_field.required:
+                    raise ResolveError(f"{read_field} is non-optional, and not part of the file schema")
+                # Just set the new field to None
+                results.append((pos, NoneReader()))
+
+        return self.create_struct_reader(expected_struct, tuple(results))
+
+    def field(self, field: NestedField, expected_field: Optional[IcebergType], field_reader: Reader) -> Reader:
+        return field_reader if field.required else OptionReader(field_reader)
+
+    def list(self, list_type: ListType, expected_list: Optional[IcebergType], element_reader: Reader) -> Reader:
+        if expected_list and not isinstance(expected_list, ListType):
+            raise ResolveError(f"File/read schema are not aligned for list, got {expected_list}")
+
+        return ListReader(element_reader if list_type.element_required else OptionReader(element_reader))
+
+    def map(self, map_type: MapType, expected_map: Optional[IcebergType], key_reader: Reader, value_reader: Reader) -> Reader:
+        if expected_map and not isinstance(expected_map, MapType):
+            raise ResolveError(f"File/read schema are not aligned for map, got {expected_map}")
 
+        return MapReader(key_reader, value_reader if map_type.value_required else OptionReader(value_reader))
 
-@resolve.register(Schema)
-def _(file_schema: Schema, read_schema: Schema) -> Reader:
-    """Visit a Schema and starts resolving it by converting it to a struct"""
-    return resolve(file_schema.as_struct(), read_schema.as_struct())
+    def primitive(self, primitive: PrimitiveType, expected_primitive: Optional[IcebergType]) -> Reader:
+        if expected_primitive is not None:
+            if not isinstance(expected_primitive, PrimitiveType):
+                raise ResolveError(f"File/read schema are not aligned for {primitive}, got {expected_primitive}")
 
+            # ensure that the type can be projected to the expected
+            if primitive != expected_primitive:
+                promote(primitive, expected_primitive)
 
-@resolve.register(StructType)
-def _(file_struct: StructType, read_struct: IcebergType) -> Reader:
-    """Iterates over the file schema, and checks if the field is in the read schema"""
+        return super().primitive(primitive, expected_primitive)
 
-    if not isinstance(read_struct, StructType):
-        raise ResolveError(f"File/read schema are not aligned for {file_struct}, got {read_struct}")
+    def visit_boolean(self, boolean_type: BooleanType, partner: Optional[IcebergType]) -> Reader:
+        return BooleanReader()
 
-    results: List[Tuple[Optional[int], Reader]] = []
-    read_fields = {field.field_id: (pos, field) for pos, field in enumerate(read_struct.fields)}
+    def visit_integer(self, integer_type: IntegerType, partner: Optional[IcebergType]) -> Reader:
+        return IntegerReader()
 
-    for file_field in file_struct.fields:
-        if file_field.field_id in read_fields:
-            read_pos, read_field = read_fields[file_field.field_id]
-            result_reader = resolve(file_field.field_type, read_field.field_type)
+    def visit_long(self, long_type: LongType, partner: Optional[IcebergType]) -> Reader:
+        return IntegerReader()

Review Comment:
   👍🏻 



##########
python/pyiceberg/manifest.py:
##########
@@ -104,29 +104,100 @@ class ManifestEntry(IcebergBaseModel):
     data_file: DataFile = Field()
 
 
+PARTITION_FIELD_SUMMARY_TYPE = StructType(
+    NestedField(509, "contains_null", BooleanType(), required=True),
+    NestedField(518, "contains_nan", BooleanType(), required=False),
+    NestedField(510, "lower_bound", BinaryType(), required=False),
+    NestedField(511, "upper_bound", BinaryType(), required=False)
+)
+
+
 class PartitionFieldSummary(IcebergBaseModel):
     contains_null: bool = Field()
     contains_nan: Optional[bool] = Field()
     lower_bound: Optional[bytes] = Field()
     upper_bound: Optional[bytes] = Field()
 
 
-class ManifestFile(IcebergBaseModel):
-    manifest_path: str = Field()
-    manifest_length: int = Field()
-    partition_spec_id: int = Field()
-    content: ManifestContent = Field(default=ManifestContent.DATA)
-    sequence_number: int = Field(default=0)
-    min_sequence_number: int = Field(default=0)
-    added_snapshot_id: Optional[int] = Field()
-    added_data_files_count: Optional[int] = Field()
-    existing_data_files_count: Optional[int] = Field()
-    deleted_data_files_count: Optional[int] = Field()
-    added_rows_count: Optional[int] = Field()
-    existing_rows_counts: Optional[int] = Field()
-    deleted_rows_count: Optional[int] = Field()
-    partitions: Optional[List[PartitionFieldSummary]] = Field()
-    key_metadata: Optional[bytes] = Field()
+MANIFEST_FILE_SCHEMA: Schema = Schema(
+    NestedField(500, "manifest_path", StringType(), required=True, doc="Location URI with FS scheme"),
+    NestedField(501, "manifest_length", LongType(), required=True),
+    NestedField(502, "partition_spec_id", IntegerType(), required=True),
+    NestedField(517, "content", IntegerType(), required=False),
+    NestedField(515, "sequence_number", LongType(), required=False),
+    NestedField(516, "min_sequence_number", LongType(), required=False),
+    NestedField(503, "added_snapshot_id", LongType(), required=False),
+    NestedField(504, "added_data_files_count", IntegerType(), required=False),
+    NestedField(505, "existing_data_files_count", IntegerType(), required=False),
+    NestedField(506, "deleted_data_files_count", IntegerType(), required=False),
+    NestedField(512, "added_rows_count", LongType(), required=False),
+    NestedField(513, "existing_rows_count", LongType(), required=False),
+    NestedField(514, "deleted_rows_count", LongType(), required=False),
+    NestedField(507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False),
+    NestedField(519, "key_metadata", BinaryType(), required=False),
+)
+
+MANIFEST_FILE_FIELD_NAMES: Dict[int, str] = {pos: field.name for pos, field in MANIFEST_FILE_SCHEMA.fields}
+IN_TRANSFORMS: Dict[int, Callable[[Any], Any]] = {
+    3: lambda id: ManifestContent.DATA if id == 0 else ManifestContent.DELETES
+}
+OUT_TRANSFORMS: Dict[int, Callable[[Any], Any]] = {
+    3: lambda content: content.id
+}
+
+
+class ManifestFile(StructProtocol):
+    manifest_path: str
+    manifest_length: int
+    partition_spec_id: int
+    content: ManifestContent
+    sequence_number: int
+    min_sequence_number: int
+    added_snapshot_id: Optional[int]
+    added_data_files_count: Optional[int]
+    existing_data_files_count: Optional[int]
+    deleted_data_files_count: Optional[int]
+    added_rows_count: Optional[int]
+    existing_rows_count: Optional[int]
+    deleted_rows_count: Optional[int]
+    partitions: Optional[List[PartitionFieldSummary]]
+    key_metadata: Optional[bytes]
+
+    _projection_positions: Dict[int, int]
+
+    def __init__(self, read_schema: Optional[Schema] = None):
+        if read_schema:
+            id_to_pos = {field.field_id: pos for pos, field in enumerate(MANIFEST_FILE_SCHEMA.fields)}
+            self._projection_positions = {pos: id_to_pos[field.field_id] for pos, field in enumerate(read_schema.fields)}
+        else:
+            # all fields are projected
+            self._projection_positions = {pos: pos for pos, field in MANIFEST_FILE_SCHEMA.fields}
+
+    @property
+    def content(self) -> ManifestContent:

Review Comment:
   This one shadows the `content` field. I don't think this is necessary if we go with `IN_TRANSFORMS`



##########
python/pyiceberg/manifest.py:
##########
@@ -104,29 +104,100 @@ class ManifestEntry(IcebergBaseModel):
     data_file: DataFile = Field()
 
 
+PARTITION_FIELD_SUMMARY_TYPE = StructType(
+    NestedField(509, "contains_null", BooleanType(), required=True),
+    NestedField(518, "contains_nan", BooleanType(), required=False),
+    NestedField(510, "lower_bound", BinaryType(), required=False),
+    NestedField(511, "upper_bound", BinaryType(), required=False)
+)
+
+
 class PartitionFieldSummary(IcebergBaseModel):
     contains_null: bool = Field()
     contains_nan: Optional[bool] = Field()
     lower_bound: Optional[bytes] = Field()
     upper_bound: Optional[bytes] = Field()
 
 
-class ManifestFile(IcebergBaseModel):
-    manifest_path: str = Field()
-    manifest_length: int = Field()
-    partition_spec_id: int = Field()
-    content: ManifestContent = Field(default=ManifestContent.DATA)
-    sequence_number: int = Field(default=0)
-    min_sequence_number: int = Field(default=0)
-    added_snapshot_id: Optional[int] = Field()
-    added_data_files_count: Optional[int] = Field()
-    existing_data_files_count: Optional[int] = Field()
-    deleted_data_files_count: Optional[int] = Field()
-    added_rows_count: Optional[int] = Field()
-    existing_rows_counts: Optional[int] = Field()
-    deleted_rows_count: Optional[int] = Field()
-    partitions: Optional[List[PartitionFieldSummary]] = Field()
-    key_metadata: Optional[bytes] = Field()
+MANIFEST_FILE_SCHEMA: Schema = Schema(
+    NestedField(500, "manifest_path", StringType(), required=True, doc="Location URI with FS scheme"),
+    NestedField(501, "manifest_length", LongType(), required=True),
+    NestedField(502, "partition_spec_id", IntegerType(), required=True),
+    NestedField(517, "content", IntegerType(), required=False),
+    NestedField(515, "sequence_number", LongType(), required=False),
+    NestedField(516, "min_sequence_number", LongType(), required=False),
+    NestedField(503, "added_snapshot_id", LongType(), required=False),
+    NestedField(504, "added_data_files_count", IntegerType(), required=False),
+    NestedField(505, "existing_data_files_count", IntegerType(), required=False),
+    NestedField(506, "deleted_data_files_count", IntegerType(), required=False),
+    NestedField(512, "added_rows_count", LongType(), required=False),
+    NestedField(513, "existing_rows_count", LongType(), required=False),
+    NestedField(514, "deleted_rows_count", LongType(), required=False),
+    NestedField(507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False),
+    NestedField(519, "key_metadata", BinaryType(), required=False),
+)
+
+MANIFEST_FILE_FIELD_NAMES: Dict[int, str] = {pos: field.name for pos, field in MANIFEST_FILE_SCHEMA.fields}
+IN_TRANSFORMS: Dict[int, Callable[[Any], Any]] = {
+    3: lambda id: ManifestContent.DATA if id == 0 else ManifestContent.DELETES
+}
+OUT_TRANSFORMS: Dict[int, Callable[[Any], Any]] = {
+    3: lambda content: content.id
+}
+
+
+class ManifestFile(StructProtocol):
+    manifest_path: str
+    manifest_length: int
+    partition_spec_id: int
+    content: ManifestContent
+    sequence_number: int
+    min_sequence_number: int
+    added_snapshot_id: Optional[int]
+    added_data_files_count: Optional[int]
+    existing_data_files_count: Optional[int]
+    deleted_data_files_count: Optional[int]
+    added_rows_count: Optional[int]
+    existing_rows_count: Optional[int]
+    deleted_rows_count: Optional[int]
+    partitions: Optional[List[PartitionFieldSummary]]
+    key_metadata: Optional[bytes]
+
+    _projection_positions: Dict[int, int]
+
+    def __init__(self, read_schema: Optional[Schema] = None):
+        if read_schema:
+            id_to_pos = {field.field_id: pos for pos, field in enumerate(MANIFEST_FILE_SCHEMA.fields)}

Review Comment:
   Should we move those to the global scope? They are static, and we probably don't want to recompute them every time we construct a ManifestFile.



##########
python/pyiceberg/avro/resolver.py:
##########
@@ -14,123 +14,250 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from functools import singledispatch
+# pylint: disable=arguments-renamed,unused-argument
+from functools import partial
 from typing import (
+    Callable,
+    Dict,
     List,
     Optional,
     Tuple,
     Union,
 )
 
 from pyiceberg.avro.reader import (
-    ConstructReader,
+    BinaryReader,
+    BooleanReader,
+    DateReader,
+    DecimalReader,
+    DoubleReader,
+    FixedReader,
+    FloatReader,
+    IntegerReader,
     ListReader,
     MapReader,
     NoneReader,
     OptionReader,
     Reader,
+    StringReader,
+    StructProtocolReader,
     StructReader,
+    TimeReader,
+    TimestampReader,
+    TimestamptzReader,
+    UUIDReader,
 )
 from pyiceberg.exceptions import ResolveError
-from pyiceberg.schema import Schema, promote, visit
+from pyiceberg.schema import (
+    PartnerAccessor,
+    PrimitiveWithPartnerVisitor,
+    Schema,
+    promote,
+    visit_with_partner,
+)
+from pyiceberg.typedef import EMPTY_DICT, StructProtocol
 from pyiceberg.types import (
+    BinaryType,
+    BooleanType,
+    DateType,
+    DecimalType,
     DoubleType,
+    FixedType,
     FloatType,
     IcebergType,
+    IntegerType,
     ListType,
+    LongType,
     MapType,
+    NestedField,
     PrimitiveType,
+    StringType,
     StructType,
+    TimestampType,
+    TimestamptzType,
+    TimeType,
+    UUIDType,
 )
 
 
-@singledispatch
-def resolve(file_schema: Union[Schema, IcebergType], read_schema: Union[Schema, IcebergType]) -> Reader:
-    """This resolves the file and read schema
+def construct_reader(file_schema: Union[Schema, IcebergType]) -> Reader:
+    """Constructs a reader from a file schema
+
+    Args:
+        file_schema (Schema | IcebergType): The schema of the Avro file
+
+    Raises:
+        NotImplementedError: If attempting to resolve an unrecognized object type
+    """
+    return resolve(file_schema, file_schema)
+
 
-    The function traverses the schema in post-order fashion
+def resolve(
+    file_schema: Union[Schema, IcebergType],
+    read_schema: Union[Schema, IcebergType],
+    read_types: Dict[int, Callable[[Schema], StructProtocol]] = EMPTY_DICT,
+) -> Reader:
+    """Resolves the file and read schema to produce a reader
 
-     Args:
-         file_schema (Schema | IcebergType): The schema of the Avro file
-         read_schema (Schema | IcebergType): The requested read schema which is equal, subset or superset of the file schema
+    Args:
+        file_schema (Schema | IcebergType): The schema of the Avro file
+        read_schema (Schema | IcebergType): The requested read schema which is equal, subset or superset of the file schema
+        read_types (Dict[int, Callable[[Schema], StructProtocol]]): A dict of types to use for struct data
 
-     Raises:
-         NotImplementedError: If attempting to resolve an unrecognized object type
+    Raises:
+        NotImplementedError: If attempting to resolve an unrecognized object type
     """
-    raise NotImplementedError(f"Cannot resolve non-type: {file_schema}")
+    return visit_with_partner(file_schema, read_schema, SchemaResolver(read_types), SchemaPartnerAccessor())  # type: ignore
+
+
+class SchemaResolver(PrimitiveWithPartnerVisitor[IcebergType, Reader]):
+    read_types: Dict[int, Callable[[Schema], StructProtocol]]
+    field_ids: List[int]
+
+    def before_field(self, field: NestedField, field_partner: Optional[IcebergType]) -> None:
+        self.field_ids.append(field.field_id)
+
+    def after_field(self, field: NestedField, field_partner: Optional[IcebergType]) -> None:
+        self.field_ids.pop()
+
+    def create_struct_reader(self, read_schema: StructType, field_readers: Tuple[Tuple[Optional[int], Reader], ...]) -> Reader:
+        current_field_id = self.field_ids[-1] if self.field_ids else -1
+        if constructor := self.read_types.get(current_field_id):

Review Comment:
   If you're feeling adventurous, you could also combine this into a single line:
   ```suggestion
           if self.field_ids and (constructor := self.read_types.get(self.field_ids[-1])):
   ```
   This omits the awkward `-1` in the else branch



##########
python/pyiceberg/avro/resolver.py:
##########
@@ -14,123 +14,250 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from functools import singledispatch
+# pylint: disable=arguments-renamed,unused-argument
+from functools import partial
 from typing import (
+    Callable,
+    Dict,
     List,
     Optional,
     Tuple,
     Union,
 )
 
 from pyiceberg.avro.reader import (
-    ConstructReader,
+    BinaryReader,
+    BooleanReader,
+    DateReader,
+    DecimalReader,
+    DoubleReader,
+    FixedReader,
+    FloatReader,
+    IntegerReader,
     ListReader,
     MapReader,
     NoneReader,
     OptionReader,
     Reader,
+    StringReader,
+    StructProtocolReader,
     StructReader,
+    TimeReader,
+    TimestampReader,
+    TimestamptzReader,
+    UUIDReader,
 )
 from pyiceberg.exceptions import ResolveError
-from pyiceberg.schema import Schema, promote, visit
+from pyiceberg.schema import (
+    PartnerAccessor,
+    PrimitiveWithPartnerVisitor,
+    Schema,
+    promote,
+    visit_with_partner,
+)
+from pyiceberg.typedef import EMPTY_DICT, StructProtocol
 from pyiceberg.types import (
+    BinaryType,
+    BooleanType,
+    DateType,
+    DecimalType,
     DoubleType,
+    FixedType,
     FloatType,
     IcebergType,
+    IntegerType,
     ListType,
+    LongType,
     MapType,
+    NestedField,
     PrimitiveType,
+    StringType,
     StructType,
+    TimestampType,
+    TimestamptzType,
+    TimeType,
+    UUIDType,
 )
 
 
-@singledispatch
-def resolve(file_schema: Union[Schema, IcebergType], read_schema: Union[Schema, IcebergType]) -> Reader:
-    """This resolves the file and read schema
+def construct_reader(file_schema: Union[Schema, IcebergType]) -> Reader:
+    """Constructs a reader from a file schema
+
+    Args:
+        file_schema (Schema | IcebergType): The schema of the Avro file
+
+    Raises:
+        NotImplementedError: If attempting to resolve an unrecognized object type
+    """
+    return resolve(file_schema, file_schema)
+
 
-    The function traverses the schema in post-order fashion
+def resolve(
+    file_schema: Union[Schema, IcebergType],
+    read_schema: Union[Schema, IcebergType],
+    read_types: Dict[int, Callable[[Schema], StructProtocol]] = EMPTY_DICT,
+) -> Reader:
+    """Resolves the file and read schema to produce a reader
 
-     Args:
-         file_schema (Schema | IcebergType): The schema of the Avro file
-         read_schema (Schema | IcebergType): The requested read schema which is equal, subset or superset of the file schema
+    Args:
+        file_schema (Schema | IcebergType): The schema of the Avro file
+        read_schema (Schema | IcebergType): The requested read schema which is equal, subset or superset of the file schema
+        read_types (Dict[int, Callable[[Schema], StructProtocol]]): A dict of types to use for struct data
 
-     Raises:
-         NotImplementedError: If attempting to resolve an unrecognized object type
+    Raises:
+        NotImplementedError: If attempting to resolve an unrecognized object type
     """
-    raise NotImplementedError(f"Cannot resolve non-type: {file_schema}")
+    return visit_with_partner(file_schema, read_schema, SchemaResolver(read_types), SchemaPartnerAccessor())  # type: ignore
+
+
+class SchemaResolver(PrimitiveWithPartnerVisitor[IcebergType, Reader]):
+    read_types: Dict[int, Callable[[Schema], StructProtocol]]
+    field_ids: List[int]
+
+    def before_field(self, field: NestedField, field_partner: Optional[IcebergType]) -> None:
+        self.field_ids.append(field.field_id)
+
+    def after_field(self, field: NestedField, field_partner: Optional[IcebergType]) -> None:
+        self.field_ids.pop()
+
+    def create_struct_reader(self, read_schema: StructType, field_readers: Tuple[Tuple[Optional[int], Reader], ...]) -> Reader:
+        current_field_id = self.field_ids[-1] if self.field_ids else -1
+        if constructor := self.read_types.get(current_field_id):
+            return StructProtocolReader(field_readers, partial(constructor, read_schema))
+
+        return StructReader(field_readers)
+
+    def __init__(self, read_types: Dict[int, Callable[[Schema], StructProtocol]]):
+        self.read_types = read_types
+        self.field_ids = []
+
+    def schema(self, schema: Schema, expected_schema: Optional[IcebergType], result: Reader) -> Reader:
+        return result
+
+    def struct(self, struct: StructType, expected_struct: Optional[IcebergType], field_readers: List[Reader]) -> Reader:
+        if not expected_struct:
+            # no values are expected so the reader will only be used for skipping
+            return StructReader(tuple(enumerate(field_readers)))
+
+        if not isinstance(expected_struct, StructType):
+            raise ResolveError(f"File/read schema are not aligned for struct, got {expected_struct}")
+
+        results: List[Tuple[Optional[int], Reader]] = []
+        expected_positions: Dict[int, int] = {field.field_id: pos for pos, field in enumerate(expected_struct.fields)}
+
+        # first, add readers for the file fields that must be in order
+        for field, result_reader in zip(struct.fields, field_readers):
+            read_pos = expected_positions.get(field.field_id)
+            results.append((read_pos, result_reader))
+
+        file_fields = {field.field_id: field for field in struct.fields}
+        for pos, read_field in enumerate(expected_struct.fields):
+            if read_field.field_id not in file_fields:
+                if read_field.required:
+                    raise ResolveError(f"{read_field} is non-optional, and not part of the file schema")
+                # Just set the new field to None
+                results.append((pos, NoneReader()))
+
+        return self.create_struct_reader(expected_struct, tuple(results))
+
+    def field(self, field: NestedField, expected_field: Optional[IcebergType], field_reader: Reader) -> Reader:
+        return field_reader if field.required else OptionReader(field_reader)
+
+    def list(self, list_type: ListType, expected_list: Optional[IcebergType], element_reader: Reader) -> Reader:
+        if expected_list and not isinstance(expected_list, ListType):
+            raise ResolveError(f"File/read schema are not aligned for list, got {expected_list}")
+
+        return ListReader(element_reader if list_type.element_required else OptionReader(element_reader))
+
+    def map(self, map_type: MapType, expected_map: Optional[IcebergType], key_reader: Reader, value_reader: Reader) -> Reader:
+        if expected_map and not isinstance(expected_map, MapType):
+            raise ResolveError(f"File/read schema are not aligned for map, got {expected_map}")
 
+        return MapReader(key_reader, value_reader if map_type.value_required else OptionReader(value_reader))
 
-@resolve.register(Schema)
-def _(file_schema: Schema, read_schema: Schema) -> Reader:
-    """Visit a Schema and starts resolving it by converting it to a struct"""
-    return resolve(file_schema.as_struct(), read_schema.as_struct())
+    def primitive(self, primitive: PrimitiveType, expected_primitive: Optional[IcebergType]) -> Reader:
+        if expected_primitive is not None:
+            if not isinstance(expected_primitive, PrimitiveType):
+                raise ResolveError(f"File/read schema are not aligned for {primitive}, got {expected_primitive}")
 
+            # ensure that the type can be projected to the expected
+            if primitive != expected_primitive:
+                promote(primitive, expected_primitive)

Review Comment:
   We ignore the output here.



##########
python/pyiceberg/avro/resolver.py:
##########
@@ -14,123 +14,250 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from functools import singledispatch
+# pylint: disable=arguments-renamed,unused-argument
+from functools import partial
 from typing import (
+    Callable,
+    Dict,
     List,
     Optional,
     Tuple,
     Union,
 )
 
 from pyiceberg.avro.reader import (
-    ConstructReader,
+    BinaryReader,
+    BooleanReader,
+    DateReader,
+    DecimalReader,
+    DoubleReader,
+    FixedReader,
+    FloatReader,
+    IntegerReader,
     ListReader,
     MapReader,
     NoneReader,
     OptionReader,
     Reader,
+    StringReader,
+    StructProtocolReader,
     StructReader,
+    TimeReader,
+    TimestampReader,
+    TimestamptzReader,
+    UUIDReader,
 )
 from pyiceberg.exceptions import ResolveError
-from pyiceberg.schema import Schema, promote, visit
+from pyiceberg.schema import (
+    PartnerAccessor,
+    PrimitiveWithPartnerVisitor,
+    Schema,
+    promote,
+    visit_with_partner,
+)
+from pyiceberg.typedef import EMPTY_DICT, StructProtocol
 from pyiceberg.types import (
+    BinaryType,
+    BooleanType,
+    DateType,
+    DecimalType,
     DoubleType,
+    FixedType,
     FloatType,
     IcebergType,
+    IntegerType,
     ListType,
+    LongType,
     MapType,
+    NestedField,
     PrimitiveType,
+    StringType,
     StructType,
+    TimestampType,
+    TimestamptzType,
+    TimeType,
+    UUIDType,
 )
 
 
-@singledispatch
-def resolve(file_schema: Union[Schema, IcebergType], read_schema: Union[Schema, IcebergType]) -> Reader:
-    """This resolves the file and read schema
+def construct_reader(file_schema: Union[Schema, IcebergType]) -> Reader:
+    """Constructs a reader from a file schema
+
+    Args:
+        file_schema (Schema | IcebergType): The schema of the Avro file
+
+    Raises:
+        NotImplementedError: If attempting to resolve an unrecognized object type
+    """
+    return resolve(file_schema, file_schema)
+
 
-    The function traverses the schema in post-order fashion
+def resolve(
+    file_schema: Union[Schema, IcebergType],
+    read_schema: Union[Schema, IcebergType],
+    read_types: Dict[int, Callable[[Schema], StructProtocol]] = EMPTY_DICT,
+) -> Reader:
+    """Resolves the file and read schema to produce a reader
 
-     Args:
-         file_schema (Schema | IcebergType): The schema of the Avro file
-         read_schema (Schema | IcebergType): The requested read schema which is equal, subset or superset of the file schema
+    Args:
+        file_schema (Schema | IcebergType): The schema of the Avro file
+        read_schema (Schema | IcebergType): The requested read schema which is equal, subset or superset of the file schema
+        read_types (Dict[int, Callable[[Schema], StructProtocol]]): A dict of types to use for struct data
 
-     Raises:
-         NotImplementedError: If attempting to resolve an unrecognized object type
+    Raises:
+        NotImplementedError: If attempting to resolve an unrecognized object type
     """
-    raise NotImplementedError(f"Cannot resolve non-type: {file_schema}")
+    return visit_with_partner(file_schema, read_schema, SchemaResolver(read_types), SchemaPartnerAccessor())  # type: ignore
+
+
+class SchemaResolver(PrimitiveWithPartnerVisitor[IcebergType, Reader]):
+    read_types: Dict[int, Callable[[Schema], StructProtocol]]
+    field_ids: List[int]
+
+    def before_field(self, field: NestedField, field_partner: Optional[IcebergType]) -> None:
+        self.field_ids.append(field.field_id)
+
+    def after_field(self, field: NestedField, field_partner: Optional[IcebergType]) -> None:
+        self.field_ids.pop()
+
+    def create_struct_reader(self, read_schema: StructType, field_readers: Tuple[Tuple[Optional[int], Reader], ...]) -> Reader:
+        current_field_id = self.field_ids[-1] if self.field_ids else -1
+        if constructor := self.read_types.get(current_field_id):
+            return StructProtocolReader(field_readers, partial(constructor, read_schema))
+
+        return StructReader(field_readers)
+
+    def __init__(self, read_types: Dict[int, Callable[[Schema], StructProtocol]]):

Review Comment:
   Should we move the init to the top?



##########
python/pyiceberg/avro/resolver.py:
##########
@@ -14,123 +14,250 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from functools import singledispatch
+# pylint: disable=arguments-renamed,unused-argument
+from functools import partial
 from typing import (
+    Callable,
+    Dict,
     List,
     Optional,
     Tuple,
     Union,
 )
 
 from pyiceberg.avro.reader import (
-    ConstructReader,
+    BinaryReader,
+    BooleanReader,
+    DateReader,
+    DecimalReader,
+    DoubleReader,
+    FixedReader,
+    FloatReader,
+    IntegerReader,
     ListReader,
     MapReader,
     NoneReader,
     OptionReader,
     Reader,
+    StringReader,
+    StructProtocolReader,
     StructReader,
+    TimeReader,
+    TimestampReader,
+    TimestamptzReader,
+    UUIDReader,
 )
 from pyiceberg.exceptions import ResolveError
-from pyiceberg.schema import Schema, promote, visit
+from pyiceberg.schema import (
+    PartnerAccessor,
+    PrimitiveWithPartnerVisitor,
+    Schema,
+    promote,
+    visit_with_partner,
+)
+from pyiceberg.typedef import EMPTY_DICT, StructProtocol
 from pyiceberg.types import (
+    BinaryType,
+    BooleanType,
+    DateType,
+    DecimalType,
     DoubleType,
+    FixedType,
     FloatType,
     IcebergType,
+    IntegerType,
     ListType,
+    LongType,
     MapType,
+    NestedField,
     PrimitiveType,
+    StringType,
     StructType,
+    TimestampType,
+    TimestamptzType,
+    TimeType,
+    UUIDType,
 )
 
 
-@singledispatch
-def resolve(file_schema: Union[Schema, IcebergType], read_schema: Union[Schema, IcebergType]) -> Reader:
-    """This resolves the file and read schema
+def construct_reader(file_schema: Union[Schema, IcebergType]) -> Reader:
+    """Constructs a reader from a file schema
+
+    Args:
+        file_schema (Schema | IcebergType): The schema of the Avro file
+
+    Raises:
+        NotImplementedError: If attempting to resolve an unrecognized object type
+    """
+    return resolve(file_schema, file_schema)
+
 
-    The function traverses the schema in post-order fashion
+def resolve(
+    file_schema: Union[Schema, IcebergType],
+    read_schema: Union[Schema, IcebergType],
+    read_types: Dict[int, Callable[[Schema], StructProtocol]] = EMPTY_DICT,
+) -> Reader:
+    """Resolves the file and read schema to produce a reader
 
-     Args:
-         file_schema (Schema | IcebergType): The schema of the Avro file
-         read_schema (Schema | IcebergType): The requested read schema which is equal, subset or superset of the file schema
+    Args:
+        file_schema (Schema | IcebergType): The schema of the Avro file
+        read_schema (Schema | IcebergType): The requested read schema which is equal, subset or superset of the file schema
+        read_types (Dict[int, Callable[[Schema], StructProtocol]]): A dict of types to use for struct data
 
-     Raises:
-         NotImplementedError: If attempting to resolve an unrecognized object type
+    Raises:
+        NotImplementedError: If attempting to resolve an unrecognized object type
     """
-    raise NotImplementedError(f"Cannot resolve non-type: {file_schema}")
+    return visit_with_partner(file_schema, read_schema, SchemaResolver(read_types), SchemaPartnerAccessor())  # type: ignore
+
+
+class SchemaResolver(PrimitiveWithPartnerVisitor[IcebergType, Reader]):
+    read_types: Dict[int, Callable[[Schema], StructProtocol]]
+    field_ids: List[int]
+
+    def before_field(self, field: NestedField, field_partner: Optional[IcebergType]) -> None:
+        self.field_ids.append(field.field_id)
+
+    def after_field(self, field: NestedField, field_partner: Optional[IcebergType]) -> None:
+        self.field_ids.pop()
+
+    def create_struct_reader(self, read_schema: StructType, field_readers: Tuple[Tuple[Optional[int], Reader], ...]) -> Reader:
+        current_field_id = self.field_ids[-1] if self.field_ids else -1
+        if constructor := self.read_types.get(current_field_id):
+            return StructProtocolReader(field_readers, partial(constructor, read_schema))
+
+        return StructReader(field_readers)
+
+    def __init__(self, read_types: Dict[int, Callable[[Schema], StructProtocol]]):
+        self.read_types = read_types
+        self.field_ids = []
+
+    def schema(self, schema: Schema, expected_schema: Optional[IcebergType], result: Reader) -> Reader:
+        return result
+
+    def struct(self, struct: StructType, expected_struct: Optional[IcebergType], field_readers: List[Reader]) -> Reader:
+        if not expected_struct:
+            # no values are expected so the reader will only be used for skipping
+            return StructReader(tuple(enumerate(field_readers)))
+
+        if not isinstance(expected_struct, StructType):
+            raise ResolveError(f"File/read schema are not aligned for struct, got {expected_struct}")
+
+        results: List[Tuple[Optional[int], Reader]] = []
+        expected_positions: Dict[int, int] = {field.field_id: pos for pos, field in enumerate(expected_struct.fields)}
+
+        # first, add readers for the file fields that must be in order
+        for field, result_reader in zip(struct.fields, field_readers):

Review Comment:
   How do you feel about directly assigning `results`:
   ```python
   results: List[Tuple[Optional[int], Reader]] = [
       (expected_positions.get(field.field_id), result_reader)
       for field, result_reader in zip(struct.fields, field_readers)
   ]
   ```



##########
python/pyiceberg/avro/resolver.py:
##########
@@ -14,123 +14,250 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from functools import singledispatch
+# pylint: disable=arguments-renamed,unused-argument
+from functools import partial
 from typing import (
+    Callable,
+    Dict,
     List,
     Optional,
     Tuple,
     Union,
 )
 
 from pyiceberg.avro.reader import (
-    ConstructReader,
+    BinaryReader,
+    BooleanReader,
+    DateReader,
+    DecimalReader,
+    DoubleReader,
+    FixedReader,
+    FloatReader,
+    IntegerReader,
     ListReader,
     MapReader,
     NoneReader,
     OptionReader,
     Reader,
+    StringReader,
+    StructProtocolReader,
     StructReader,
+    TimeReader,
+    TimestampReader,
+    TimestamptzReader,
+    UUIDReader,
 )
 from pyiceberg.exceptions import ResolveError
-from pyiceberg.schema import Schema, promote, visit
+from pyiceberg.schema import (
+    PartnerAccessor,
+    PrimitiveWithPartnerVisitor,
+    Schema,
+    promote,
+    visit_with_partner,
+)
+from pyiceberg.typedef import EMPTY_DICT, StructProtocol
 from pyiceberg.types import (
+    BinaryType,
+    BooleanType,
+    DateType,
+    DecimalType,
     DoubleType,
+    FixedType,
     FloatType,
     IcebergType,
+    IntegerType,
     ListType,
+    LongType,
     MapType,
+    NestedField,
     PrimitiveType,
+    StringType,
     StructType,
+    TimestampType,
+    TimestamptzType,
+    TimeType,
+    UUIDType,
 )
 
 
-@singledispatch
-def resolve(file_schema: Union[Schema, IcebergType], read_schema: Union[Schema, IcebergType]) -> Reader:
-    """This resolves the file and read schema
+def construct_reader(file_schema: Union[Schema, IcebergType]) -> Reader:
+    """Constructs a reader from a file schema
+
+    Args:
+        file_schema (Schema | IcebergType): The schema of the Avro file
+
+    Raises:
+        NotImplementedError: If attempting to resolve an unrecognized object type
+    """
+    return resolve(file_schema, file_schema)
+
 
-    The function traverses the schema in post-order fashion
+def resolve(
+    file_schema: Union[Schema, IcebergType],
+    read_schema: Union[Schema, IcebergType],
+    read_types: Dict[int, Callable[[Schema], StructProtocol]] = EMPTY_DICT,
+) -> Reader:
+    """Resolves the file and read schema to produce a reader
 
-     Args:
-         file_schema (Schema | IcebergType): The schema of the Avro file
-         read_schema (Schema | IcebergType): The requested read schema which is equal, subset or superset of the file schema
+    Args:
+        file_schema (Schema | IcebergType): The schema of the Avro file
+        read_schema (Schema | IcebergType): The requested read schema which is equal, subset or superset of the file schema
+        read_types (Dict[int, Callable[[Schema], StructProtocol]]): A dict of types to use for struct data
 
-     Raises:
-         NotImplementedError: If attempting to resolve an unrecognized object type
+    Raises:
+        NotImplementedError: If attempting to resolve an unrecognized object type
     """
-    raise NotImplementedError(f"Cannot resolve non-type: {file_schema}")
+    return visit_with_partner(file_schema, read_schema, SchemaResolver(read_types), SchemaPartnerAccessor())  # type: ignore
+
+
+class SchemaResolver(PrimitiveWithPartnerVisitor[IcebergType, Reader]):
+    read_types: Dict[int, Callable[[Schema], StructProtocol]]
+    field_ids: List[int]
+
+    def before_field(self, field: NestedField, field_partner: Optional[IcebergType]) -> None:
+        self.field_ids.append(field.field_id)
+
+    def after_field(self, field: NestedField, field_partner: Optional[IcebergType]) -> None:
+        self.field_ids.pop()
+
+    def create_struct_reader(self, read_schema: StructType, field_readers: Tuple[Tuple[Optional[int], Reader], ...]) -> Reader:
+        current_field_id = self.field_ids[-1] if self.field_ids else -1
+        if constructor := self.read_types.get(current_field_id):
+            return StructProtocolReader(field_readers, partial(constructor, read_schema))
+
+        return StructReader(field_readers)
+
+    def __init__(self, read_types: Dict[int, Callable[[Schema], StructProtocol]]):
+        self.read_types = read_types
+        self.field_ids = []
+
+    def schema(self, schema: Schema, expected_schema: Optional[IcebergType], result: Reader) -> Reader:
+        return result
+
+    def struct(self, struct: StructType, expected_struct: Optional[IcebergType], field_readers: List[Reader]) -> Reader:
+        if not expected_struct:
+            # no values are expected so the reader will only be used for skipping
+            return StructReader(tuple(enumerate(field_readers)))
+
+        if not isinstance(expected_struct, StructType):
+            raise ResolveError(f"File/read schema are not aligned for struct, got {expected_struct}")
+
+        results: List[Tuple[Optional[int], Reader]] = []
+        expected_positions: Dict[int, int] = {field.field_id: pos for pos, field in enumerate(expected_struct.fields)}
+
+        # first, add readers for the file fields that must be in order
+        for field, result_reader in zip(struct.fields, field_readers):
+            read_pos = expected_positions.get(field.field_id)
+            results.append((read_pos, result_reader))
+
+        file_fields = {field.field_id: field for field in struct.fields}
+        for pos, read_field in enumerate(expected_struct.fields):
+            if read_field.field_id not in file_fields:
+                if read_field.required:
+                    raise ResolveError(f"{read_field} is non-optional, and not part of the file schema")
+                # Just set the new field to None
+                results.append((pos, NoneReader()))
+
+        return self.create_struct_reader(expected_struct, tuple(results))
+
+    def field(self, field: NestedField, expected_field: Optional[IcebergType], field_reader: Reader) -> Reader:
+        return field_reader if field.required else OptionReader(field_reader)
+
+    def list(self, list_type: ListType, expected_list: Optional[IcebergType], element_reader: Reader) -> Reader:
+        if expected_list and not isinstance(expected_list, ListType):
+            raise ResolveError(f"File/read schema are not aligned for list, got {expected_list}")
+
+        return ListReader(element_reader if list_type.element_required else OptionReader(element_reader))
+
+    def map(self, map_type: MapType, expected_map: Optional[IcebergType], key_reader: Reader, value_reader: Reader) -> Reader:
+        if expected_map and not isinstance(expected_map, MapType):
+            raise ResolveError(f"File/read schema are not aligned for map, got {expected_map}")
 
+        return MapReader(key_reader, value_reader if map_type.value_required else OptionReader(value_reader))
 
-@resolve.register(Schema)
-def _(file_schema: Schema, read_schema: Schema) -> Reader:
-    """Visit a Schema and starts resolving it by converting it to a struct"""
-    return resolve(file_schema.as_struct(), read_schema.as_struct())
+    def primitive(self, primitive: PrimitiveType, expected_primitive: Optional[IcebergType]) -> Reader:
+        if expected_primitive is not None:
+            if not isinstance(expected_primitive, PrimitiveType):
+                raise ResolveError(f"File/read schema are not aligned for {primitive}, got {expected_primitive}")
 
+            # ensure that the type can be projected to the expected
+            if primitive != expected_primitive:
+                promote(primitive, expected_primitive)
 
-@resolve.register(StructType)
-def _(file_struct: StructType, read_struct: IcebergType) -> Reader:
-    """Iterates over the file schema, and checks if the field is in the read schema"""
+        return super().primitive(primitive, expected_primitive)
 
-    if not isinstance(read_struct, StructType):
-        raise ResolveError(f"File/read schema are not aligned for {file_struct}, got {read_struct}")
+    def visit_boolean(self, boolean_type: BooleanType, partner: Optional[IcebergType]) -> Reader:
+        return BooleanReader()
 
-    results: List[Tuple[Optional[int], Reader]] = []
-    read_fields = {field.field_id: (pos, field) for pos, field in enumerate(read_struct.fields)}
+    def visit_integer(self, integer_type: IntegerType, partner: Optional[IcebergType]) -> Reader:
+        return IntegerReader()
 
-    for file_field in file_struct.fields:
-        if file_field.field_id in read_fields:
-            read_pos, read_field = read_fields[file_field.field_id]
-            result_reader = resolve(file_field.field_type, read_field.field_type)
+    def visit_long(self, long_type: LongType, partner: Optional[IcebergType]) -> Reader:
+        return IntegerReader()
+
+    def visit_float(self, float_type: FloatType, partner: Optional[IcebergType]) -> Reader:
+        return FloatReader()
+
+    def visit_double(self, double_type: DoubleType, partner: Optional[IcebergType]) -> Reader:
+        return DoubleReader()
+
+    def visit_decimal(self, decimal_type: DecimalType, partner: Optional[IcebergType]) -> Reader:
+        return DecimalReader(decimal_type.precision, decimal_type.scale)
+
+    def visit_date(self, date_type: DateType, partner: Optional[IcebergType]) -> Reader:
+        return DateReader()
+
+    def visit_time(self, time_type: TimeType, partner: Optional[IcebergType]) -> Reader:
+        return TimeReader()
+
+    def visit_timestamp(self, timestamp_type: TimestampType, partner: Optional[IcebergType]) -> Reader:
+        return TimestampReader()
+
+    def visit_timestampz(self, timestamptz_type: TimestamptzType, partner: Optional[IcebergType]) -> Reader:
+        return TimestamptzReader()
+
+    def visit_string(self, string_type: StringType, partner: Optional[IcebergType]) -> Reader:
+        return StringReader()
+
+    def visit_uuid(self, uuid_type: UUIDType, partner: Optional[IcebergType]) -> Reader:
+        return UUIDReader()
+
+    def visit_fixed(self, fixed_type: FixedType, partner: Optional[IcebergType]) -> Reader:
+        return FixedReader(len(fixed_type))
+
+    def visit_binary(self, binary_type: BinaryType, partner: Optional[IcebergType]) -> Reader:
+        return BinaryReader()
+
+
+class SchemaPartnerAccessor(PartnerAccessor[IcebergType]):
+    def schema_partner(self, partner: Optional[IcebergType]) -> Optional[IcebergType]:
+        if isinstance(partner, Schema):
+            return partner.as_struct()
+
+        raise ResolveError(f"File/read schema are not aligned for schema, got {partner}")
+
+    def field_partner(self, partner: Optional[IcebergType], field_id: int, field_name: str) -> Optional[IcebergType]:
+        if isinstance(partner, StructType):
+            field = partner.field(field_id)
         else:
-            read_pos = None
-            result_reader = visit(file_field.field_type, ConstructReader())
-        result_reader = result_reader if file_field.required else OptionReader(result_reader)
-        results.append((read_pos, result_reader))
-
-    file_fields = {field.field_id: field for field in file_struct.fields}
-    for pos, read_field in enumerate(read_struct.fields):
-        if read_field.field_id not in file_fields:
-            if read_field.required:
-                raise ResolveError(f"{read_field} is non-optional, and not part of the file schema")
-            # Just set the new field to None
-            results.append((pos, NoneReader()))
-
-    return StructReader(tuple(results))
-
-
-@resolve.register(ListType)
-def _(file_list: ListType, read_list: IcebergType) -> Reader:
-    if not isinstance(read_list, ListType):
-        raise ResolveError(f"File/read schema are not aligned for {file_list}, got {read_list}")
-    element_reader = resolve(file_list.element_type, read_list.element_type)
-    return ListReader(element_reader)
-
-
-@resolve.register(MapType)
-def _(file_map: MapType, read_map: IcebergType) -> Reader:
-    if not isinstance(read_map, MapType):
-        raise ResolveError(f"File/read schema are not aligned for {file_map}, got {read_map}")
-    key_reader = resolve(file_map.key_type, read_map.key_type)
-    value_reader = resolve(file_map.value_type, read_map.value_type)
-
-    return MapReader(key_reader, value_reader)
-
-
-@resolve.register(FloatType)
-def _(file_type: PrimitiveType, read_type: IcebergType) -> Reader:
-    """This is a special case, when we need to adhere to the bytes written"""
-    if isinstance(read_type, DoubleType):
-        return visit(file_type, ConstructReader())
-    else:
-        raise ResolveError(f"Cannot promote an float to {read_type}")
-
-
-@resolve.register(PrimitiveType)
-def _(file_type: PrimitiveType, read_type: IcebergType) -> Reader:
-    """Converting the primitive type into an actual reader that will decode the physical data"""
-    if not isinstance(read_type, PrimitiveType):
-        raise ResolveError(f"Cannot promote {file_type} to {read_type}")
-
-    # In the case of a promotion, we want to check if it is valid
-    if file_type != read_type:
-        read_type = promote(file_type, read_type)
-    return visit(read_type, ConstructReader())
+            raise ResolveError(f"File/read schema are not aligned for struct, got {partner}")
+
+        return field.field_type if field else None
+
+    def list_element_partner(self, partner_list: Optional[IcebergType]) -> Optional[IcebergType]:
+        if isinstance(partner_list, ListType):
+            return partner_list.element_type
+
+        raise ResolveError(f"File/read schema are not aligned for list, got {partner_list}")
+
+    def map_key_partner(self, partner_map: Optional[IcebergType]) -> Optional[IcebergType]:
+        if isinstance(partner_map, MapType):
+            return partner_map.key_type
+
+        raise ResolveError(f"File/read schema are not aligned for map, got {partner_map}")
+
+    def map_value_partner(self, partner_map: Optional[IcebergType]) -> Optional[IcebergType]:
+        if isinstance(partner_map, MapType):
+            return partner_map.value_type
+
+        raise ResolveError(f"File/read schema are not aligned for map, got {partner_map}")

Review Comment:
   ```suggestion
           raise ResolveError(f"File/read schema are not aligned for map value, got {partner_map}")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org