You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/12/31 01:00:14 UTC

[GitHub] [iceberg] rdblue commented on a diff in pull request #6506: Python: Refactor Avro read path to use a partner visitor

rdblue commented on code in PR #6506:
URL: https://github.com/apache/iceberg/pull/6506#discussion_r1059559551


##########
python/pyiceberg/avro/resolver.py:
##########
@@ -57,80 +61,91 @@ def resolve(file_schema: Union[Schema, IcebergType], read_schema: Union[Schema,
      Raises:
          NotImplementedError: If attempting to resolve an unrecognized object type
     """
-    raise NotImplementedError(f"Cannot resolve non-type: {file_schema}")
+    return visit_with_partner(file_schema, read_schema, SchemaResolver(), SchemaPartnerAccessor())
+
+
+class SchemaResolver(SchemaWithPartnerVisitor[IcebergType, Reader]):
+    def schema(self, schema: Schema, expected_schema: Optional[IcebergType], result: Reader) -> Reader:
+        return result
+
+    def struct(self, struct: StructType, expected_struct: Optional[IcebergType], field_readers: List[Reader]) -> Reader:
+        if expected_struct and not isinstance(expected_struct, StructType):
+            raise ResolveError(f"File/read schema are not aligned for struct, got {expected_struct}")
+
+        results: List[Tuple[Optional[int], Reader]] = []
+        expected_positions: Dict[int, int] = {field.field_id: pos for pos, field in enumerate(expected_struct.fields)}
+
+        # first, add readers for the file fields that must be in order
+        for field, result_reader in zip(struct.fields, field_readers):
+            read_pos = expected_positions.get(field.field_id)
+            results.append((read_pos, result_reader))
+
+        file_fields = {field.field_id: field for field in struct.fields}
+        for pos, read_field in enumerate(expected_struct.fields):
+            if read_field.field_id not in file_fields:
+                if read_field.required:
+                    raise ResolveError(f"{read_field} is non-optional, and not part of the file schema")
+                # Just set the new field to None
+                results.append((pos, NoneReader()))
+
+        return StructReader(tuple(results))
+
+    def field(self, field: NestedField, expected_field: Optional[IcebergType], field_reader: Reader) -> Reader:
+        return field_reader
+
+    def list(self, list_type: ListType, expected_list: Optional[IcebergType], element_reader: Reader) -> Reader:
+        if expected_list and not isinstance(expected_list, ListType):
+            raise ResolveError(f"File/read schema are not aligned for list, got {expected_list}")
 
+        return ListReader(element_reader)
 
-@resolve.register(Schema)
-def _(file_schema: Schema, read_schema: Schema) -> Reader:
-    """Visit a Schema and starts resolving it by converting it to a struct"""
-    return resolve(file_schema.as_struct(), read_schema.as_struct())
+    def map(self, map_type: MapType, expected_map: Optional[IcebergType], key_reader: Reader, value_reader: Reader) -> Reader:
+        if expected_map and not isinstance(expected_map, MapType):
+            raise ResolveError(f"File/read schema are not aligned for map, got {expected_map}")
 
+        return MapReader(key_reader, value_reader)
 
-@resolve.register(StructType)
-def _(file_struct: StructType, read_struct: IcebergType) -> Reader:
-    """Iterates over the file schema, and checks if the field is in the read schema"""
+    def primitive(self, primitive: PrimitiveType, expected_primitive: Optional[IcebergType]) -> Reader:
+        if expected_primitive is not None:
+            if not isinstance(expected_primitive, PrimitiveType):
+                raise ResolveError(f"File/read schema are not aligned for {primitive}, got {expected_primitive}")
 
-    if not isinstance(read_struct, StructType):
-        raise ResolveError(f"File/read schema are not aligned for {file_struct}, got {read_struct}")
+            # ensure that the type can be projected to the expected
+            if primitive != expected_primitive:
+                promote(primitive, expected_primitive)
 
-    results: List[Tuple[Optional[int], Reader]] = []
-    read_fields = {field.field_id: (pos, field) for pos, field in enumerate(read_struct.fields)}
+        return visit(primitive, ConstructReader())
 
-    for file_field in file_struct.fields:
-        if file_field.field_id in read_fields:
-            read_pos, read_field = read_fields[file_field.field_id]
-            result_reader = resolve(file_field.field_type, read_field.field_type)
+
+class SchemaPartnerAccessor(PartnerAccessor[IcebergType]):
+    def schema_partner(self, partner: Optional[IcebergType]) -> Optional[IcebergType]:
+        if isinstance(partner, Schema):
+            return partner.as_struct()
+
+        raise ResolveError(f"File/read schema are not aligned for schema, got {partner}")
+
+    def field_partner(self, partner: Optional[IcebergType], field_id: int, field_name: str) -> Optional[IcebergType]:
+        if isinstance(partner, StructType):
+            field = partner.field(field_id)
         else:
-            read_pos = None
-            result_reader = visit(file_field.field_type, ConstructReader())
-        result_reader = result_reader if file_field.required else OptionReader(result_reader)
-        results.append((read_pos, result_reader))
-
-    file_fields = {field.field_id: field for field in file_struct.fields}
-    for pos, read_field in enumerate(read_struct.fields):
-        if read_field.field_id not in file_fields:
-            if read_field.required:
-                raise ResolveError(f"{read_field} is non-optional, and not part of the file schema")
-            # Just set the new field to None
-            results.append((pos, NoneReader()))
-
-    return StructReader(tuple(results))
-
-
-@resolve.register(ListType)
-def _(file_list: ListType, read_list: IcebergType) -> Reader:
-    if not isinstance(read_list, ListType):
-        raise ResolveError(f"File/read schema are not aligned for {file_list}, got {read_list}")
-    element_reader = resolve(file_list.element_type, read_list.element_type)
-    return ListReader(element_reader)
-
-
-@resolve.register(MapType)
-def _(file_map: MapType, read_map: IcebergType) -> Reader:
-    if not isinstance(read_map, MapType):
-        raise ResolveError(f"File/read schema are not aligned for {file_map}, got {read_map}")
-    key_reader = resolve(file_map.key_type, read_map.key_type)
-    value_reader = resolve(file_map.value_type, read_map.value_type)
-
-    return MapReader(key_reader, value_reader)
-
-
-@resolve.register(FloatType)
-def _(file_type: PrimitiveType, read_type: IcebergType) -> Reader:
-    """This is a special case, when we need to adhere to the bytes written"""
-    if isinstance(read_type, DoubleType):
-        return visit(file_type, ConstructReader())
-    else:
-        raise ResolveError(f"Cannot promote an float to {read_type}")
-
-
-@resolve.register(PrimitiveType)
-def _(file_type: PrimitiveType, read_type: IcebergType) -> Reader:
-    """Converting the primitive type into an actual reader that will decode the physical data"""
-    if not isinstance(read_type, PrimitiveType):
-        raise ResolveError(f"Cannot promote {file_type} to {read_type}")
-
-    # In the case of a promotion, we want to check if it is valid
-    if file_type != read_type:
-        read_type = promote(file_type, read_type)
-    return visit(read_type, ConstructReader())
+            raise ResolveError(f"File/read schema are not aligned for struct, got {partner}")

Review Comment:
   These exceptions don't strictly need to be raised here, but some existing `ResolveError` tests won't pass without them.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org