Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/06/22 21:35:00 UTC

[GitHub] [iceberg] Fokko opened a new pull request, #5116: Python: Resolve write/read schemas

Fokko opened a new pull request, #5116:
URL: https://github.com/apache/iceberg/pull/5116

   This PR lets you supply a read schema when reading an Avro file. It constructs a reader tree that reads only the fields the read schema asks for, and skips over the fields that aren't part of the read schema.
   
   This PR also combines `read_long` and `read_int` into a single method on the decoder. There are still two separate readers, but both call the same decoder method, because ints and longs are binary compatible in Avro and Python has one `int` type with no separate long.
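
For context on why one decoder method can serve both readers: Avro writes `int` and `long` with the same variable-length zig-zag encoding, so the bytes look identical on disk. A minimal sketch of that decoding, assuming a plain `bytes` input instead of the PR's `BinaryDecoder` (the function name `decode_zigzag_varint` is made up for illustration):

```python
def decode_zigzag_varint(data: bytes) -> int:
    """Decode one Avro int/long from the start of `data`."""
    result = 0
    shift = 0
    for byte in data:
        # The low 7 bits carry the payload, least-significant group first.
        result |= (byte & 0x7F) << shift
        if byte & 0x80 == 0:
            # High bit clear: this was the last byte of the value.
            break
        shift += 7
    else:
        # Ran out of bytes without seeing a terminating byte.
        raise ValueError("truncated varint")
    # Undo the zig-zag mapping (0, -1, 1, -2, ... stored as 0, 1, 2, 3, ...).
    return (result >> 1) ^ -(result & 1)
```

With this sketch, `decode_zigzag_varint(b"\x18")` gives 12, the same byte and value used in the `test_read_int` test of this PR. Python integers are arbitrary precision, so the same function covers both the 32-bit and the 64-bit case.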


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Fokko commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r908050732


##########
python/src/iceberg/avro/reader.py:
##########
@@ -177,13 +222,28 @@ def read(self, decoder: BinaryDecoder) -> Any | None:
             return self.option.read(decoder)
         return None
 
+    def skip(self, decoder: BinaryDecoder) -> None:
+        if decoder.read_int() > 0:
+            return self.option.skip(decoder)
+
 
 @dataclass(frozen=True)
 class StructReader(Reader):
-    fields: tuple[Reader, ...] = dataclassfield()
+    fields: tuple[tuple[int | None, Reader], ...] = dataclassfield()
 
     def read(self, decoder: BinaryDecoder) -> AvroStruct:
-        return AvroStruct([field.read(decoder) for field in self.fields])
+        result: list[Any | StructProtocol] = [object] * len(self.fields)

Review Comment:
   It is just a placeholder to pre-allocate the list. `None` works as well, so I've updated it.
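
To illustrate the pre-allocation: the struct reader walks the fields in file order, but each value lands at its position in the read schema, so the result list has to exist before the loop starts. A rough sketch under simplifying assumptions: `ValueReader` and `read_struct` are hypothetical stand-ins rather than the PR's classes, the "decoder" is just an iterator over already-decoded values, and the sizing rule is a simplification that counts only projected fields.

```python
from typing import Any, Iterator, List, Optional, Tuple


class ValueReader:
    """Toy reader that consumes exactly one value from the stream."""

    def read(self, stream: Iterator[Any]) -> Any:
        return next(stream)

    def skip(self, stream: Iterator[Any]) -> None:
        next(stream)  # consume the value without keeping it


def read_struct(
    fields: Tuple[Tuple[Optional[int], ValueReader], ...],
    stream: Iterator[Any],
) -> List[Any]:
    # One slot per projected field; None is the placeholder value.
    size = sum(1 for pos, _ in fields if pos is not None)
    result: List[Any] = [None] * size
    for pos, reader in fields:
        if pos is None:
            # Field is in the file but not in the read schema.
            reader.skip(stream)
        else:
            # Values arrive in file order, but are stored in read order.
            result[pos] = reader.read(stream)
    return result


reader = ValueReader()
row = read_struct(((1, reader), (None, reader), (0, reader)), iter(["a", "b", "c"]))
```

Here the second file field is skipped, and the first and third are swapped into read-schema order, so `row` ends up as `["c", "a"]`.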





[GitHub] [iceberg] rdblue commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r907843103


##########
python/src/iceberg/avro/resolver.py:
##########
@@ -0,0 +1,150 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from functools import singledispatch
+from typing import (
+    Dict,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    Type,
+)
+
+from iceberg.avro.reader import (
+    ConstructReader,
+    ListReader,
+    MapReader,
+    NoneReader,
+    OptionReader,
+    Reader,
+    StructReader,
+    primitive_reader,
+)
+from iceberg.schema import Schema, visit
+from iceberg.types import (
+    BinaryType,
+    IcebergType,
+    IntegerType,
+    ListType,
+    LongType,
+    MapType,
+    PrimitiveType,
+    StringType,
+    StructType,
+)
+
+
+class ResolveException(Exception):
+    pass
+
+
+@singledispatch
+def resolve(write_schema, read_schema) -> Reader:
+    """This resolves the write and read schema
+
+    The function traverses the schema in post-order fashion
+
+     Args:
+         write_schema (Schema | IcebergType): The write schema of the Avro file
+         read_schema (Schema | IcebergType): The requested read schema which is equal or a subset of the write schema
+
+     Raises:
+         NotImplementedError: If attempting to resolve an unrecognized object type
+    """
+    raise NotImplementedError("Cannot resolve non-type: %s" % write_schema)
+
+
+@resolve.register(Schema)
+def _(write_schema: Schema, read_schema: Schema) -> Reader:
+    """Visit a Schema and starts resolving it by converting it to a struct"""
+    return resolve(write_schema.as_struct(), read_schema.as_struct())
+
+
+@resolve.register(StructType)
+def _(write_struct: StructType, read_struct: StructType) -> Reader:
+    """Iterates over the write schema, and checks if the field is in the read schema"""
+    results: List[Tuple[Optional[int], Reader]] = []
+
+    read_fields = {field.name: (pos, field) for pos, field in enumerate(read_struct.fields)}
+
+    for write_field in write_struct.fields:
+        if write_field.name in read_fields:
+            read_pos, read_field = read_fields[write_field.name]
+            result_reader = resolve(write_field.field_type, read_field.field_type)
+        else:
+            read_pos = None
+            result_reader = visit(write_field.field_type, ConstructReader())
+        result_reader = result_reader if write_field.required else OptionReader(result_reader)
+        results.append((read_pos, result_reader))
+
+    write_fields = {field.name: field for field in write_struct.fields}
+    for pos, read_field in enumerate(read_struct.fields):
+        if read_field.name not in write_fields:
+            if read_field.required:
+                raise ResolveException(f"{read_field} is not in the write schema, and is required")
+            # Just set the new field to None
+            results.append((pos, NoneReader()))
+
+    return StructReader(tuple(results))
+
+
+@resolve.register(ListType)
+def _(write_list: ListType, read_list: ListType) -> Reader:
+    if not isinstance(read_list, ListType):
+        raise ResolveException(f"Cannot change {write_list} into {read_list}")
+    element_reader = resolve(write_list.element.field_type, read_list.element.field_type)
+    return ListReader(element_reader)
+
+
+@resolve.register(MapType)
+def _(write_map: MapType, read_map: MapType) -> Reader:
+    if not isinstance(read_map, MapType):

Review Comment:
   `read_map` is declared to be a `MapType`, but since you're checking it here it seems like it should be `IcebergType` instead? That would match what is done for `PrimitiveType`, although I'm not sure how you get type checking to recognize that it's a `MapType` after all.
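
On the typing question: `functools.singledispatch` picks the implementation from the type of the first argument only, so the second argument arrives unchecked. A minimal sketch of one way to handle that, annotating the second parameter with the base type and narrowing it with `isinstance`, which type checkers such as mypy generally understand. The classes below are empty placeholders, not the real `iceberg.types` classes, and the string return value stands in for a reader.

```python
from functools import singledispatch


class IcebergType:
    pass


class MapType(IcebergType):
    pass


class ListType(IcebergType):
    pass


class ResolveException(Exception):
    pass


@singledispatch
def resolve(file_type: IcebergType, read_type: IcebergType) -> str:
    # Fallback for first-argument types without a registered implementation.
    raise NotImplementedError(f"Cannot resolve non-type: {file_type}")


@resolve.register(MapType)
def _(file_type: MapType, read_type: IcebergType) -> str:
    # Dispatch only guarantees the type of file_type, so check read_type here.
    if not isinstance(read_type, MapType):
        raise ResolveException(f"Cannot change {file_type} into {read_type}")
    # From this point a type checker can treat read_type as a MapType.
    return "map reader"
```

Calling `resolve(MapType(), ListType())` raises `ResolveException` with a readable message, while a first argument of an unregistered type falls through to the `NotImplementedError` default.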





[GitHub] [iceberg] rdblue commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r908920937


##########
python/tests/avro/test_resolver.py:
##########
@@ -0,0 +1,137 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pytest

Review Comment:
   We may be able to use fastavro for it, but we'd need to make sure there are field IDs to do projection right. (We could add name mapping for that.) So yeah, I'm happy adding this when we have a write path. I think the implementation is correct.
   
   We should probably add a write path soon, like you said.
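
The reason field IDs matter for projection: a column can be renamed after a file is written, so matching by name would lose it, while the ID stays stable. A small sketch of ID-based matching, using a hypothetical `Field` tuple and `project_positions` helper rather than Iceberg's own classes:

```python
from typing import Dict, List, NamedTuple, Optional


class Field(NamedTuple):
    field_id: int
    name: str


def project_positions(file_fields: List[Field], read_fields: List[Field]) -> List[Optional[int]]:
    """For each file field, return its position in the read schema, or None to skip it."""
    read_pos_by_id: Dict[int, int] = {f.field_id: pos for pos, f in enumerate(read_fields)}
    return [read_pos_by_id.get(f.field_id) for f in file_fields]


# The file was written with the column named "data"; it was later renamed to "payload".
file_fields = [Field(1, "id"), Field(2, "data")]
read_fields = [Field(2, "payload")]
positions = project_positions(file_fields, read_fields)
```

`positions` is `[None, 0]`: field 1 is skipped and field 2 is read into slot 0, even though its name changed. A name-based lookup would have found no match for `payload` in the file.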





[GitHub] [iceberg] Fokko commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r908521724


##########
python/tests/avro/test_decoder.py:
##########
@@ -33,10 +33,10 @@ def test_read_decimal_from_fixed():
     assert actual == expected
 
 
-def test_read_long():
+def test_read_int():
     mis = MemoryInputStream(b"\x18")
     decoder = BinaryDecoder(mis)
-    assert decoder.read_long() == 12
+    assert decoder.read_int() == 12

Review Comment:
   Done! Including the missing read ones 👍🏻 





[GitHub] [iceberg] rdblue commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r908631557


##########
python/src/iceberg/avro/resolver.py:
##########
@@ -0,0 +1,150 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from functools import singledispatch
+from typing import (
+    Dict,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    Type,
+)
+
+from iceberg.avro.reader import (
+    ConstructReader,
+    ListReader,
+    MapReader,
+    NoneReader,
+    OptionReader,
+    Reader,
+    StructReader,
+    primitive_reader,
+)
+from iceberg.schema import Schema, visit
+from iceberg.types import (
+    BinaryType,
+    IcebergType,
+    IntegerType,
+    ListType,
+    LongType,
+    MapType,
+    PrimitiveType,
+    StringType,
+    StructType,
+)
+
+
+class ResolveException(Exception):
+    pass
+
+
+@singledispatch
+def resolve(write_schema, read_schema) -> Reader:
+    """This resolves the write and read schema
+
+    The function traverses the schema in post-order fashion
+
+     Args:
+         write_schema (Schema | IcebergType): The write schema of the Avro file
+         read_schema (Schema | IcebergType): The requested read schema which is equal or a subset of the write schema
+
+     Raises:
+         NotImplementedError: If attempting to resolve an unrecognized object type
+    """
+    raise NotImplementedError("Cannot resolve non-type: %s" % write_schema)
+
+
+@resolve.register(Schema)
+def _(write_schema: Schema, read_schema: Schema) -> Reader:
+    """Visit a Schema and starts resolving it by converting it to a struct"""
+    return resolve(write_schema.as_struct(), read_schema.as_struct())
+
+
+@resolve.register(StructType)
+def _(write_struct: StructType, read_struct: StructType) -> Reader:
+    """Iterates over the write schema, and checks if the field is in the read schema"""
+    results: List[Tuple[Optional[int], Reader]] = []
+
+    read_fields = {field.name: (pos, field) for pos, field in enumerate(read_struct.fields)}
+
+    for write_field in write_struct.fields:
+        if write_field.name in read_fields:
+            read_pos, read_field = read_fields[write_field.name]
+            result_reader = resolve(write_field.field_type, read_field.field_type)
+        else:
+            read_pos = None
+            result_reader = visit(write_field.field_type, ConstructReader())
+        result_reader = result_reader if write_field.required else OptionReader(result_reader)
+        results.append((read_pos, result_reader))
+
+    write_fields = {field.name: field for field in write_struct.fields}
+    for pos, read_field in enumerate(read_struct.fields):
+        if read_field.name not in write_fields:
+            if read_field.required:
+                raise ResolveException(f"{read_field} is not in the write schema, and is required")
+            # Just set the new field to None
+            results.append((pos, NoneReader()))
+
+    return StructReader(tuple(results))
+
+
+@resolve.register(ListType)
+def _(write_list: ListType, read_list: ListType) -> Reader:
+    if not isinstance(read_list, ListType):
+        raise ResolveException(f"Cannot change {write_list} into {read_list}")
+    element_reader = resolve(write_list.element.field_type, read_list.element.field_type)
+    return ListReader(element_reader)
+
+
+@resolve.register(MapType)
+def _(write_map: MapType, read_map: MapType) -> Reader:
+    if not isinstance(read_map, MapType):

Review Comment:
   Yeah, I agree with having a nice error message if the types don't match!





[GitHub] [iceberg] rdblue merged pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
rdblue merged PR #5116:
URL: https://github.com/apache/iceberg/pull/5116




[GitHub] [iceberg] rdblue commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r909057161


##########
python/src/iceberg/avro/resolver.py:
##########
@@ -0,0 +1,208 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from functools import singledispatch
+from typing import (
+    List,
+    Optional,
+    Tuple,
+    Union,
+)
+
+from iceberg.avro.reader import (
+    CastReader,
+    ConstructReader,
+    ListReader,
+    MapReader,
+    NoneReader,
+    OptionReader,
+    Reader,
+    StructReader,
+    primitive_reader,
+)
+from iceberg.schema import Schema, visit
+from iceberg.types import (
+    BinaryType,
+    DecimalType,
+    DoubleType,
+    FloatType,
+    IcebergType,
+    IntegerType,
+    ListType,
+    LongType,
+    MapType,
+    PrimitiveType,
+    StringType,
+    StructType,
+)
+
+
+class ResolveException(Exception):
+    pass
+
+
+@singledispatch
+def resolve(file_schema: Union[Schema, IcebergType], read_schema: Union[Schema, IcebergType]) -> Reader:
+    """This resolves the file and read schema
+
+    The function traverses the schema in post-order fashion
+
+     Args:
+         file_schema (Schema | IcebergType): The schema of the Avro file
+         read_schema (Schema | IcebergType): The requested read schema which is equal, subset or superset of the file schema
+
+     Raises:
+         NotImplementedError: If attempting to resolve an unrecognized object type
+    """
+    raise NotImplementedError(f"Cannot resolve non-type: {file_schema}")
+
+
+@resolve.register(Schema)
+def _(file_schema: Schema, read_schema: Schema) -> Reader:
+    """Visit a Schema and starts resolving it by converting it to a struct"""
+    return resolve(file_schema.as_struct(), read_schema.as_struct())
+
+
+@resolve.register(StructType)
+def _(file_struct: StructType, read_struct: IcebergType) -> Reader:
+    """Iterates over the file schema, and checks if the field is in the read schema"""
+
+    if not isinstance(read_struct, StructType):
+        raise ResolveException(f"File/read schema are not aligned for {file_struct}, got {read_struct}")
+
+    results: List[Tuple[Optional[int], Reader]] = []
+    read_fields = {field.field_id: (pos, field) for pos, field in enumerate(read_struct.fields)}
+
+    for file_field in file_struct.fields:
+        if file_field.field_id in read_fields:
+            read_pos, read_field = read_fields[file_field.field_id]
+            result_reader = resolve(file_field.field_type, read_field.field_type)
+        else:
+            read_pos = None
+            result_reader = visit(file_field.field_type, ConstructReader())
+        result_reader = result_reader if file_field.required else OptionReader(result_reader)
+        results.append((read_pos, result_reader))
+
+    file_fields = {field.field_id: field for field in file_struct.fields}
+    for pos, read_field in enumerate(read_struct.fields):
+        if read_field.field_id not in file_fields:
+            if read_field.required:
+                raise ResolveException(f"{read_field} is non-optional, and not part of the file schema")
+            # Just set the new field to None
+            results.append((pos, NoneReader()))
+
+    return StructReader(tuple(results))
+
+
+@resolve.register(ListType)
+def _(file_list: ListType, read_list: IcebergType) -> Reader:
+    if not isinstance(read_list, ListType):
+        raise ResolveException(f"File/read schema are not aligned for {file_list}, got {read_list}")
+    element_reader = resolve(file_list.element_type, read_list.element_type)
+    return ListReader(element_reader)
+
+
+@resolve.register(MapType)
+def _(file_map: MapType, read_map: IcebergType) -> Reader:
+    if not isinstance(read_map, MapType):
+        raise ResolveException(f"File/read schema are not aligned for {file_map}, got {read_map}")
+    key_reader = resolve(file_map.key_type, read_map.key_type)
+    value_reader = resolve(file_map.value_type, read_map.value_type)
+
+    return MapReader(key_reader, value_reader)
+
+
+@resolve.register(PrimitiveType)
+def _(file_type: PrimitiveType, read_type: IcebergType) -> Reader:
+    """Converting the primitive type into an actual reader that will decode the physical data"""
+    if not isinstance(read_type, PrimitiveType):
+        raise ResolveException(f"Cannot promote {file_type} to {read_type}")
+
+    # In the case of a promotion, we want to check if it is valid
+    if file_type != read_type:
+        return promote(file_type, read_type)
+    return primitive_reader(read_type)
+
+
+@singledispatch
+def promote(file_type: IcebergType, read_type: IcebergType) -> Reader:
+    """Promotes reading a file type to a read type
+
+    Args:
+        file_type (IcebergType): The type of the Avro file
+        read_type (IcebergType): The requested read type
+
+    Raises:
+        ResolveException: If attempting to resolve an unrecognized object type
+    """
+    raise ResolveException(f"Cannot promote {file_type} to {read_type}")
+
+
+@promote.register(IntegerType)
+def _(file_type: IntegerType, read_type: IcebergType) -> Reader:
+    if isinstance(read_type, LongType):
+        # Ints/Longs are binary compatible in Avro, so this is okay
+        return primitive_reader(read_type)
+    elif type(read_type) in {FloatType, DoubleType}:
+        # We should just read the int, and convert it to a float
+        return CastReader(primitive_reader(file_type), float)
+    else:
+        raise ResolveException(f"Cannot promote an int to {read_type}")
+
+
+@promote.register(LongType)
+def _(file_type: LongType, read_type: IcebergType) -> Reader:
+    if type(read_type) in {FloatType, DoubleType}:

Review Comment:
   These aren't allowed promotions in Iceberg, so I think we should remove them.
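
For reference, format versions 1 and 2 of the Iceberg spec allow only a small set of primitive type promotions: `int` to `long`, `float` to `double`, and `decimal(P, S)` to `decimal(P', S)` when the precision grows and the scale stays the same. A sketch of that rule as a standalone check, using placeholder dataclasses rather than the classes in `iceberg.types` (the `can_promote` helper is hypothetical):

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class IntegerType:
    pass


@dataclass(frozen=True)
class LongType:
    pass


@dataclass(frozen=True)
class FloatType:
    pass


@dataclass(frozen=True)
class DoubleType:
    pass


@dataclass(frozen=True)
class DecimalType:
    precision: int
    scale: int


def can_promote(file_type: object, read_type: object) -> bool:
    """Return True if a value written as file_type may be read as read_type."""
    if isinstance(file_type, IntegerType):
        return isinstance(read_type, LongType)
    if isinstance(file_type, FloatType):
        return isinstance(read_type, DoubleType)
    if isinstance(file_type, DecimalType) and isinstance(read_type, DecimalType):
        # Only widening the precision is allowed; the scale must not change.
        return read_type.scale == file_type.scale and read_type.precision > file_type.precision
    return False
```

Under this check, `can_promote(LongType(), DoubleType())` and `can_promote(IntegerType(), FloatType())` are both `False`, which matches the request to drop those branches.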



##########
python/src/iceberg/avro/resolver.py:
##########
@@ -0,0 +1,208 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from functools import singledispatch
+from typing import (
+    List,
+    Optional,
+    Tuple,
+    Union,
+)
+
+from iceberg.avro.reader import (
+    CastReader,
+    ConstructReader,
+    ListReader,
+    MapReader,
+    NoneReader,
+    OptionReader,
+    Reader,
+    StructReader,
+    primitive_reader,
+)
+from iceberg.schema import Schema, visit
+from iceberg.types import (
+    BinaryType,
+    DecimalType,
+    DoubleType,
+    FloatType,
+    IcebergType,
+    IntegerType,
+    ListType,
+    LongType,
+    MapType,
+    PrimitiveType,
+    StringType,
+    StructType,
+)
+
+
+class ResolveException(Exception):
+    pass
+
+
+@singledispatch
+def resolve(file_schema: Union[Schema, IcebergType], read_schema: Union[Schema, IcebergType]) -> Reader:
+    """This resolves the file and read schema
+
+    The function traverses the schema in post-order fashion
+
+     Args:
+         file_schema (Schema | IcebergType): The schema of the Avro file
+         read_schema (Schema | IcebergType): The requested read schema which is equal, subset or superset of the file schema
+
+     Raises:
+         NotImplementedError: If attempting to resolve an unrecognized object type
+    """
+    raise NotImplementedError(f"Cannot resolve non-type: {file_schema}")
+
+
+@resolve.register(Schema)
+def _(file_schema: Schema, read_schema: Schema) -> Reader:
+    """Visit a Schema and starts resolving it by converting it to a struct"""
+    return resolve(file_schema.as_struct(), read_schema.as_struct())
+
+
+@resolve.register(StructType)
+def _(file_struct: StructType, read_struct: IcebergType) -> Reader:
+    """Iterates over the file schema, and checks if the field is in the read schema"""
+
+    if not isinstance(read_struct, StructType):
+        raise ResolveException(f"File/read schema are not aligned for {file_struct}, got {read_struct}")
+
+    results: List[Tuple[Optional[int], Reader]] = []
+    read_fields = {field.field_id: (pos, field) for pos, field in enumerate(read_struct.fields)}
+
+    for file_field in file_struct.fields:
+        if file_field.field_id in read_fields:
+            read_pos, read_field = read_fields[file_field.field_id]
+            result_reader = resolve(file_field.field_type, read_field.field_type)
+        else:
+            read_pos = None
+            result_reader = visit(file_field.field_type, ConstructReader())
+        result_reader = result_reader if file_field.required else OptionReader(result_reader)
+        results.append((read_pos, result_reader))
+
+    file_fields = {field.field_id: field for field in file_struct.fields}
+    for pos, read_field in enumerate(read_struct.fields):
+        if read_field.field_id not in file_fields:
+            if read_field.required:
+                raise ResolveException(f"{read_field} is non-optional, and not part of the file schema")
+            # Just set the new field to None
+            results.append((pos, NoneReader()))
+
+    return StructReader(tuple(results))
+
+
+@resolve.register(ListType)
+def _(file_list: ListType, read_list: IcebergType) -> Reader:
+    if not isinstance(read_list, ListType):
+        raise ResolveException(f"File/read schema are not aligned for {file_list}, got {read_list}")
+    element_reader = resolve(file_list.element_type, read_list.element_type)
+    return ListReader(element_reader)
+
+
+@resolve.register(MapType)
+def _(file_map: MapType, read_map: IcebergType) -> Reader:
+    if not isinstance(read_map, MapType):
+        raise ResolveException(f"File/read schema are not aligned for {file_map}, got {read_map}")
+    key_reader = resolve(file_map.key_type, read_map.key_type)
+    value_reader = resolve(file_map.value_type, read_map.value_type)
+
+    return MapReader(key_reader, value_reader)
+
+
+@resolve.register(PrimitiveType)
+def _(file_type: PrimitiveType, read_type: IcebergType) -> Reader:
+    """Converting the primitive type into an actual reader that will decode the physical data"""
+    if not isinstance(read_type, PrimitiveType):
+        raise ResolveException(f"Cannot promote {file_type} to {read_type}")
+
+    # In the case of a promotion, we want to check if it is valid
+    if file_type != read_type:
+        return promote(file_type, read_type)
+    return primitive_reader(read_type)
+
+
+@singledispatch
+def promote(file_type: IcebergType, read_type: IcebergType) -> Reader:
+    """Promotes reading a file type to a read type
+
+    Args:
+        file_type (IcebergType): The type of the Avro file
+        read_type (IcebergType): The requested read type
+
+    Raises:
+        ResolveException: If attempting to resolve an unrecognized object type
+    """
+    raise ResolveException(f"Cannot promote {file_type} to {read_type}")
+
+
+@promote.register(IntegerType)
+def _(file_type: IntegerType, read_type: IcebergType) -> Reader:
+    if isinstance(read_type, LongType):
+        # Ints/Longs are binary compatible in Avro, so this is okay
+        return primitive_reader(read_type)
+    elif type(read_type) in {FloatType, DoubleType}:

Review Comment:
   These aren't allowed in Iceberg.





[GitHub] [iceberg] rdblue commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r907845781


##########
python/src/iceberg/avro/resolver.py:
##########
@@ -0,0 +1,150 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from functools import singledispatch
+from typing import (
+    Dict,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    Type,
+)
+
+from iceberg.avro.reader import (
+    ConstructReader,
+    ListReader,
+    MapReader,
+    NoneReader,
+    OptionReader,
+    Reader,
+    StructReader,
+    primitive_reader,
+)
+from iceberg.schema import Schema, visit
+from iceberg.types import (
+    BinaryType,
+    IcebergType,
+    IntegerType,
+    ListType,
+    LongType,
+    MapType,
+    PrimitiveType,
+    StringType,
+    StructType,
+)
+
+
+class ResolveException(Exception):
+    pass
+
+
+@singledispatch
+def resolve(write_schema, read_schema) -> Reader:
+    """This resolves the write and read schema
+
+    The function traverses the schema in post-order fashion
+
+     Args:
+         write_schema (Schema | IcebergType): The write schema of the Avro file
+         read_schema (Schema | IcebergType): The requested read schema which is equal or a subset of the write schema
+
+     Raises:
+         NotImplementedError: If attempting to resolve an unrecognized object type
+    """
+    raise NotImplementedError("Cannot resolve non-type: %s" % write_schema)
+
+
+@resolve.register(Schema)
+def _(write_schema: Schema, read_schema: Schema) -> Reader:
+    """Visit a Schema and starts resolving it by converting it to a struct"""
+    return resolve(write_schema.as_struct(), read_schema.as_struct())
+
+
+@resolve.register(StructType)
+def _(write_struct: StructType, read_struct: StructType) -> Reader:
+    """Iterates over the write schema, and checks if the field is in the read schema"""
+    results: List[Tuple[Optional[int], Reader]] = []
+
+    read_fields = {field.name: (pos, field) for pos, field in enumerate(read_struct.fields)}
+
+    for write_field in write_struct.fields:
+        if write_field.name in read_fields:
+            read_pos, read_field = read_fields[write_field.name]
+            result_reader = resolve(write_field.field_type, read_field.field_type)
+        else:
+            read_pos = None
+            result_reader = visit(write_field.field_type, ConstructReader())
+        result_reader = result_reader if write_field.required else OptionReader(result_reader)
+        results.append((read_pos, result_reader))
+
+    write_fields = {field.name: field for field in write_struct.fields}
+    for pos, read_field in enumerate(read_struct.fields):
+        if read_field.name not in write_fields:
+            if read_field.required:
+                raise ResolveException(f"{read_field} is in not in the write schema, and is required")
+            # Just set the new field to None
+            results.append((pos, NoneReader()))
+
+    return StructReader(tuple(results))
+
+
+@resolve.register(ListType)
+def _(write_list: ListType, read_list: ListType) -> Reader:
+    if not isinstance(read_list, ListType):
+        raise ResolveException(f"Cannot change {write_list} into {read_list}")
+    element_reader = resolve(write_list.element.field_type, read_list.element.field_type)
+    return ListReader(element_reader)
+
+
+@resolve.register(MapType)
+def _(write_map: MapType, read_map: MapType) -> Reader:
+    if not isinstance(read_map, MapType):
+        raise ResolveException(f"Cannot change {write_map} into {read_map}")
+    key_reader = resolve(write_map.key.field_type, read_map.key.field_type)
+    value_reader = resolve(write_map.value.field_type, read_map.value.field_type)
+
+    return MapReader(key_reader, value_reader)
+
+
+ALLOWED_PROMOTIONS: Dict[Type[PrimitiveType], Set[Type[PrimitiveType]]] = {
+    # For now we only support the binary compatible ones
+    IntegerType: {LongType},
+    StringType: {BinaryType},
+    BinaryType: {StringType},
+    # These are all allowed according to the Avro spec
+    # IntegerType: {LongType, FloatType, DoubleType},
+    # LongType: {FloatType, DoubleType},
+    # FloatType: {DoubleType},

Review Comment:
   This is allowed by the Iceberg spec, as is promotion from `DecimalType(P, S)` to `DecimalType(P2, S)` where `P2 > P`.
   
   Promoting `float` to `double` should be possible fairly easily. All you have to do is check whether it's allowed and then use the source type's reader. Since Python uses a double to represent both, it will automatically produce the correct result.
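   
   A minimal sketch of that idea, assuming hypothetical stand-ins (`read_float`, `read_double`, `READERS`, `ALLOWED`, `resolve_primitive`) rather than the readers in this PR: the reader is picked from the written type, so a promoted `float` is still decoded from 4 bytes and comes back as a regular Python `float`.

```python
import struct
from io import BytesIO


def read_float(stream: BytesIO) -> float:
    """Read a 4-byte little-endian IEEE 754 value, the Avro encoding of `float`."""
    return struct.unpack("<f", stream.read(4))[0]


def read_double(stream: BytesIO) -> float:
    """Read an 8-byte little-endian IEEE 754 value, the Avro encoding of `double`."""
    return struct.unpack("<d", stream.read(8))[0]


# Hypothetical tables for illustration only; type names are plain strings here.
READERS = {"float": read_float, "double": read_double}
ALLOWED = {("float", "double")}


def resolve_primitive(write_type: str, read_type: str):
    """Check that the promotion is allowed, then return the reader of the written type."""
    if write_type != read_type and (write_type, read_type) not in ALLOWED:
        raise ValueError(f"Cannot promote {write_type} to {read_type}")
    return READERS[write_type]


# Two floats as they would sit in the file.
data = BytesIO(struct.pack("<f", 1.5) + struct.pack("<f", -0.25))
reader = resolve_primitive("float", "double")
values = [reader(data), reader(data)]
```

   Because the source reader is used, exactly 4 bytes are consumed per value and no explicit conversion step is needed.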





[GitHub] [iceberg] rdblue commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r907853040


##########
python/tests/avro/test_resolver.py:
##########
@@ -0,0 +1,137 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pytest

Review Comment:
   I'd recommend taking a look at the test cases in Java, which are pretty thorough and can find a lot of weird cases: https://github.com/apache/iceberg/blob/bd9408499cc99876fc52ebe3c00c9e40bd0e66c2/data/src/test/java/org/apache/iceberg/data/TestReadProjection.java#L38





[GitHub] [iceberg] rdblue commented on pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
rdblue commented on PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#issuecomment-1170082978

   Merged! Nice work, @Fokko!




[GitHub] [iceberg] Fokko commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r909054498


##########
python/tests/avro/test_resolver.py:
##########
@@ -0,0 +1,137 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pytest

Review Comment:
   I was thinking of using fastavro, but then we would need to express everything as Avro schemas, which is a bit of a hassle. Otherwise we can just use the typed Iceberg schemas 🚀 





[GitHub] [iceberg] Fokko commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r908097634


##########
python/src/iceberg/avro/resolver.py:
##########
@@ -0,0 +1,150 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from functools import singledispatch
+from typing import (
+    Dict,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    Type,
+)
+
+from iceberg.avro.reader import (
+    ConstructReader,
+    ListReader,
+    MapReader,
+    NoneReader,
+    OptionReader,
+    Reader,
+    StructReader,
+    primitive_reader,
+)
+from iceberg.schema import Schema, visit
+from iceberg.types import (
+    BinaryType,
+    IcebergType,
+    IntegerType,
+    ListType,
+    LongType,
+    MapType,
+    PrimitiveType,
+    StringType,
+    StructType,
+)
+
+
+class ResolveException(Exception):
+    pass
+
+
+@singledispatch
+def resolve(write_schema, read_schema) -> Reader:
+    """This resolves the write and read schema
+
+    The function traverses the schema in post-order fashion
+
+     Args:
+         write_schema (Schema | IcebergType): The write schema of the Avro file
+         read_schema (Schema | IcebergType): The requested read schema which is equal or a subset of the write schema
+
+     Raises:
+         NotImplementedError: If attempting to resolve an unrecognized object type
+    """
+    raise NotImplementedError("Cannot resolve non-type: %s" % write_schema)
+
+
+@resolve.register(Schema)
+def _(write_schema: Schema, read_schema: Schema) -> Reader:
+    """Visit a Schema and starts resolving it by converting it to a struct"""
+    return resolve(write_schema.as_struct(), read_schema.as_struct())
+
+
+@resolve.register(StructType)
+def _(write_struct: StructType, read_struct: StructType) -> Reader:
+    """Iterates over the write schema, and checks if the field is in the read schema"""
+    results: List[Tuple[Optional[int], Reader]] = []
+
+    read_fields = {field.name: (pos, field) for pos, field in enumerate(read_struct.fields)}
+
+    for write_field in write_struct.fields:
+        if write_field.name in read_fields:
+            read_pos, read_field = read_fields[write_field.name]
+            result_reader = resolve(write_field.field_type, read_field.field_type)
+        else:
+            read_pos = None
+            result_reader = visit(write_field.field_type, ConstructReader())
+        result_reader = result_reader if write_field.required else OptionReader(result_reader)
+        results.append((read_pos, result_reader))
+
+    write_fields = {field.name: field for field in write_struct.fields}
+    for pos, read_field in enumerate(read_struct.fields):
+        if read_field.name not in write_fields:
+            if read_field.required:
+                raise ResolveException(f"{read_field} is in not in the write schema, and is required")
+            # Just set the new field to None
+            results.append((pos, NoneReader()))
+
+    return StructReader(tuple(results))
+
+
+@resolve.register(ListType)
+def _(write_list: ListType, read_list: ListType) -> Reader:
+    if not isinstance(read_list, ListType):
+        raise ResolveException(f"Cannot change {write_list} into {read_list}")
+    element_reader = resolve(write_list.element.field_type, read_list.element.field_type)
+    return ListReader(element_reader)
+
+
+@resolve.register(MapType)
+def _(write_map: MapType, read_map: MapType) -> Reader:
+    if not isinstance(read_map, MapType):

Review Comment:
   I've updated this for all the complex types 👍🏻 





[GitHub] [iceberg] Fokko commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r908525740


##########
python/src/iceberg/avro/resolver.py:
##########
@@ -0,0 +1,150 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from functools import singledispatch
+from typing import (
+    Dict,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    Type,
+)
+
+from iceberg.avro.reader import (
+    ConstructReader,
+    ListReader,
+    MapReader,
+    NoneReader,
+    OptionReader,
+    Reader,
+    StructReader,
+    primitive_reader,
+)
+from iceberg.schema import Schema, visit
+from iceberg.types import (
+    BinaryType,
+    IcebergType,
+    IntegerType,
+    ListType,
+    LongType,
+    MapType,
+    PrimitiveType,
+    StringType,
+    StructType,
+)
+
+
+class ResolveException(Exception):
+    pass
+
+
+@singledispatch
+def resolve(write_schema, read_schema) -> Reader:
+    """This resolves the write and read schema
+
+    The function traverses the schema in post-order fashion
+
+     Args:
+         write_schema (Schema | IcebergType): The write schema of the Avro file
+         read_schema (Schema | IcebergType): The requested read schema which is equal or a subset of the write schema
+
+     Raises:
+         NotImplementedError: If attempting to resolve an unrecognized object type
+    """
+    raise NotImplementedError("Cannot resolve non-type: %s" % write_schema)
+
+
+@resolve.register(Schema)
+def _(write_schema: Schema, read_schema: Schema) -> Reader:
+    """Visit a Schema and starts resolving it by converting it to a struct"""
+    return resolve(write_schema.as_struct(), read_schema.as_struct())
+
+
+@resolve.register(StructType)
+def _(write_struct: StructType, read_struct: StructType) -> Reader:
+    """Iterates over the write schema, and checks if the field is in the read schema"""
+    results: List[Tuple[Optional[int], Reader]] = []
+
+    read_fields = {field.name: (pos, field) for pos, field in enumerate(read_struct.fields)}
+
+    for write_field in write_struct.fields:
+        if write_field.name in read_fields:
+            read_pos, read_field = read_fields[write_field.name]
+            result_reader = resolve(write_field.field_type, read_field.field_type)
+        else:
+            read_pos = None
+            result_reader = visit(write_field.field_type, ConstructReader())
+        result_reader = result_reader if write_field.required else OptionReader(result_reader)
+        results.append((read_pos, result_reader))
+
+    write_fields = {field.name: field for field in write_struct.fields}
+    for pos, read_field in enumerate(read_struct.fields):
+        if read_field.name not in write_fields:
+            if read_field.required:
+                raise ResolveException(f"{read_field} is in not in the write schema, and is required")
+            # Just set the new field to None
+            results.append((pos, NoneReader()))
+
+    return StructReader(tuple(results))
+
+
+@resolve.register(ListType)
+def _(write_list: ListType, read_list: ListType) -> Reader:
+    if not isinstance(read_list, ListType):
+        raise ResolveException(f"Cannot change {write_list} into {read_list}")
+    element_reader = resolve(write_list.element.field_type, read_list.element.field_type)
+    return ListReader(element_reader)
+
+
+@resolve.register(MapType)
+def _(write_map: MapType, read_map: MapType) -> Reader:
+    if not isinstance(read_map, MapType):
+        raise ResolveException(f"Cannot change {write_map} into {read_map}")
+    key_reader = resolve(write_map.key.field_type, read_map.key.field_type)
+    value_reader = resolve(write_map.value.field_type, read_map.value.field_type)
+
+    return MapReader(key_reader, value_reader)
+
+
+ALLOWED_PROMOTIONS: Dict[Type[PrimitiveType], Set[Type[PrimitiveType]]] = {
+    # For now we only support the binary compatible ones
+    IntegerType: {LongType},
+    StringType: {BinaryType},
+    BinaryType: {StringType},
+    # These are all allowed according to the Avro spec
+    # IntegerType: {LongType, FloatType, DoubleType},
+    # LongType: {FloatType, DoubleType},
+    # FloatType: {DoubleType},

Review Comment:
   I left the `float` to `double` promotion out for now because the two are not binary compatible, as mentioned a few lines above. It adds complexity because we still need to read the float (4 bytes) and then convert it into a double. If we just plugged in the double reader, it would consume 8 bytes and misalign every read that follows.
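   
   To illustrate the misalignment with a standalone sketch (plain `struct` calls, not the decoder from this PR): two floats are written back to back, and decoding the first one as a double swallows both.

```python
import struct
from io import BytesIO

# Two float fields, written as 4 bytes each.
payload = struct.pack("<f", 1.5) + struct.pack("<f", 2.5)

# Reading the first field with an 8-byte double decoder consumes both floats.
wrong = BytesIO(payload)
(bad_value,) = struct.unpack("<d", wrong.read(8))
wrong_offset = wrong.tell()

# Reading 4 bytes per field keeps the stream aligned for the next field.
right = BytesIO(payload)
(first,) = struct.unpack("<f", right.read(4))
right_offset = right.tell()
(second,) = struct.unpack("<f", right.read(4))
```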





[GitHub] [iceberg] rdblue commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r907849548


##########
python/src/iceberg/avro/resolver.py:
##########
@@ -0,0 +1,150 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from functools import singledispatch
+from typing import (
+    Dict,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    Type,
+)
+
+from iceberg.avro.reader import (
+    ConstructReader,
+    ListReader,
+    MapReader,
+    NoneReader,
+    OptionReader,
+    Reader,
+    StructReader,
+    primitive_reader,
+)
+from iceberg.schema import Schema, visit
+from iceberg.types import (
+    BinaryType,
+    IcebergType,
+    IntegerType,
+    ListType,
+    LongType,
+    MapType,
+    PrimitiveType,
+    StringType,
+    StructType,
+)
+
+
+class ResolveException(Exception):
+    pass
+
+
+@singledispatch
+def resolve(write_schema, read_schema) -> Reader:
+    """This resolves the write and read schema
+
+    The function traverses the schema in post-order fashion
+
+     Args:
+         write_schema (Schema | IcebergType): The write schema of the Avro file
+         read_schema (Schema | IcebergType): The requested read schema which is equal or a subset of the write schema
+
+     Raises:
+         NotImplementedError: If attempting to resolve an unrecognized object type
+    """
+    raise NotImplementedError("Cannot resolve non-type: %s" % write_schema)
+
+
+@resolve.register(Schema)
+def _(write_schema: Schema, read_schema: Schema) -> Reader:
+    """Visit a Schema and starts resolving it by converting it to a struct"""
+    return resolve(write_schema.as_struct(), read_schema.as_struct())
+
+
+@resolve.register(StructType)
+def _(write_struct: StructType, read_struct: StructType) -> Reader:
+    """Iterates over the write schema, and checks if the field is in the read schema"""
+    results: List[Tuple[Optional[int], Reader]] = []
+
+    read_fields = {field.name: (pos, field) for pos, field in enumerate(read_struct.fields)}

Review Comment:
   In Iceberg, resolution must be done by field ID rather than by name. Name-based resolution as implemented here is correct for plain Avro, but it does not meet Iceberg's requirement.
   
   In the Java implementation, we construct a read schema from the file schema and the Iceberg schema that can be read by name but still produces the correct ID-based resolution. That code is annoying and difficult to maintain, which is why we want to do the ID-based resolution directly here rather than relying on name-based resolution in an Avro library.
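   
   A rough sketch of what ID-based matching could look like, using a hypothetical `Field` dataclass and `plan_struct` helper in place of `NestedField` and the struct resolver in this PR. The only change from the name-based version is the lookup key, which makes renamed fields resolve correctly.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class Field:
    """Hypothetical stand-in for an Iceberg NestedField."""

    field_id: int
    name: str
    required: bool = True


def plan_struct(write_fields: List[Field], read_fields: List[Field]) -> List[Tuple[Optional[int], str]]:
    """Return one (read position or None, action) entry per step, matching by field ID."""
    read_by_id = {field.field_id: pos for pos, field in enumerate(read_fields)}
    plan: List[Tuple[Optional[int], str]] = []

    # Walk the file layout in write order; unprojected fields are skipped.
    for write_field in write_fields:
        if write_field.field_id in read_by_id:
            plan.append((read_by_id[write_field.field_id], f"read {write_field.name}"))
        else:
            plan.append((None, f"skip {write_field.name}"))

    # Fields that exist only in the read schema must be optional.
    written_ids = {field.field_id for field in write_fields}
    for pos, read_field in enumerate(read_fields):
        if read_field.field_id not in written_ids:
            if read_field.required:
                raise ValueError(f"{read_field.name} is not in the write schema, and is required")
            plan.append((pos, f"fill {read_field.name} with None"))

    return plan


# Field 2 was renamed from "data" to "payload" after the file was written.
write_fields = [Field(1, "id"), Field(2, "data")]
read_fields = [Field(2, "payload"), Field(1, "id"), Field(3, "note", required=False)]
plan = plan_struct(write_fields, read_fields)
```

   With name-based matching, the renamed field would be skipped on the write side and then reported as a missing required field on the read side.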





[GitHub] [iceberg] Fokko commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r908084933


##########
python/src/iceberg/avro/reader.py:
##########
@@ -192,17 +252,28 @@ class ListReader(Reader):
 
     def read(self, decoder: BinaryDecoder) -> list:
         read_items = []
-        block_count = decoder.read_long()
+        block_count = decoder.read_int()
         while block_count != 0:
             if block_count < 0:
                 block_count = -block_count
-                # We ignore the block size for now
-                _ = decoder.read_long()
+                _ = decoder.read_int()
             for _ in range(block_count):
                 read_items.append(self.element.read(decoder))
-            block_count = decoder.read_long()
+            block_count = decoder.read_int()
         return read_items
 
+    def skip(self, decoder: BinaryDecoder) -> None:
+        block_count = decoder.read_int()
+        while block_count != 0:
+            if block_count < 0:
+                block_count = -block_count
+                block_size = decoder.read_int()
+                decoder.skip(block_size)
+            else:
+                for _ in range(block_count):
+                    self.element.skip(decoder)
+                block_count = decoder.read_int()

Review Comment:
   Ouch, thanks!
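   
   For reference, a minimal standalone sketch of skipping a blocked Avro array of longs, in which the next block count is read after either branch (in the hunk above it is only read in the `else` branch). `read_long` and `skip_long_array` are illustrative helpers over `BytesIO`, not the `BinaryDecoder` API from this PR.

```python
from io import BytesIO


def read_long(stream: BytesIO) -> int:
    """Decode one zigzag varint, the Avro binary encoding shared by int and long."""
    shift = 0
    result = 0
    while True:
        byte = stream.read(1)[0]
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            break
        shift += 7
    return (result >> 1) ^ -(result & 1)


def skip_long_array(stream: BytesIO) -> None:
    """Skip an Avro array of longs without materializing its items."""
    block_count = read_long(stream)
    while block_count != 0:
        if block_count < 0:
            # A negative count is followed by the block size in bytes,
            # so the whole block can be jumped over at once.
            block_size = read_long(stream)
            stream.seek(block_size, 1)
        else:
            for _ in range(block_count):
                read_long(stream)
        # Read the next block count on both paths; a count of 0 ends the array.
        block_count = read_long(stream)


# Block 1: count -2, size 2 bytes, items 1 and 3. Block 2: count 1, item 2.
# Then the terminating 0 and one trailing byte that belongs to the next field.
encoded = BytesIO(bytes([0x03, 0x04, 0x02, 0x06, 0x02, 0x04, 0x00, 0x2A]))
skip_long_array(encoded)
position_after_skip = encoded.tell()
```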





[GitHub] [iceberg] Fokko commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r908141980


##########
python/src/iceberg/avro/resolver.py:
##########
@@ -0,0 +1,150 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from functools import singledispatch
+from typing import (
+    Dict,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    Type,
+)
+
+from iceberg.avro.reader import (
+    ConstructReader,
+    ListReader,
+    MapReader,
+    NoneReader,
+    OptionReader,
+    Reader,
+    StructReader,
+    primitive_reader,
+)
+from iceberg.schema import Schema, visit
+from iceberg.types import (
+    BinaryType,
+    IcebergType,
+    IntegerType,
+    ListType,
+    LongType,
+    MapType,
+    PrimitiveType,
+    StringType,
+    StructType,
+)
+
+
+class ResolveException(Exception):
+    pass
+
+
+@singledispatch
+def resolve(write_schema, read_schema) -> Reader:
+    """This resolves the write and read schema
+
+    The function traverses the schema in post-order fashion
+
+     Args:
+         write_schema (Schema | IcebergType): The write schema of the Avro file
+         read_schema (Schema | IcebergType): The requested read schema which is equal or a subset of the write schema
+
+     Raises:
+         NotImplementedError: If attempting to resolve an unrecognized object type
+    """
+    raise NotImplementedError("Cannot resolve non-type: %s" % write_schema)
+
+
+@resolve.register(Schema)
+def _(write_schema: Schema, read_schema: Schema) -> Reader:
+    """Visit a Schema and starts resolving it by converting it to a struct"""
+    return resolve(write_schema.as_struct(), read_schema.as_struct())
+
+
+@resolve.register(StructType)
+def _(write_struct: StructType, read_struct: StructType) -> Reader:
+    """Iterates over the write schema, and checks if the field is in the read schema"""

Review Comment:
   Yes, that's correct. Added the check





[GitHub] [iceberg] rdblue commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r907854465


##########
python/tests/avro/test_resolver.py:
##########
@@ -0,0 +1,137 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pytest
+
+from iceberg.avro.reader import (
+    DoubleReader,
+    LongReader,
+    MapReader,
+    StringReader,
+    StructReader,
+)
+from iceberg.avro.resolver import ResolveException, resolve
+from iceberg.schema import Schema
+from iceberg.types import (
+    DoubleType,
+    IntegerType,
+    ListType,
+    LongType,
+    MapType,
+    NestedField,
+    StringType,
+    StructType,
+)
+
+
+def test_resolver():
+    write_schema = Schema(
+        NestedField(1, "id", LongType()),
+        NestedField(2, "data", StringType()),
+        NestedField(
+            3,
+            "location",
+            StructType(
+                NestedField(4, "lat", DoubleType()),
+                NestedField(5, "long", DoubleType()),
+            ),
+        ),
+        NestedField(6, "preferences", MapType(7, StringType(), 8, StringType())),
+        schema_id=1,
+    )
+    read_schema = Schema(
+        NestedField(
+            3,
+            "location",
+            StructType(
+                NestedField(4, "lat", DoubleType()),
+                NestedField(5, "long", DoubleType()),
+            ),
+        ),
+        NestedField(1, "id", LongType()),
+        NestedField(6, "preferences", MapType(7, StringType(), 8, StringType())),
+        schema_id=1,
+    )
+    read_tree = resolve(write_schema, read_schema)
+
+    assert read_tree == StructReader(
+        (
+            (1, LongReader()),
+            (None, StringReader()),
+            (
+                0,
+                StructReader(
+                    (
+                        (0, DoubleReader()),
+                        (1, DoubleReader()),
+                    )
+                ),
+            ),
+            (2, MapReader(StringReader(), StringReader())),
+        )
+    )
+
+
+def test_resolver_new_required_field():
+    write_schema = Schema(
+        NestedField(1, "id", LongType()),
+        schema_id=1,
+    )
+    read_schema = Schema(
+        NestedField(1, "id", LongType()),
+        NestedField(2, "data", StringType(), required=True),
+        schema_id=1,
+    )
+
+    with pytest.raises(ResolveException) as exc_info:
+        resolve(write_schema, read_schema)
+
+    assert "2: data: optional string is in not in the write schema, and is required" in str(exc_info.value)
+
+
+def test_resolver_invalid_evolution():
+    write_schema = Schema(
+        NestedField(1, "id", LongType()),
+        schema_id=1,
+    )
+    read_schema = Schema(
+        NestedField(1, "id", IntegerType()),
+        schema_id=1,
+    )
+
+    with pytest.raises(ResolveException) as exc_info:
+        resolve(write_schema, read_schema)
+
+    assert "Promotion from int to long is not allowed" in str(exc_info.value)

Review Comment:
   It would be good to test all of the primitive type promotion cases.





[GitHub] [iceberg] rdblue commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r907848106


##########
python/src/iceberg/avro/resolver.py:
##########
@@ -0,0 +1,150 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from functools import singledispatch
+from typing import (
+    Dict,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    Type,
+)
+
+from iceberg.avro.reader import (
+    ConstructReader,
+    ListReader,
+    MapReader,
+    NoneReader,
+    OptionReader,
+    Reader,
+    StructReader,
+    primitive_reader,
+)
+from iceberg.schema import Schema, visit
+from iceberg.types import (
+    BinaryType,
+    IcebergType,
+    IntegerType,
+    ListType,
+    LongType,
+    MapType,
+    PrimitiveType,
+    StringType,
+    StructType,
+)
+
+
+class ResolveException(Exception):
+    pass
+
+
+@singledispatch
+def resolve(write_schema, read_schema) -> Reader:
+    """This resolves the write and read schema
+
+    The function traverses the schema in post-order fashion
+
+     Args:
+         write_schema (Schema | IcebergType): The write schema of the Avro file
+         read_schema (Schema | IcebergType): The requested read schema which is equal or a subset of the write schema
+
+     Raises:
+         NotImplementedError: If attempting to resolve an unrecognized object type
+    """
+    raise NotImplementedError("Cannot resolve non-type: %s" % write_schema)
+
+
+@resolve.register(Schema)
+def _(write_schema: Schema, read_schema: Schema) -> Reader:
+    """Visit a Schema and starts resolving it by converting it to a struct"""
+    return resolve(write_schema.as_struct(), read_schema.as_struct())
+
+
+@resolve.register(StructType)
+def _(write_struct: StructType, read_struct: StructType) -> Reader:
+    """Iterates over the write schema, and checks if the field is in the read schema"""

Review Comment:
   Does this need to check that `isinstance(read_struct, StructType)` like the list and map functions do?





[GitHub] [iceberg] Fokko commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r908093825


##########
python/src/iceberg/avro/resolver.py:
##########
@@ -0,0 +1,150 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from functools import singledispatch
+from typing import (
+    Dict,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    Type,
+)
+
+from iceberg.avro.reader import (
+    ConstructReader,
+    ListReader,
+    MapReader,
+    NoneReader,
+    OptionReader,
+    Reader,
+    StructReader,
+    primitive_reader,
+)
+from iceberg.schema import Schema, visit
+from iceberg.types import (
+    BinaryType,
+    IcebergType,
+    IntegerType,
+    ListType,
+    LongType,
+    MapType,
+    PrimitiveType,
+    StringType,
+    StructType,
+)
+
+
+class ResolveException(Exception):
+    pass
+
+
+@singledispatch
+def resolve(write_schema, read_schema) -> Reader:
+    """This resolves the write and read schema
+
+    The function traverses the schema in post-order fashion
+
+     Args:
+         write_schema (Schema | IcebergType): The write schema of the Avro file
+         read_schema (Schema | IcebergType): The requested read schema which is equal or a subset of the write schema
+
+     Raises:
+         NotImplementedError: If attempting to resolve an unrecognized object type
+    """
+    raise NotImplementedError("Cannot resolve non-type: %s" % write_schema)
+
+
+@resolve.register(Schema)
+def _(write_schema: Schema, read_schema: Schema) -> Reader:
+    """Visit a Schema and starts resolving it by converting it to a struct"""
+    return resolve(write_schema.as_struct(), read_schema.as_struct())
+
+
+@resolve.register(StructType)
+def _(write_struct: StructType, read_struct: StructType) -> Reader:
+    """Iterates over the write schema, and checks if the field is in the read schema"""
+    results: List[Tuple[Optional[int], Reader]] = []
+
+    read_fields = {field.name: (pos, field) for pos, field in enumerate(read_struct.fields)}
+
+    for write_field in write_struct.fields:
+        if write_field.name in read_fields:
+            read_pos, read_field = read_fields[write_field.name]
+            result_reader = resolve(write_field.field_type, read_field.field_type)
+        else:
+            read_pos = None
+            result_reader = visit(write_field.field_type, ConstructReader())
+        result_reader = result_reader if write_field.required else OptionReader(result_reader)
+        results.append((read_pos, result_reader))
+
+    write_fields = {field.name: field for field in write_struct.fields}
+    for pos, read_field in enumerate(read_struct.fields):
+        if read_field.name not in write_fields:
+            if read_field.required:
+                raise ResolveException(f"{read_field} is in not in the write schema, and is required")
+            # Just set the new field to None
+            results.append((pos, NoneReader()))
+
+    return StructReader(tuple(results))
+
+
+@resolve.register(ListType)
+def _(write_list: ListType, read_list: ListType) -> Reader:
+    if not isinstance(read_list, ListType):
+        raise ResolveException(f"Cannot change {write_list} into {read_list}")
+    element_reader = resolve(write_list.element.field_type, read_list.element.field_type)
+    return ListReader(element_reader)
+
+
+@resolve.register(MapType)
+def _(write_map: MapType, read_map: MapType) -> Reader:
+    if not isinstance(read_map, MapType):

Review Comment:
   Yes, I think `IcebergType` would be better as the type of the second argument; let me update that. The type of the first argument is guaranteed, because that is what `singledispatch` dispatches on. We could have a write schema:
   ```
   write schema {
     100: location struct<101: lat double, 102: long double>
   }
   ```
   and a read schema:
   ```
   read schema {
     100: long
   }
   ```
   I wanted a nicer error in this case than one saying that `key` can't be found (because a `LongType` doesn't have a `key` property).
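
The point about `singledispatch` can be illustrated with a minimal, self-contained sketch. It does not use the PR's classes: `LongType`, `MapType` and `ResolveError` below are hypothetical stand-ins, and the string "readers" are placeholders. It only shows that dispatch looks at the first argument, so the second one needs an explicit `isinstance` check to get a readable error.

```python
from functools import singledispatch


class ResolveError(Exception):
    """Hypothetical stand-in for the PR's ResolveException."""


class LongType:
    """Hypothetical stand-in for iceberg.types.LongType."""


class MapType:
    """Hypothetical stand-in for iceberg.types.MapType."""

    def __init__(self, key, value):
        self.key = key
        self.value = value


@singledispatch
def resolve(write_type, read_type):
    raise NotImplementedError(f"Cannot resolve non-type: {write_type!r}")


@resolve.register(LongType)
def _(write_type, read_type):
    return "long-reader"


@resolve.register(MapType)
def _(write_type, read_type):
    # singledispatch only inspected write_type; read_type is unchecked so far.
    if not isinstance(read_type, MapType):
        raise ResolveError(
            f"Cannot change {type(write_type).__name__} into {type(read_type).__name__}"
        )
    return (
        "map-reader",
        resolve(write_type.key, read_type.key),
        resolve(write_type.value, read_type.value),
    )
```

Without the `isinstance` guard, resolving a map against a `LongType` would fail with an `AttributeError` on `read_type.key`, which says little about the actual schema mismatch.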





[GitHub] [iceberg] Fokko commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r908912655


##########
python/tests/avro/test_resolver.py:
##########
@@ -0,0 +1,137 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pytest

Review Comment:
   I've been looking into this, and am happy to replicate the tests on the Python side. However, that depends on the `writeAndRead` method, which requires a write path. It may be better to postpone this until we have the write path (which I already see looming in the not-so-distant future).





[GitHub] [iceberg] rdblue commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r907854717


##########
python/tests/avro/test_decoder.py:
##########
@@ -33,10 +33,10 @@ def test_read_decimal_from_fixed():
     assert actual == expected
 
 
-def test_read_long():
+def test_read_int():
     mis = MemoryInputStream(b"\x18")
     decoder = BinaryDecoder(mis)
-    assert decoder.read_long() == 12
+    assert decoder.read_int() == 12

Review Comment:
   Do you want to add a few tests for the new `skip` methods?
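
A test for skipping could look roughly like the sketch below. It is self-contained and deliberately does not use the PR's `BinaryDecoder` or `MemoryInputStream`: `TinyDecoder` and its `tell` method are hypothetical, written only to show the idea of asserting on the stream position after a skip. It assumes the standard Avro encoding of ints and longs as zigzag variable-length integers.

```python
import io


class TinyDecoder:
    """Hypothetical minimal decoder; not the PR's BinaryDecoder."""

    def __init__(self, data: bytes) -> None:
        self._stream = io.BytesIO(data)

    def read_int(self) -> int:
        # Accumulate 7 bits per byte, least significant group first.
        shift = 0
        unsigned = 0
        while True:
            byte = self._stream.read(1)[0]
            unsigned |= (byte & 0x7F) << shift
            if not byte & 0x80:
                break
            shift += 7
        # Undo the zigzag mapping to recover the signed value.
        return (unsigned >> 1) ^ -(unsigned & 1)

    def skip_int(self) -> None:
        # Consume bytes until one has the continuation bit cleared.
        while self._stream.read(1)[0] & 0x80:
            pass

    def tell(self) -> int:
        return self._stream.tell()


def test_skip_int() -> None:
    # b"\x18" encodes 12 and b"\xac\x02" encodes 150.
    decoder = TinyDecoder(b"\x18\xac\x02\x18")
    decoder.skip_int()
    assert decoder.tell() == 1
    decoder.skip_int()
    assert decoder.tell() == 3
    # The value after the skipped ones is still read correctly.
    assert decoder.read_int() == 12
```

Checking both the position and the next value read guards against a skip that consumes too few or too many bytes.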





[GitHub] [iceberg] Fokko commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r908150747


##########
python/src/iceberg/avro/resolver.py:
##########
@@ -0,0 +1,150 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from functools import singledispatch
+from typing import (
+    Dict,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    Type,
+)
+
+from iceberg.avro.reader import (
+    ConstructReader,
+    ListReader,
+    MapReader,
+    NoneReader,
+    OptionReader,
+    Reader,
+    StructReader,
+    primitive_reader,
+)
+from iceberg.schema import Schema, visit
+from iceberg.types import (
+    BinaryType,
+    IcebergType,
+    IntegerType,
+    ListType,
+    LongType,
+    MapType,
+    PrimitiveType,
+    StringType,
+    StructType,
+)
+
+
+class ResolveException(Exception):
+    pass
+
+
+@singledispatch
+def resolve(write_schema, read_schema) -> Reader:
+    """This resolves the write and read schema
+
+    The function traverses the schema in post-order fashion
+
+     Args:
+         write_schema (Schema | IcebergType): The write schema of the Avro file
+         read_schema (Schema | IcebergType): The requested read schema which is equal or a subset of the write schema
+
+     Raises:
+         NotImplementedError: If attempting to resolve an unrecognized object type
+    """
+    raise NotImplementedError("Cannot resolve non-type: %s" % write_schema)
+
+
+@resolve.register(Schema)
+def _(write_schema: Schema, read_schema: Schema) -> Reader:
+    """Visit a Schema and starts resolving it by converting it to a struct"""
+    return resolve(write_schema.as_struct(), read_schema.as_struct())
+
+
+@resolve.register(StructType)
+def _(write_struct: StructType, read_struct: StructType) -> Reader:
+    """Iterates over the write schema, and checks if the field is in the read schema"""
+    results: List[Tuple[Optional[int], Reader]] = []
+
+    read_fields = {field.name: (pos, field) for pos, field in enumerate(read_struct.fields)}

Review Comment:
   Great suggestion and I like that a lot. Just updated the code.





[GitHub] [iceberg] rdblue commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r908633893


##########
python/src/iceberg/avro/resolver.py:
##########
@@ -0,0 +1,150 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from functools import singledispatch
+from typing import (
+    Dict,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    Type,
+)
+
+from iceberg.avro.reader import (
+    ConstructReader,
+    ListReader,
+    MapReader,
+    NoneReader,
+    OptionReader,
+    Reader,
+    StructReader,
+    primitive_reader,
+)
+from iceberg.schema import Schema, visit
+from iceberg.types import (
+    BinaryType,
+    IcebergType,
+    IntegerType,
+    ListType,
+    LongType,
+    MapType,
+    PrimitiveType,
+    StringType,
+    StructType,
+)
+
+
+class ResolveException(Exception):
+    pass
+
+
+@singledispatch
+def resolve(write_schema, read_schema) -> Reader:
+    """This resolves the write and read schema
+
+    The function traverses the schema in post-order fashion
+
+     Args:
+         write_schema (Schema | IcebergType): The write schema of the Avro file
+         read_schema (Schema | IcebergType): The requested read schema which is equal or a subset of the write schema
+
+     Raises:
+         NotImplementedError: If attempting to resolve an unrecognized object type
+    """
+    raise NotImplementedError("Cannot resolve non-type: %s" % write_schema)
+
+
+@resolve.register(Schema)
+def _(write_schema: Schema, read_schema: Schema) -> Reader:
+    """Visit a Schema and starts resolving it by converting it to a struct"""
+    return resolve(write_schema.as_struct(), read_schema.as_struct())
+
+
+@resolve.register(StructType)
+def _(write_struct: StructType, read_struct: StructType) -> Reader:
+    """Iterates over the write schema, and checks if the field is in the read schema"""
+    results: List[Tuple[Optional[int], Reader]] = []
+
+    read_fields = {field.name: (pos, field) for pos, field in enumerate(read_struct.fields)}
+
+    for write_field in write_struct.fields:
+        if write_field.name in read_fields:
+            read_pos, read_field = read_fields[write_field.name]
+            result_reader = resolve(write_field.field_type, read_field.field_type)
+        else:
+            read_pos = None
+            result_reader = visit(write_field.field_type, ConstructReader())
+        result_reader = result_reader if write_field.required else OptionReader(result_reader)
+        results.append((read_pos, result_reader))
+
+    write_fields = {field.name: field for field in write_struct.fields}
+    for pos, read_field in enumerate(read_struct.fields):
+        if read_field.name not in write_fields:
+            if read_field.required:
+                raise ResolveException(f"{read_field} is in not in the write schema, and is required")
+            # Just set the new field to None
+            results.append((pos, NoneReader()))
+
+    return StructReader(tuple(results))
+
+
+@resolve.register(ListType)
+def _(write_list: ListType, read_list: ListType) -> Reader:
+    if not isinstance(read_list, ListType):
+        raise ResolveException(f"Cannot change {write_list} into {read_list}")
+    element_reader = resolve(write_list.element.field_type, read_list.element.field_type)
+    return ListReader(element_reader)
+
+
+@resolve.register(MapType)
+def _(write_map: MapType, read_map: MapType) -> Reader:
+    if not isinstance(read_map, MapType):
+        raise ResolveException(f"Cannot change {write_map} into {read_map}")
+    key_reader = resolve(write_map.key.field_type, read_map.key.field_type)
+    value_reader = resolve(write_map.value.field_type, read_map.value.field_type)
+
+    return MapReader(key_reader, value_reader)
+
+
+ALLOWED_PROMOTIONS: Dict[Type[PrimitiveType], Set[Type[PrimitiveType]]] = {
+    # For now we only support the binary compatible ones
+    IntegerType: {LongType},
+    StringType: {BinaryType},
+    BinaryType: {StringType},

Review Comment:
   Hashing in this case is documented in the Iceberg spec: https://iceberg.apache.org/spec/#appendix-b-32-bit-hash-requirements
   
   That's the one I'm talking about, and we have verified that it is the same between Java and Python.
   
   I also want to note that the type promotions that we care about are the promotions that are allowed in Iceberg, not Avro. While we're using Avro files, Avro is more permissive in these cases than Iceberg allows.
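
As a rough illustration of the difference, the sketch below encodes the promotions that the Iceberg spec's schema evolution section lists, to the best of my understanding: `int` to `long`, `float` to `double`, and widening a decimal's precision at the same scale. The `Primitive` and `Decimal` classes and the `can_read_as` helper are hypothetical stand-ins, not the PR's types or its `ALLOWED_PROMOTIONS` table.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Primitive:
    """Hypothetical stand-in for a parameterless Iceberg primitive type."""

    name: str


@dataclass(frozen=True)
class Decimal:
    """Hypothetical stand-in for decimal(precision, scale)."""

    precision: int
    scale: int


INT, LONG, FLOAT, DOUBLE, STRING, BINARY = (
    Primitive(name) for name in ("int", "long", "float", "double", "string", "binary")
)

# Assumed from the spec: the only promotions between distinct primitives.
_PROMOTIONS = {(INT, LONG), (FLOAT, DOUBLE)}


def can_read_as(file_type, read_type) -> bool:
    """Return True if data written as file_type may be read as read_type."""
    if file_type == read_type:
        return True
    if isinstance(file_type, Decimal) and isinstance(read_type, Decimal):
        # Precision may grow, but the scale has to stay identical.
        return file_type.scale == read_type.scale and read_type.precision > file_type.precision
    return (file_type, read_type) in _PROMOTIONS
```

Under this table `string` to `binary` is rejected, which is the stricter Iceberg behaviour rather than what Avro's own schema resolution would accept.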





[GitHub] [iceberg] rdblue commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r907835942


##########
python/src/iceberg/avro/reader.py:
##########
@@ -177,13 +222,28 @@ def read(self, decoder: BinaryDecoder) -> Any | None:
             return self.option.read(decoder)
         return None
 
+    def skip(self, decoder: BinaryDecoder) -> None:
+        if decoder.read_int() > 0:
+            return self.option.skip(decoder)
+
 
 @dataclass(frozen=True)
 class StructReader(Reader):
-    fields: tuple[Reader, ...] = dataclassfield()
+    fields: tuple[tuple[int | None, Reader], ...] = dataclassfield()
 
     def read(self, decoder: BinaryDecoder) -> AvroStruct:
-        return AvroStruct([field.read(decoder) for field in self.fields])
+        result: list[Any | StructProtocol] = [object] * len(self.fields)

Review Comment:
   What is `object` here? If it's just a placeholder, wouldn't it be better to use `None`?
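
For context, here is a minimal sketch of the positional read with a `None` placeholder. It is not the PR's `StructReader`: `ValueReader` and `read_struct` are hypothetical, the "decoder" is just a list of already-decoded values, and sizing the result by the number of projected fields is my own choice rather than something taken from the diff.

```python
from typing import Any, List, Optional, Tuple


class ValueReader:
    """Hypothetical reader; the 'decoder' is a plain list of values."""

    def read(self, decoder: List[Any]) -> Any:
        return decoder.pop(0)

    def skip(self, decoder: List[Any]) -> None:
        decoder.pop(0)


def read_struct(
    fields: Tuple[Tuple[Optional[int], ValueReader], ...],
    decoder: List[Any],
) -> List[Any]:
    # One slot per projected field, pre-filled with None.
    result: List[Any] = [None] * sum(1 for pos, _ in fields if pos is not None)
    # Fields are visited in write order, because that is the order on disk.
    for pos, reader in fields:
        if pos is None:
            # Not part of the read schema: consume the bytes, keep nothing.
            reader.skip(decoder)
        else:
            result[pos] = reader.read(decoder)
    return result
```

With `None` as the placeholder, a slot that no reader fills naturally reads as a missing value, whereas a leftover `object` would leak the built-in class into the result.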





[GitHub] [iceberg] rdblue commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r907845104


##########
python/src/iceberg/avro/resolver.py:
##########
@@ -0,0 +1,150 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from functools import singledispatch
+from typing import (
+    Dict,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    Type,
+)
+
+from iceberg.avro.reader import (
+    ConstructReader,
+    ListReader,
+    MapReader,
+    NoneReader,
+    OptionReader,
+    Reader,
+    StructReader,
+    primitive_reader,
+)
+from iceberg.schema import Schema, visit
+from iceberg.types import (
+    BinaryType,
+    IcebergType,
+    IntegerType,
+    ListType,
+    LongType,
+    MapType,
+    PrimitiveType,
+    StringType,
+    StructType,
+)
+
+
+class ResolveException(Exception):
+    pass
+
+
+@singledispatch
+def resolve(write_schema, read_schema) -> Reader:
+    """This resolves the write and read schema
+
+    The function traverses the schema in post-order fashion
+
+     Args:
+         write_schema (Schema | IcebergType): The write schema of the Avro file
+         read_schema (Schema | IcebergType): The requested read schema which is equal or a subset of the write schema
+
+     Raises:
+         NotImplementedError: If attempting to resolve an unrecognized object type
+    """
+    raise NotImplementedError("Cannot resolve non-type: %s" % write_schema)
+
+
+@resolve.register(Schema)
+def _(write_schema: Schema, read_schema: Schema) -> Reader:
+    """Visit a Schema and starts resolving it by converting it to a struct"""
+    return resolve(write_schema.as_struct(), read_schema.as_struct())
+
+
+@resolve.register(StructType)
+def _(write_struct: StructType, read_struct: StructType) -> Reader:
+    """Iterates over the write schema, and checks if the field is in the read schema"""
+    results: List[Tuple[Optional[int], Reader]] = []
+
+    read_fields = {field.name: (pos, field) for pos, field in enumerate(read_struct.fields)}
+
+    for write_field in write_struct.fields:
+        if write_field.name in read_fields:
+            read_pos, read_field = read_fields[write_field.name]
+            result_reader = resolve(write_field.field_type, read_field.field_type)
+        else:
+            read_pos = None
+            result_reader = visit(write_field.field_type, ConstructReader())
+        result_reader = result_reader if write_field.required else OptionReader(result_reader)
+        results.append((read_pos, result_reader))
+
+    write_fields = {field.name: field for field in write_struct.fields}
+    for pos, read_field in enumerate(read_struct.fields):
+        if read_field.name not in write_fields:
+            if read_field.required:
+                raise ResolveException(f"{read_field} is in not in the write schema, and is required")
+            # Just set the new field to None
+            results.append((pos, NoneReader()))
+
+    return StructReader(tuple(results))
+
+
+@resolve.register(ListType)
+def _(write_list: ListType, read_list: ListType) -> Reader:
+    if not isinstance(read_list, ListType):
+        raise ResolveException(f"Cannot change {write_list} into {read_list}")
+    element_reader = resolve(write_list.element.field_type, read_list.element.field_type)
+    return ListReader(element_reader)
+
+
+@resolve.register(MapType)
+def _(write_map: MapType, read_map: MapType) -> Reader:
+    if not isinstance(read_map, MapType):
+        raise ResolveException(f"Cannot change {write_map} into {read_map}")
+    key_reader = resolve(write_map.key.field_type, read_map.key.field_type)
+    value_reader = resolve(write_map.value.field_type, read_map.value.field_type)
+
+    return MapReader(key_reader, value_reader)
+
+
+ALLOWED_PROMOTIONS: Dict[Type[PrimitiveType], Set[Type[PrimitiveType]]] = {
+    # For now we only support the binary compatible ones
+    IntegerType: {LongType},
+    StringType: {BinaryType},
+    BinaryType: {StringType},

Review Comment:
   I don't think this is currently allowed by the spec, but I don't see a reason why it shouldn't be. Maybe we should add it to the spec? What do you think?





[GitHub] [iceberg] Fokko commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r908796255


##########
python/src/iceberg/avro/reader.py:
##########
@@ -76,66 +76,102 @@ class Reader(Singleton):
     def read(self, decoder: BinaryDecoder) -> Any:
         ...
 
+    @abstractmethod
+    def skip(self, decoder: BinaryDecoder) -> None:
+        ...
+
 
 class NoneReader(Reader):
     def read(self, _: BinaryDecoder) -> None:
         return None
 
+    def skip(self, decoder: BinaryDecoder) -> None:
+        return None
+
 
 class BooleanReader(Reader):
     def read(self, decoder: BinaryDecoder) -> bool:
         return decoder.read_boolean()
 
+    def skip(self, decoder: BinaryDecoder) -> None:
+        decoder.skip_boolean()
+
 
 class IntegerReader(Reader):
     def read(self, decoder: BinaryDecoder) -> int:
         return decoder.read_int()
 
+    def skip(self, decoder: BinaryDecoder) -> None:
+        decoder.skip_int()
 
-class LongReader(Reader):
-    def read(self, decoder: BinaryDecoder) -> int:
-        return decoder.read_long()
+
+class LongReader(IntegerReader):
+    """Longs and ints are encoded the same way, and there is no long in Python"""

Review Comment:
   Less is more, just removed it!





[GitHub] [iceberg] Fokko commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r909219975


##########
python/src/iceberg/avro/resolver.py:
##########
@@ -0,0 +1,208 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from functools import singledispatch
+from typing import (
+    List,
+    Optional,
+    Tuple,
+    Union,
+)
+
+from iceberg.avro.reader import (
+    CastReader,
+    ConstructReader,
+    ListReader,
+    MapReader,
+    NoneReader,
+    OptionReader,
+    Reader,
+    StructReader,
+    primitive_reader,
+)
+from iceberg.schema import Schema, visit
+from iceberg.types import (
+    BinaryType,
+    DecimalType,
+    DoubleType,
+    FloatType,
+    IcebergType,
+    IntegerType,
+    ListType,
+    LongType,
+    MapType,
+    PrimitiveType,
+    StringType,
+    StructType,
+)
+
+
+class ResolveException(Exception):
+    pass
+
+
+@singledispatch
+def resolve(file_schema: Union[Schema, IcebergType], read_schema: Union[Schema, IcebergType]) -> Reader:
+    """This resolves the file and read schema
+
+    The function traverses the schema in post-order fashion
+
+     Args:
+         file_schema (Schema | IcebergType): The schema of the Avro file
+         read_schema (Schema | IcebergType): The requested read schema which is equal, subset or superset of the file schema
+
+     Raises:
+         NotImplementedError: If attempting to resolve an unrecognized object type
+    """
+    raise NotImplementedError(f"Cannot resolve non-type: {file_schema}")
+
+
+@resolve.register(Schema)
+def _(file_schema: Schema, read_schema: Schema) -> Reader:
+    """Visit a Schema and starts resolving it by converting it to a struct"""
+    return resolve(file_schema.as_struct(), read_schema.as_struct())
+
+
+@resolve.register(StructType)
+def _(file_struct: StructType, read_struct: IcebergType) -> Reader:
+    """Iterates over the file schema, and checks if the field is in the read schema"""
+
+    if not isinstance(read_struct, StructType):
+        raise ResolveException(f"File/read schema are not aligned for {file_struct}, got {read_struct}")
+
+    results: List[Tuple[Optional[int], Reader]] = []
+    read_fields = {field.field_id: (pos, field) for pos, field in enumerate(read_struct.fields)}
+
+    for file_field in file_struct.fields:
+        if file_field.field_id in read_fields:
+            read_pos, read_field = read_fields[file_field.field_id]
+            result_reader = resolve(file_field.field_type, read_field.field_type)
+        else:
+            read_pos = None
+            result_reader = visit(file_field.field_type, ConstructReader())
+        result_reader = result_reader if file_field.required else OptionReader(result_reader)
+        results.append((read_pos, result_reader))
+
+    file_fields = {field.field_id: field for field in file_struct.fields}
+    for pos, read_field in enumerate(read_struct.fields):
+        if read_field.field_id not in file_fields:
+            if read_field.required:
+                raise ResolveException(f"{read_field} is non-optional, and not part of the file schema")
+            # Just set the new field to None
+            results.append((pos, NoneReader()))
+
+    return StructReader(tuple(results))
+
+
+@resolve.register(ListType)
+def _(file_list: ListType, read_list: IcebergType) -> Reader:
+    if not isinstance(read_list, ListType):
+        raise ResolveException(f"File/read schema are not aligned for {file_list}, got {read_list}")
+    element_reader = resolve(file_list.element_type, read_list.element_type)
+    return ListReader(element_reader)
+
+
+@resolve.register(MapType)
+def _(file_map: MapType, read_map: IcebergType) -> Reader:
+    if not isinstance(read_map, MapType):
+        raise ResolveException(f"File/read schema are not aligned for {file_map}, got {read_map}")
+    key_reader = resolve(file_map.key_type, read_map.key_type)
+    value_reader = resolve(file_map.value_type, read_map.value_type)
+
+    return MapReader(key_reader, value_reader)
+
+
+@resolve.register(PrimitiveType)
+def _(file_type: PrimitiveType, read_type: IcebergType) -> Reader:
+    """Converting the primitive type into an actual reader that will decode the physical data"""
+    if not isinstance(read_type, PrimitiveType):
+        raise ResolveException(f"Cannot promote {file_type} to {read_type}")
+
+    # In the case of a promotion, we want to check if it is valid
+    if file_type != read_type:
+        return promote(file_type, read_type)
+    return primitive_reader(read_type)
+
+
+@singledispatch
+def promote(file_type: IcebergType, read_type: IcebergType) -> Reader:
+    """Promotes reading a file type to a read type
+
+    Args:
+        file_type (IcebergType): The type of the Avro file
+        read_type (IcebergType): The requested read type
+
+    Raises:
+        ResolveException: If attempting to resolve an unrecognized object type
+    """
+    raise ResolveException(f"Cannot promote {file_type} to {read_type}")
+
+
+@promote.register(IntegerType)
+def _(file_type: IntegerType, read_type: IcebergType) -> Reader:
+    if isinstance(read_type, LongType):
+        # Ints/Longs are binary compatible in Avro, so this is okay
+        return primitive_reader(read_type)
+    elif type(read_type) in {FloatType, DoubleType}:
+        # We should just read the int, and convert it to a float
+        return CastReader(primitive_reader(file_type), float)
+    else:
+        raise ResolveException(f"Cannot promote an int to {read_type}")
+
+
+@promote.register(LongType)
+def _(file_type: LongType, read_type: IcebergType) -> Reader:
+    if type(read_type) in {FloatType, DoubleType}:

Review Comment:
   And they are gone!



##########
python/src/iceberg/avro/resolver.py:
##########
@@ -0,0 +1,208 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from functools import singledispatch
+from typing import (
+    List,
+    Optional,
+    Tuple,
+    Union,
+)
+
+from iceberg.avro.reader import (
+    CastReader,
+    ConstructReader,
+    ListReader,
+    MapReader,
+    NoneReader,
+    OptionReader,
+    Reader,
+    StructReader,
+    primitive_reader,
+)
+from iceberg.schema import Schema, visit
+from iceberg.types import (
+    BinaryType,
+    DecimalType,
+    DoubleType,
+    FloatType,
+    IcebergType,
+    IntegerType,
+    ListType,
+    LongType,
+    MapType,
+    PrimitiveType,
+    StringType,
+    StructType,
+)
+
+
+class ResolveException(Exception):
+    pass
+
+
+@singledispatch
+def resolve(file_schema: Union[Schema, IcebergType], read_schema: Union[Schema, IcebergType]) -> Reader:
+    """This resolves the file and read schema
+
+    The function traverses the schema in post-order fashion
+
+     Args:
+         file_schema (Schema | IcebergType): The schema of the Avro file
+         read_schema (Schema | IcebergType): The requested read schema which is equal, subset or superset of the file schema
+
+     Raises:
+         NotImplementedError: If attempting to resolve an unrecognized object type
+    """
+    raise NotImplementedError(f"Cannot resolve non-type: {file_schema}")
+
+
+@resolve.register(Schema)
+def _(file_schema: Schema, read_schema: Schema) -> Reader:
+    """Visit a Schema and starts resolving it by converting it to a struct"""
+    return resolve(file_schema.as_struct(), read_schema.as_struct())
+
+
+@resolve.register(StructType)
+def _(file_struct: StructType, read_struct: IcebergType) -> Reader:
+    """Iterates over the file schema, and checks if the field is in the read schema"""
+
+    if not isinstance(read_struct, StructType):
+        raise ResolveException(f"File/read schema are not aligned for {file_struct}, got {read_struct}")
+
+    results: List[Tuple[Optional[int], Reader]] = []
+    read_fields = {field.field_id: (pos, field) for pos, field in enumerate(read_struct.fields)}
+
+    for file_field in file_struct.fields:
+        if file_field.field_id in read_fields:
+            read_pos, read_field = read_fields[file_field.field_id]
+            result_reader = resolve(file_field.field_type, read_field.field_type)
+        else:
+            read_pos = None
+            result_reader = visit(file_field.field_type, ConstructReader())
+        result_reader = result_reader if file_field.required else OptionReader(result_reader)
+        results.append((read_pos, result_reader))
+
+    file_fields = {field.field_id: field for field in file_struct.fields}
+    for pos, read_field in enumerate(read_struct.fields):
+        if read_field.field_id not in file_fields:
+            if read_field.required:
+                raise ResolveException(f"{read_field} is non-optional, and not part of the file schema")
+            # Just set the new field to None
+            results.append((pos, NoneReader()))
+
+    return StructReader(tuple(results))
+
+
+@resolve.register(ListType)
+def _(file_list: ListType, read_list: IcebergType) -> Reader:
+    if not isinstance(read_list, ListType):
+        raise ResolveException(f"File/read schema are not aligned for {file_list}, got {read_list}")
+    element_reader = resolve(file_list.element_type, read_list.element_type)
+    return ListReader(element_reader)
+
+
+@resolve.register(MapType)
+def _(file_map: MapType, read_map: IcebergType) -> Reader:
+    if not isinstance(read_map, MapType):
+        raise ResolveException(f"File/read schema are not aligned for {file_map}, got {read_map}")
+    key_reader = resolve(file_map.key_type, read_map.key_type)
+    value_reader = resolve(file_map.value_type, read_map.value_type)
+
+    return MapReader(key_reader, value_reader)
+
+
+@resolve.register(PrimitiveType)
+def _(file_type: PrimitiveType, read_type: IcebergType) -> Reader:
+    """Converting the primitive type into an actual reader that will decode the physical data"""
+    if not isinstance(read_type, PrimitiveType):
+        raise ResolveException(f"Cannot promote {file_type} to {read_type}")
+
+    # In the case of a promotion, we want to check if it is valid
+    if file_type != read_type:
+        return promote(file_type, read_type)
+    return primitive_reader(read_type)
+
+
+@singledispatch
+def promote(file_type: IcebergType, read_type: IcebergType) -> Reader:
+    """Promotes reading a file type to a read type
+
+    Args:
+        file_type (IcebergType): The type of the Avro file
+        read_type (IcebergType): The requested read type
+
+    Raises:
+        ResolveException: If attempting to resolve an unrecognized object type
+    """
+    raise ResolveException(f"Cannot promote {file_type} to {read_type}")
+
+
+@promote.register(IntegerType)
+def _(file_type: IntegerType, read_type: IcebergType) -> Reader:
+    if isinstance(read_type, LongType):
+        # Ints/Longs are binary compatible in Avro, so this is okay
+        return primitive_reader(read_type)
+    elif type(read_type) in {FloatType, DoubleType}:

Review Comment:
   And they are gone!





[GitHub] [iceberg] Fokko commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r908049734


##########
python/src/iceberg/avro/reader.py:
##########
@@ -76,66 +76,102 @@ class Reader(Singleton):
     def read(self, decoder: BinaryDecoder) -> Any:
         ...
 
+    @abstractmethod
+    def skip(self, decoder: BinaryDecoder) -> None:
+        ...
+
 
 class NoneReader(Reader):
     def read(self, _: BinaryDecoder) -> None:
         return None
 
+    def skip(self, decoder: BinaryDecoder) -> None:
+        return None
+
 
 class BooleanReader(Reader):
     def read(self, decoder: BinaryDecoder) -> bool:
         return decoder.read_boolean()
 
+    def skip(self, decoder: BinaryDecoder) -> None:
+        decoder.skip_boolean()
+
 
 class IntegerReader(Reader):
     def read(self, decoder: BinaryDecoder) -> int:
         return decoder.read_int()
 
+    def skip(self, decoder: BinaryDecoder) -> None:
+        decoder.skip_int()
 
-class LongReader(Reader):
-    def read(self, decoder: BinaryDecoder) -> int:
-        return decoder.read_long()
+
+class LongReader(IntegerReader):
+    """Longs and ints are encoded the same way, and there is no long in Python"""

Review Comment:
   Python makes no distinction between int and long, but Avro does, so I left it in. I'm also totally fine with removing it here 👍🏻
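
For readers following along, here is a minimal sketch of why a single decoder method can serve both types: Avro writes `int` and `long` with the same variable-length zig-zag encoding, and Python's `int` has arbitrary precision. The helper names `write_varint` and `read_varint` are hypothetical and are not the methods of the `BinaryDecoder` in this PR.

```python
import io


def write_varint(value: int) -> bytes:
    """Encode a signed 64-bit value as a zig-zag varint (hypothetical helper)."""
    # Zig-zag maps small negative numbers to small unsigned ones: -1 -> 1, 1 -> 2
    n = (value << 1) ^ (value >> 63)
    out = bytearray()
    while n & ~0x7F:
        # Emit the low 7 bits and set the continuation bit
        out.append((n & 0x7F) | 0x80)
        n >>= 7
    out.append(n)
    return bytes(out)


def read_varint(stream: io.BytesIO) -> int:
    """Decode a zig-zag varint; the same routine works for Avro int and long."""
    b = stream.read(1)[0]
    n = b & 0x7F
    shift = 7
    while b & 0x80:
        b = stream.read(1)[0]
        n |= (b & 0x7F) << shift
        shift += 7
    # Undo the zig-zag mapping
    return (n >> 1) ^ -(n & 1)


# A value in int range and a value that needs a long go through the same code path
small = read_varint(io.BytesIO(write_varint(64)))
large = read_varint(io.BytesIO(write_varint(2**40)))
```

Because the decoded value is a plain Python `int` either way, a separate `read_long` on the decoder would not add anything.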





[GitHub] [iceberg] Fokko commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r908579452


##########
python/tests/avro/test_resolver.py:
##########
@@ -0,0 +1,137 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pytest
+
+from iceberg.avro.reader import (
+    DoubleReader,
+    LongReader,
+    MapReader,
+    StringReader,
+    StructReader,
+)
+from iceberg.avro.resolver import ResolveException, resolve
+from iceberg.schema import Schema
+from iceberg.types import (
+    DoubleType,
+    IntegerType,
+    ListType,
+    LongType,
+    MapType,
+    NestedField,
+    StringType,
+    StructType,
+)
+
+
+def test_resolver():
+    write_schema = Schema(
+        NestedField(1, "id", LongType()),
+        NestedField(2, "data", StringType()),
+        NestedField(
+            3,
+            "location",
+            StructType(
+                NestedField(4, "lat", DoubleType()),
+                NestedField(5, "long", DoubleType()),
+            ),
+        ),
+        NestedField(6, "preferences", MapType(7, StringType(), 8, StringType())),
+        schema_id=1,
+    )
+    read_schema = Schema(
+        NestedField(
+            3,
+            "location",
+            StructType(
+                NestedField(4, "lat", DoubleType()),
+                NestedField(5, "long", DoubleType()),
+            ),
+        ),
+        NestedField(1, "id", LongType()),
+        NestedField(6, "preferences", MapType(7, StringType(), 8, StringType())),
+        schema_id=1,
+    )
+    read_tree = resolve(write_schema, read_schema)
+
+    assert read_tree == StructReader(
+        (
+            (1, LongReader()),
+            (None, StringReader()),
+            (
+                0,
+                StructReader(
+                    (
+                        (0, DoubleReader()),
+                        (1, DoubleReader()),
+                    )
+                ),
+            ),
+            (2, MapReader(StringReader(), StringReader())),
+        )
+    )
+
+
+def test_resolver_new_required_field():
+    write_schema = Schema(
+        NestedField(1, "id", LongType()),
+        schema_id=1,
+    )
+    read_schema = Schema(
+        NestedField(1, "id", LongType()),
+        NestedField(2, "data", StringType(), required=True),
+        schema_id=1,
+    )
+
+    with pytest.raises(ResolveException) as exc_info:
+        resolve(write_schema, read_schema)
+
+    assert "2: data: optional string is in not in the write schema, and is required" in str(exc_info.value)
+
+
+def test_resolver_invalid_evolution():
+    write_schema = Schema(
+        NestedField(1, "id", LongType()),
+        schema_id=1,
+    )
+    read_schema = Schema(
+        NestedField(1, "id", IntegerType()),
+        schema_id=1,
+    )
+
+    with pytest.raises(ResolveException) as exc_info:
+        resolve(write_schema, read_schema)
+
+    assert "Promotion from int to long is not allowed" in str(exc_info.value)

Review Comment:
   Thanks, that just uncovered a bug 👍🏻 





[GitHub] [iceberg] Fokko commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r908581560


##########
python/tests/avro/test_resolver.py:
##########
@@ -0,0 +1,137 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pytest
+
+from iceberg.avro.reader import (
+    DoubleReader,
+    LongReader,
+    MapReader,
+    StringReader,
+    StructReader,
+)
+from iceberg.avro.resolver import ResolveException, resolve
+from iceberg.schema import Schema
+from iceberg.types import (
+    DoubleType,
+    IntegerType,
+    ListType,
+    LongType,
+    MapType,
+    NestedField,
+    StringType,
+    StructType,
+)
+
+
+def test_resolver():
+    write_schema = Schema(
+        NestedField(1, "id", LongType()),
+        NestedField(2, "data", StringType()),
+        NestedField(
+            3,
+            "location",
+            StructType(
+                NestedField(4, "lat", DoubleType()),
+                NestedField(5, "long", DoubleType()),
+            ),
+        ),
+        NestedField(6, "preferences", MapType(7, StringType(), 8, StringType())),
+        schema_id=1,
+    )
+    read_schema = Schema(
+        NestedField(
+            3,
+            "location",
+            StructType(
+                NestedField(4, "lat", DoubleType()),
+                NestedField(5, "long", DoubleType()),
+            ),
+        ),
+        NestedField(1, "id", LongType()),
+        NestedField(6, "preferences", MapType(7, StringType(), 8, StringType())),
+        schema_id=1,
+    )
+    read_tree = resolve(write_schema, read_schema)
+
+    assert read_tree == StructReader(
+        (
+            (1, LongReader()),
+            (None, StringReader()),
+            (
+                0,
+                StructReader(
+                    (
+                        (0, DoubleReader()),
+                        (1, DoubleReader()),
+                    )
+                ),
+            ),
+            (2, MapReader(StringReader(), StringReader())),
+        )
+    )
+
+
+def test_resolver_new_required_field():
+    write_schema = Schema(
+        NestedField(1, "id", LongType()),
+        schema_id=1,
+    )
+    read_schema = Schema(
+        NestedField(1, "id", LongType()),
+        NestedField(2, "data", StringType(), required=True),
+        schema_id=1,
+    )
+
+    with pytest.raises(ResolveException) as exc_info:
+        resolve(write_schema, read_schema)
+
+    assert "2: data: optional string is in not in the write schema, and is required" in str(exc_info.value)

Review Comment:
   This is awkward and has been fixed in https://github.com/apache/iceberg/pull/5011 as well.





[GitHub] [iceberg] rdblue commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r908630143


##########
python/src/iceberg/avro/reader.py:
##########
@@ -76,66 +76,102 @@ class Reader(Singleton):
     def read(self, decoder: BinaryDecoder) -> Any:
         ...
 
+    @abstractmethod
+    def skip(self, decoder: BinaryDecoder) -> None:
+        ...
+
 
 class NoneReader(Reader):
     def read(self, _: BinaryDecoder) -> None:
         return None
 
+    def skip(self, decoder: BinaryDecoder) -> None:
+        return None
+
 
 class BooleanReader(Reader):
     def read(self, decoder: BinaryDecoder) -> bool:
         return decoder.read_boolean()
 
+    def skip(self, decoder: BinaryDecoder) -> None:
+        decoder.skip_boolean()
+
 
 class IntegerReader(Reader):
     def read(self, decoder: BinaryDecoder) -> int:
         return decoder.read_int()
 
+    def skip(self, decoder: BinaryDecoder) -> None:
+        decoder.skip_int()
 
-class LongReader(Reader):
-    def read(self, decoder: BinaryDecoder) -> int:
-        return decoder.read_long()
+
+class LongReader(IntegerReader):
+    """Longs and ints are encoded the same way, and there is no long in Python"""

Review Comment:
   I'd probably remove it to keep things simple, but it's up to you.





[GitHub] [iceberg] rdblue commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r907858442


##########
python/src/iceberg/avro/resolver.py:
##########
@@ -0,0 +1,150 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from functools import singledispatch
+from typing import (
+    Dict,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    Type,
+)
+
+from iceberg.avro.reader import (
+    ConstructReader,
+    ListReader,
+    MapReader,
+    NoneReader,
+    OptionReader,
+    Reader,
+    StructReader,
+    primitive_reader,
+)
+from iceberg.schema import Schema, visit
+from iceberg.types import (
+    BinaryType,
+    IcebergType,
+    IntegerType,
+    ListType,
+    LongType,
+    MapType,
+    PrimitiveType,
+    StringType,
+    StructType,
+)
+
+
+class ResolveException(Exception):
+    pass
+
+
+@singledispatch
+def resolve(write_schema, read_schema) -> Reader:
+    """This resolves the write and read schema
+
+    The function traverses the schema in post-order fashion
+
+     Args:
+         write_schema (Schema | IcebergType): The write schema of the Avro file
+         read_schema (Schema | IcebergType): The requested read schema which is equal or a subset of the write schema
+
+     Raises:
+         NotImplementedError: If attempting to resolve an unrecognized object type
+    """
+    raise NotImplementedError("Cannot resolve non-type: %s" % write_schema)
+
+
+@resolve.register(Schema)
+def _(write_schema: Schema, read_schema: Schema) -> Reader:
+    """Visit a Schema and starts resolving it by converting it to a struct"""
+    return resolve(write_schema.as_struct(), read_schema.as_struct())
+
+
+@resolve.register(StructType)
+def _(write_struct: StructType, read_struct: StructType) -> Reader:
+    """Iterates over the write schema, and checks if the field is in the read schema"""
+    results: List[Tuple[Optional[int], Reader]] = []
+
+    read_fields = {field.name: (pos, field) for pos, field in enumerate(read_struct.fields)}
+
+    for write_field in write_struct.fields:
+        if write_field.name in read_fields:
+            read_pos, read_field = read_fields[write_field.name]
+            result_reader = resolve(write_field.field_type, read_field.field_type)
+        else:
+            read_pos = None
+            result_reader = visit(write_field.field_type, ConstructReader())
+        result_reader = result_reader if write_field.required else OptionReader(result_reader)
+        results.append((read_pos, result_reader))
+
+    write_fields = {field.name: field for field in write_struct.fields}
+    for pos, read_field in enumerate(read_struct.fields):
+        if read_field.name not in write_fields:
+            if read_field.required:
+                raise ResolveException(f"{read_field} is in not in the write schema, and is required")
+            # Just set the new field to None
+            results.append((pos, NoneReader()))
+
+    return StructReader(tuple(results))
+
+
+@resolve.register(ListType)
+def _(write_list: ListType, read_list: ListType) -> Reader:
+    if not isinstance(read_list, ListType):
+        raise ResolveException(f"Cannot change {write_list} into {read_list}")
+    element_reader = resolve(write_list.element.field_type, read_list.element.field_type)
+    return ListReader(element_reader)
+
+
+@resolve.register(MapType)
+def _(write_map: MapType, read_map: MapType) -> Reader:
+    if not isinstance(read_map, MapType):
+        raise ResolveException(f"Cannot change {write_map} into {read_map}")
+    key_reader = resolve(write_map.key.field_type, read_map.key.field_type)
+    value_reader = resolve(write_map.value.field_type, read_map.value.field_type)
+
+    return MapReader(key_reader, value_reader)
+
+
+ALLOWED_PROMOTIONS: Dict[Type[PrimitiveType], Set[Type[PrimitiveType]]] = {
+    # For now we only support the binary compatible ones
+    IntegerType: {LongType},
+    StringType: {BinaryType},
+    BinaryType: {StringType},

Review Comment:
   For background, one of the restrictions that we place on type promotion is that promotion is only allowed if hashing the value produces the same result. That's why `hashInt(v)` is implemented as `hashLong(castToLong(v))`. Otherwise, when a table column is promoted from `int` to `long`, any metadata values for `bucket` partitions would be suddenly incorrect.
   
   It should be possible to promote from `binary` to `string` (or the opposite) because the hash value is the same.
   
   I think we can also relax this constraint a bit, so if there is no bucket transform on a column, you can perform a type promotion that would not be allowed otherwise. For example, int to string promotion could only be allowed if there was no partition spec with a bucket function on the int column.
   
   There are also odd cases with timestamps. If a long value is a timestamp in microseconds, then it could be promoted to `timestamp` or `timestamptz` because the hash function would match (no need to modify the value). But if the long was a timestamp in milliseconds, we could only promote to `timestamp` or `timestamptz` if there was no bucket transform applied to the value in a partition spec.
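
The hash-compatibility rule described above can be sketched as follows. This is only an illustration under stated assumptions: SHA-256 stands in for the 32-bit Murmur3 hash that the bucket transform actually uses, and all function names here are hypothetical rather than part of the Iceberg API.

```python
import hashlib
import struct


def hash_bytes(data: bytes) -> int:
    # Stand-in hash (assumption): SHA-256 instead of Iceberg's 32-bit Murmur3.
    # Only the way values are turned into bytes matters for this sketch.
    return int.from_bytes(hashlib.sha256(data).digest(), "big")


def hash_long(value: int) -> int:
    # Hash the 8-byte little-endian representation of the value
    return hash_bytes(struct.pack("<q", value))


def hash_int(value: int) -> int:
    # Widen to long first, so int -> long promotion leaves bucket values intact
    return hash_long(value)


def naive_hash_int(value: int) -> int:
    # Counter-example: hashing the 4-byte form would break on promotion
    return hash_bytes(struct.pack("<i", value))


def hash_string(value: str) -> int:
    # Strings hash their UTF-8 bytes, so string <-> binary promotion is safe
    return hash_bytes(value.encode("utf-8"))


# int -> long: same hash before and after promotion
int_promotion_is_safe = hash_int(34) == hash_long(34)

# A long holding milliseconds must be rescaled to microseconds to become a
# timestamp, which changes the hashed value and invalidates bucket metadata
millis = 1_655_933_700_000
millis_promotion_is_safe = hash_long(millis) == hash_long(millis * 1000)
```

In this sketch `int_promotion_is_safe` is true and `millis_promotion_is_safe` is false, which matches the two cases above: the first promotion is always allowed, while the second would only be acceptable when no bucket transform is applied to the column.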





[GitHub] [iceberg] rdblue commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r907851879


##########
python/src/iceberg/avro/resolver.py:
##########
@@ -0,0 +1,150 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from functools import singledispatch
+from typing import (
+    Dict,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    Type,
+)
+
+from iceberg.avro.reader import (
+    ConstructReader,
+    ListReader,
+    MapReader,
+    NoneReader,
+    OptionReader,
+    Reader,
+    StructReader,
+    primitive_reader,
+)
+from iceberg.schema import Schema, visit
+from iceberg.types import (
+    BinaryType,
+    IcebergType,
+    IntegerType,
+    ListType,
+    LongType,
+    MapType,
+    PrimitiveType,
+    StringType,
+    StructType,
+)
+
+
+class ResolveException(Exception):
+    pass
+
+
+@singledispatch
+def resolve(write_schema, read_schema) -> Reader:
+    """This resolves the write and read schema
+
+    The function traverses the schema in post-order fashion
+
+     Args:
+         write_schema (Schema | IcebergType): The write schema of the Avro file
+         read_schema (Schema | IcebergType): The requested read schema which is equal or a subset of the write schema
+
+     Raises:
+         NotImplementedError: If attempting to resolve an unrecognized object type
+    """
+    raise NotImplementedError("Cannot resolve non-type: %s" % write_schema)
+
+
+@resolve.register(Schema)
+def _(write_schema: Schema, read_schema: Schema) -> Reader:
+    """Visit a Schema and starts resolving it by converting it to a struct"""
+    return resolve(write_schema.as_struct(), read_schema.as_struct())
+
+
+@resolve.register(StructType)
+def _(write_struct: StructType, read_struct: StructType) -> Reader:
+    """Iterates over the write schema, and checks if the field is in the read schema"""
+    results: List[Tuple[Optional[int], Reader]] = []
+
+    read_fields = {field.name: (pos, field) for pos, field in enumerate(read_struct.fields)}
+
+    for write_field in write_struct.fields:
+        if write_field.name in read_fields:
+            read_pos, read_field = read_fields[write_field.name]
+            result_reader = resolve(write_field.field_type, read_field.field_type)
+        else:
+            read_pos = None
+            result_reader = visit(write_field.field_type, ConstructReader())
+        result_reader = result_reader if write_field.required else OptionReader(result_reader)
+        results.append((read_pos, result_reader))
+
+    write_fields = {field.name: field for field in write_struct.fields}
+    for pos, read_field in enumerate(read_struct.fields):
+        if read_field.name not in write_fields:
+            if read_field.required:
+                raise ResolveException(f"{read_field} is in not in the write schema, and is required")

Review Comment:
   I recommend referring to the "file schema" rather than the "write" or "writer" schema. I've found that makes the most sense to people when reading error messages because they know where it's coming from. "Write schema" sounds like they're writing something, which can be confusing.





[GitHub] [iceberg] rdblue commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r907859056


##########
python/tests/avro/test_resolver.py:
##########
@@ -0,0 +1,137 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pytest
+
+from iceberg.avro.reader import (
+    DoubleReader,
+    LongReader,
+    MapReader,
+    StringReader,
+    StructReader,
+)
+from iceberg.avro.resolver import ResolveException, resolve
+from iceberg.schema import Schema
+from iceberg.types import (
+    DoubleType,
+    IntegerType,
+    ListType,
+    LongType,
+    MapType,
+    NestedField,
+    StringType,
+    StructType,
+)
+
+
+def test_resolver():
+    write_schema = Schema(
+        NestedField(1, "id", LongType()),
+        NestedField(2, "data", StringType()),
+        NestedField(
+            3,
+            "location",
+            StructType(
+                NestedField(4, "lat", DoubleType()),
+                NestedField(5, "long", DoubleType()),
+            ),
+        ),
+        NestedField(6, "preferences", MapType(7, StringType(), 8, StringType())),
+        schema_id=1,
+    )
+    read_schema = Schema(
+        NestedField(
+            3,
+            "location",
+            StructType(
+                NestedField(4, "lat", DoubleType()),
+                NestedField(5, "long", DoubleType()),
+            ),
+        ),
+        NestedField(1, "id", LongType()),
+        NestedField(6, "preferences", MapType(7, StringType(), 8, StringType())),
+        schema_id=1,
+    )
+    read_tree = resolve(write_schema, read_schema)
+
+    assert read_tree == StructReader(
+        (
+            (1, LongReader()),
+            (None, StringReader()),
+            (
+                0,
+                StructReader(
+                    (
+                        (0, DoubleReader()),
+                        (1, DoubleReader()),
+                    )
+                ),
+            ),
+            (2, MapReader(StringReader(), StringReader())),
+        )
+    )
+
+
+def test_resolver_new_required_field():
+    write_schema = Schema(
+        NestedField(1, "id", LongType()),
+        schema_id=1,
+    )
+    read_schema = Schema(
+        NestedField(1, "id", LongType()),
+        NestedField(2, "data", StringType(), required=True),
+        schema_id=1,
+    )
+
+    with pytest.raises(ResolveException) as exc_info:
+        resolve(write_schema, read_schema)
+
+    assert "2: data: optional string is in not in the write schema, and is required" in str(exc_info.value)
+
+
+def test_resolver_invalid_evolution():
+    write_schema = Schema(
+        NestedField(1, "id", LongType()),
+        schema_id=1,
+    )
+    read_schema = Schema(
+        NestedField(1, "id", IntegerType()),
+        schema_id=1,
+    )
+
+    with pytest.raises(ResolveException) as exc_info:
+        resolve(write_schema, read_schema)
+
+    assert "Promotion from int to long is not allowed" in str(exc_info.value)
+
+
+def test_resolver_change_type():
+    write_schema = Schema(
+        NestedField(1, "properties", ListType(2, StringType())),
+        schema_id=1,
+    )
+    read_schema = Schema(
+        NestedField(1, "properties", MapType(2, StringType(), 3, StringType())),
+        schema_id=1,
+    )
+
+    with pytest.raises(ResolveException) as exc_info:
+        resolve(write_schema, read_schema)
+
+    assert (
+        "Cannot change list<string> into MapType(key_id=2, key_type=StringType(), value_id=3, value_type=StringType(), value_required=True)"

Review Comment:
   Looks like this uses `str(write_type)` but `repr(read_type)`. Could you use `str` in both cases?
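
A likely reason for the mixed output, sketched with made-up stand-in classes (not the real `iceberg.types` classes): an f-string calls `format()`, which defaults to `str()`, and `str()` falls back to `__repr__` when a class defines no `__str__`. The `map<...>` rendering below is only an illustrative choice.

```python
class ListType:
    """Stand-in type that defines both __repr__ and __str__."""

    def __init__(self, element: str) -> None:
        self.element = element

    def __repr__(self) -> str:
        return f"ListType(element={self.element!r})"

    def __str__(self) -> str:
        return f"list<{self.element}>"


class MapType:
    """Stand-in type with no __str__, so str() falls back to __repr__."""

    def __init__(self, key: str, value: str) -> None:
        self.key = key
        self.value = value

    def __repr__(self) -> str:
        return f"MapType(key={self.key!r}, value={self.value!r})"


class MapTypeWithStr(MapType):
    """Same type with an explicit __str__, which f-strings then pick up."""

    def __str__(self) -> str:
        return f"map<{self.key}, {self.value}>"


mixed = f"Cannot change {ListType('string')} into {MapType('string', 'string')}"
consistent = f"Cannot change {ListType('string')} into {MapTypeWithStr('string', 'string')}"
```

Here `mixed` shows the inconsistent message, while `consistent` reads `Cannot change list<string> into map<string, string>`.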





[GitHub] [iceberg] rdblue commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r907838087


##########
python/src/iceberg/avro/reader.py:
##########
@@ -211,26 +282,39 @@ class MapReader(Reader):
 
     def read(self, decoder: BinaryDecoder) -> dict:
         read_items = {}
-        block_count = decoder.read_long()
+        block_count = decoder.read_int()
         while block_count != 0:
             if block_count < 0:
                 block_count = -block_count
                 # We ignore the block size for now
-                _ = decoder.read_long()
+                _ = decoder.read_int()
             for _ in range(block_count):
                 key = self.key.read(decoder)
                 read_items[key] = self.value.read(decoder)
-            block_count = decoder.read_long()
+            block_count = decoder.read_int()
 
         return read_items
 
+    def skip(self, decoder: BinaryDecoder) -> None:
+        block_count = decoder.read_int()
+        while block_count != 0:
+            if block_count < 0:
+                block_count = -block_count
+                block_size = decoder.read_int()
+                decoder.skip(block_size)
+            else:
+                for _ in range(block_count):
+                    self.key.skip(decoder)
+                    self.value.skip(decoder)
+                block_count = decoder.read_int()

Review Comment:
   Same here.
   
   Can `MapReader` and `ListReader` be refactored to use the same `skip` method? Not a big deal to copy this twice, but that would cut down on inconsistencies.
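
One way the shared `skip` could look, as a minimal sketch. `skip_blocks` and `TinyDecoder` are hypothetical names; the decoder is a stand-in that only implements the zigzag varint `read_int` and a byte-wise `skip`. Note that this sketch reads the next block count after either branch, whereas the hunk above appears to do so only in the `else` branch.

```python
from typing import Callable


class TinyDecoder:
    """Stand-in for BinaryDecoder (assumption: only read_int and skip)."""

    def __init__(self, data: bytes) -> None:
        self.data = data
        self.pos = 0

    def read_int(self) -> int:
        # Avro ints/longs are zigzag-encoded variable-length integers
        shift = 0
        result = 0
        while True:
            byte = self.data[self.pos]
            self.pos += 1
            result |= (byte & 0x7F) << shift
            if not byte & 0x80:
                break
            shift += 7
        return (result >> 1) ^ -(result & 1)

    def skip(self, n: int) -> None:
        self.pos += n


def skip_blocks(decoder: TinyDecoder, skip_item: Callable[[], object]) -> None:
    """Skip an Avro array or map; skip_item skips exactly one entry."""
    block_count = decoder.read_int()
    while block_count != 0:
        if block_count < 0:
            # A negative count is followed by the block size in bytes,
            # so the whole block can be skipped without decoding it
            block_size = decoder.read_int()
            decoder.skip(block_size)
        else:
            for _ in range(block_count):
                skip_item()
        # Read the next block count after either kind of block
        block_count = decoder.read_int()


# Blocks: count=2 with items 1 and 2, then count=-1 with size=1 and item 3,
# then the terminating 0, followed by one unrelated trailing byte
data = bytes([0x04, 0x02, 0x04, 0x01, 0x02, 0x06, 0x00, 0x7F])
decoder = TinyDecoder(data)
skip_blocks(decoder, decoder.read_int)
```

A list reader would pass a callable that skips one element, and a map reader one that skips a key followed by a value, so both share the same block handling.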





[GitHub] [iceberg] Fokko commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r908953001


##########
python/src/iceberg/avro/resolver.py:
##########
@@ -0,0 +1,150 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from functools import singledispatch
+from typing import (
+    Dict,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    Type,
+)
+
+from iceberg.avro.reader import (
+    ConstructReader,
+    ListReader,
+    MapReader,
+    NoneReader,
+    OptionReader,
+    Reader,
+    StructReader,
+    primitive_reader,
+)
+from iceberg.schema import Schema, visit
+from iceberg.types import (
+    BinaryType,
+    IcebergType,
+    IntegerType,
+    ListType,
+    LongType,
+    MapType,
+    PrimitiveType,
+    StringType,
+    StructType,
+)
+
+
+class ResolveException(Exception):
+    pass
+
+
+@singledispatch
+def resolve(write_schema, read_schema) -> Reader:
+    """This resolves the write and read schema
+
+    The function traverses the schema in post-order fashion
+
+     Args:
+         write_schema (Schema | IcebergType): The write schema of the Avro file
+         read_schema (Schema | IcebergType): The requested read schema which is equal or a subset of the write schema
+
+     Raises:
+         NotImplementedError: If attempting to resolve an unrecognized object type
+    """
+    raise NotImplementedError("Cannot resolve non-type: %s" % write_schema)
+
+
+@resolve.register(Schema)
+def _(write_schema: Schema, read_schema: Schema) -> Reader:
+    """Visit a Schema and starts resolving it by converting it to a struct"""
+    return resolve(write_schema.as_struct(), read_schema.as_struct())
+
+
+@resolve.register(StructType)
+def _(write_struct: StructType, read_struct: StructType) -> Reader:
+    """Iterates over the write schema, and checks if the field is in the read schema"""
+    results: List[Tuple[Optional[int], Reader]] = []
+
+    read_fields = {field.name: (pos, field) for pos, field in enumerate(read_struct.fields)}
+
+    for write_field in write_struct.fields:
+        if write_field.name in read_fields:
+            read_pos, read_field = read_fields[write_field.name]
+            result_reader = resolve(write_field.field_type, read_field.field_type)
+        else:
+            read_pos = None
+            result_reader = visit(write_field.field_type, ConstructReader())
+        result_reader = result_reader if write_field.required else OptionReader(result_reader)
+        results.append((read_pos, result_reader))
+
+    write_fields = {field.name: field for field in write_struct.fields}
+    for pos, read_field in enumerate(read_struct.fields):
+        if read_field.name not in write_fields:
+            if read_field.required:
+                raise ResolveException(f"{read_field} is in not in the write schema, and is required")
+            # Just set the new field to None
+            results.append((pos, NoneReader()))
+
+    return StructReader(tuple(results))
+
+
+@resolve.register(ListType)
+def _(write_list: ListType, read_list: ListType) -> Reader:
+    if not isinstance(read_list, ListType):
+        raise ResolveException(f"Cannot change {write_list} into {read_list}")
+    element_reader = resolve(write_list.element.field_type, read_list.element.field_type)
+    return ListReader(element_reader)
+
+
+@resolve.register(MapType)
+def _(write_map: MapType, read_map: MapType) -> Reader:
+    if not isinstance(read_map, MapType):
+        raise ResolveException(f"Cannot change {write_map} into {read_map}")
+    key_reader = resolve(write_map.key.field_type, read_map.key.field_type)
+    value_reader = resolve(write_map.value.field_type, read_map.value.field_type)
+
+    return MapReader(key_reader, value_reader)
+
+
+ALLOWED_PROMOTIONS: Dict[Type[PrimitiveType], Set[Type[PrimitiveType]]] = {
+    # For now we only support the binary compatible ones
+    IntegerType: {LongType},
+    StringType: {BinaryType},
+    BinaryType: {StringType},

Review Comment:
   I've created a PR here to add string/binary conversion to the Spec: https://github.com/apache/iceberg/pull/5151
   
   > I also want to note that the type promotions that we care about are the promotions that are allowed in Iceberg, not Avro. While we're using Avro files, Avro is more permissive in these cases than Iceberg allows.
   
   Good point, I was focussing on the Avro ones here (since it is in the Avro package), but we can also make this pluggable by providing different promotion dictionaries for Avro and Iceberg.
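
A rough sketch of what pluggable promotion tables might look like. The names `AVRO_PROMOTIONS`, `ICEBERG_PROMOTIONS` and `can_promote` are hypothetical, types are represented as plain strings for brevity, and the Iceberg table leaves out decimal widening because it depends on precision and scale rather than on the type name alone.

```python
from typing import Dict, Set

# Promotions permitted by Avro schema resolution (Avro type names)
AVRO_PROMOTIONS: Dict[str, Set[str]] = {
    "int": {"long", "float", "double"},
    "long": {"float", "double"},
    "float": {"double"},
    "string": {"bytes"},
    "bytes": {"string"},
}

# The stricter set discussed in this thread for Iceberg
# (decimal widening omitted: it needs a precision/scale check)
ICEBERG_PROMOTIONS: Dict[str, Set[str]] = {
    "int": {"long"},
    "float": {"double"},
}


def can_promote(file_type: str, read_type: str, promotions: Dict[str, Set[str]]) -> bool:
    """Return True if file_type may be read as read_type under the given table."""
    return file_type == read_type or read_type in promotions.get(file_type, set())
```

Passing the table as an argument keeps the resolver itself unaware of which rule set applies.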





[GitHub] [iceberg] rdblue commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r907845781


##########
python/src/iceberg/avro/resolver.py:
##########
@@ -0,0 +1,150 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from functools import singledispatch
+from typing import (
+    Dict,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    Type,
+)
+
+from iceberg.avro.reader import (
+    ConstructReader,
+    ListReader,
+    MapReader,
+    NoneReader,
+    OptionReader,
+    Reader,
+    StructReader,
+    primitive_reader,
+)
+from iceberg.schema import Schema, visit
+from iceberg.types import (
+    BinaryType,
+    IcebergType,
+    IntegerType,
+    ListType,
+    LongType,
+    MapType,
+    PrimitiveType,
+    StringType,
+    StructType,
+)
+
+
+class ResolveException(Exception):
+    pass
+
+
+@singledispatch
+def resolve(write_schema, read_schema) -> Reader:
+    """This resolves the write and read schema
+
+    The function traverses the schema in post-order fashion
+
+     Args:
+         write_schema (Schema | IcebergType): The write schema of the Avro file
+         read_schema (Schema | IcebergType): The requested read schema which is equal or a subset of the write schema
+
+     Raises:
+         NotImplementedError: If attempting to resolve an unrecognized object type
+    """
+    raise NotImplementedError("Cannot resolve non-type: %s" % write_schema)
+
+
+@resolve.register(Schema)
+def _(write_schema: Schema, read_schema: Schema) -> Reader:
+    """Visit a Schema and starts resolving it by converting it to a struct"""
+    return resolve(write_schema.as_struct(), read_schema.as_struct())
+
+
+@resolve.register(StructType)
+def _(write_struct: StructType, read_struct: StructType) -> Reader:
+    """Iterates over the write schema, and checks if the field is in the read schema"""
+    results: List[Tuple[Optional[int], Reader]] = []
+
+    read_fields = {field.name: (pos, field) for pos, field in enumerate(read_struct.fields)}
+
+    for write_field in write_struct.fields:
+        if write_field.name in read_fields:
+            read_pos, read_field = read_fields[write_field.name]
+            result_reader = resolve(write_field.field_type, read_field.field_type)
+        else:
+            read_pos = None
+            result_reader = visit(write_field.field_type, ConstructReader())
+        result_reader = result_reader if write_field.required else OptionReader(result_reader)
+        results.append((read_pos, result_reader))
+
+    write_fields = {field.name: field for field in write_struct.fields}
+    for pos, read_field in enumerate(read_struct.fields):
+        if read_field.name not in write_fields:
+            if read_field.required:
+                raise ResolveException(f"{read_field} is in not in the write schema, and is required")
+            # Just set the new field to None
+            results.append((pos, NoneReader()))
+
+    return StructReader(tuple(results))
+
+
+@resolve.register(ListType)
+def _(write_list: ListType, read_list: ListType) -> Reader:
+    if not isinstance(read_list, ListType):
+        raise ResolveException(f"Cannot change {write_list} into {read_list}")
+    element_reader = resolve(write_list.element.field_type, read_list.element.field_type)
+    return ListReader(element_reader)
+
+
+@resolve.register(MapType)
+def _(write_map: MapType, read_map: MapType) -> Reader:
+    if not isinstance(read_map, MapType):
+        raise ResolveException(f"Cannot change {write_map} into {read_map}")
+    key_reader = resolve(write_map.key.field_type, read_map.key.field_type)
+    value_reader = resolve(write_map.value.field_type, read_map.value.field_type)
+
+    return MapReader(key_reader, value_reader)
+
+
+ALLOWED_PROMOTIONS: Dict[Type[PrimitiveType], Set[Type[PrimitiveType]]] = {
+    # For now we only support the binary compatible ones
+    IntegerType: {LongType},
+    StringType: {BinaryType},
+    BinaryType: {StringType},
+    # These are all allowed according to the Avro spec
+    # IntegerType: {LongType, FloatType, DoubleType},
+    # LongType: {FloatType, DoubleType},
+    # FloatType: {DoubleType},

Review Comment:
   This is allowed by the Iceberg spec, as is promotion from `DecimalType(P, S)` to `DecimalType(P2, S)` where `P2 > P`.
   
   Promoting `float` to `double` should be possible fairly easily. All you have to do is check whether it's allowed and then use the source type's reader. Since Python uses a double to represent both, it will automatically produce the correct result.
   
   Same thing for `DecimalType`. As long as you're reading the correct fixed size you should be fine.
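
To illustrate the two points above, here is a small sketch with hypothetical helper names. Decoding a 4-byte Avro float with `struct` already yields a Python `float`, which is a double, so reading a `float` column as `double` needs no extra conversion. The decimal check follows the rule stated above: same scale, larger precision.

```python
import struct


def read_float(buffer: bytes) -> float:
    """Decode a 4-byte little-endian IEEE 754 float, as Avro stores it.

    The result is a Python float (a double), so it can be handed to a
    column that was promoted to double without further work.
    """
    return struct.unpack("<f", buffer)[0]


def decimal_promotion_allowed(
    file_precision: int, file_scale: int, read_precision: int, read_scale: int
) -> bool:
    """decimal(P, S) may be promoted to decimal(P2, S) only when P2 > P."""
    return read_scale == file_scale and read_precision > file_precision


value = read_float(struct.pack("<f", 1.5))
```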





[GitHub] [iceberg] Fokko commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r909053946


##########
python/src/iceberg/avro/resolver.py:
##########
@@ -0,0 +1,150 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from functools import singledispatch
+from typing import (
+    Dict,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    Type,
+)
+
+from iceberg.avro.reader import (
+    ConstructReader,
+    ListReader,
+    MapReader,
+    NoneReader,
+    OptionReader,
+    Reader,
+    StructReader,
+    primitive_reader,
+)
+from iceberg.schema import Schema, visit
+from iceberg.types import (
+    BinaryType,
+    IcebergType,
+    IntegerType,
+    ListType,
+    LongType,
+    MapType,
+    PrimitiveType,
+    StringType,
+    StructType,
+)
+
+
+class ResolveException(Exception):
+    pass
+
+
+@singledispatch
+def resolve(write_schema, read_schema) -> Reader:
+    """This resolves the write and read schema
+
+    The function traverses the schema in post-order fashion
+
+     Args:
+         write_schema (Schema | IcebergType): The write schema of the Avro file
+         read_schema (Schema | IcebergType): The requested read schema which is equal or a subset of the write schema
+
+     Raises:
+         NotImplementedError: If attempting to resolve an unrecognized object type
+    """
+    raise NotImplementedError("Cannot resolve non-type: %s" % write_schema)
+
+
+@resolve.register(Schema)
+def _(write_schema: Schema, read_schema: Schema) -> Reader:
+    """Visit a Schema and starts resolving it by converting it to a struct"""
+    return resolve(write_schema.as_struct(), read_schema.as_struct())
+
+
+@resolve.register(StructType)
+def _(write_struct: StructType, read_struct: StructType) -> Reader:
+    """Iterates over the write schema, and checks if the field is in the read schema"""
+    results: List[Tuple[Optional[int], Reader]] = []
+
+    read_fields = {field.name: (pos, field) for pos, field in enumerate(read_struct.fields)}
+
+    for write_field in write_struct.fields:
+        if write_field.name in read_fields:
+            read_pos, read_field = read_fields[write_field.name]
+            result_reader = resolve(write_field.field_type, read_field.field_type)
+        else:
+            read_pos = None
+            result_reader = visit(write_field.field_type, ConstructReader())
+        result_reader = result_reader if write_field.required else OptionReader(result_reader)
+        results.append((read_pos, result_reader))
+
+    write_fields = {field.name: field for field in write_struct.fields}
+    for pos, read_field in enumerate(read_struct.fields):
+        if read_field.name not in write_fields:
+            if read_field.required:
+                raise ResolveException(f"{read_field} is in not in the write schema, and is required")
+            # Just set the new field to None
+            results.append((pos, NoneReader()))
+
+    return StructReader(tuple(results))
+
+
+@resolve.register(ListType)
+def _(write_list: ListType, read_list: ListType) -> Reader:
+    if not isinstance(read_list, ListType):
+        raise ResolveException(f"Cannot change {write_list} into {read_list}")
+    element_reader = resolve(write_list.element.field_type, read_list.element.field_type)
+    return ListReader(element_reader)
+
+
+@resolve.register(MapType)
+def _(write_map: MapType, read_map: MapType) -> Reader:
+    if not isinstance(read_map, MapType):
+        raise ResolveException(f"Cannot change {write_map} into {read_map}")
+    key_reader = resolve(write_map.key.field_type, read_map.key.field_type)
+    value_reader = resolve(write_map.value.field_type, read_map.value.field_type)
+
+    return MapReader(key_reader, value_reader)
+
+
+ALLOWED_PROMOTIONS: Dict[Type[PrimitiveType], Set[Type[PrimitiveType]]] = {
+    # For now we only support the binary compatible ones
+    IntegerType: {LongType},
+    StringType: {BinaryType},
+    BinaryType: {StringType},
+    # These are all allowed according to the Avro spec
+    # IntegerType: {LongType, FloatType, DoubleType},
+    # LongType: {FloatType, DoubleType},
+    # FloatType: {DoubleType},

Review Comment:
   Ah, you already mentioned that =) I've updated the code by converting the `dict` into a `promote` function based on `singledispatch`, which gives us a bit more flexibility. For the float/double case, it will just return the file type reader. For the decimal case, it will check the precision, and for converting an int to a float, it will do the conversion. It comes with tests. Let me know what you think!
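
For readers following along, a `singledispatch`-based `promote` might be shaped roughly like this. It is a sketch under assumptions, not the code in the PR: the type classes are minimal stand-ins, and instead of returning a reader it returns a converter that is applied to the value read with the file type's reader.

```python
from dataclasses import dataclass
from functools import singledispatch
from typing import Any, Callable


@dataclass(frozen=True)
class IntegerType:
    pass


@dataclass(frozen=True)
class LongType:
    pass


@dataclass(frozen=True)
class FloatType:
    pass


@dataclass(frozen=True)
class DoubleType:
    pass


@dataclass(frozen=True)
class DecimalType:
    precision: int
    scale: int


class ResolveException(Exception):
    pass


def identity(value: Any) -> Any:
    return value


@singledispatch
def promote(file_type: Any, read_type: Any) -> Callable[[Any], Any]:
    """Dispatch on the file type; the fallback rejects the promotion."""
    raise ResolveException(f"Cannot promote {file_type} to {read_type}")


@promote.register(IntegerType)
def _(file_type: IntegerType, read_type: Any) -> Callable[[Any], Any]:
    if isinstance(read_type, LongType):
        return identity  # binary compatible, nothing to convert
    if isinstance(read_type, (FloatType, DoubleType)):
        return float  # convert the int that was read
    raise ResolveException(f"Cannot promote {file_type} to {read_type}")


@promote.register(FloatType)
def _(file_type: FloatType, read_type: Any) -> Callable[[Any], Any]:
    if isinstance(read_type, DoubleType):
        return identity  # Python floats are already doubles
    raise ResolveException(f"Cannot promote {file_type} to {read_type}")


@promote.register(DecimalType)
def _(file_type: DecimalType, read_type: Any) -> Callable[[Any], Any]:
    if (
        isinstance(read_type, DecimalType)
        and read_type.scale == file_type.scale
        and read_type.precision > file_type.precision
    ):
        return identity
    raise ResolveException(f"Cannot promote {file_type} to {read_type}")
```

For example, `promote(IntegerType(), FloatType())(3)` gives `3.0`, while narrowing a decimal raises `ResolveException`.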





[GitHub] [iceberg] rdblue commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r907845781


##########
python/src/iceberg/avro/resolver.py:
##########
@@ -0,0 +1,150 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from functools import singledispatch
+from typing import (
+    Dict,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    Type,
+)
+
+from iceberg.avro.reader import (
+    ConstructReader,
+    ListReader,
+    MapReader,
+    NoneReader,
+    OptionReader,
+    Reader,
+    StructReader,
+    primitive_reader,
+)
+from iceberg.schema import Schema, visit
+from iceberg.types import (
+    BinaryType,
+    IcebergType,
+    IntegerType,
+    ListType,
+    LongType,
+    MapType,
+    PrimitiveType,
+    StringType,
+    StructType,
+)
+
+
+class ResolveException(Exception):
+    pass
+
+
+@singledispatch
+def resolve(write_schema, read_schema) -> Reader:
+    """This resolves the write and read schema
+
+    The function traverses the schema in post-order fashion
+
+     Args:
+         write_schema (Schema | IcebergType): The write schema of the Avro file
+         read_schema (Schema | IcebergType): The requested read schema which is equal or a subset of the write schema
+
+     Raises:
+         NotImplementedError: If attempting to resolve an unrecognized object type
+    """
+    raise NotImplementedError("Cannot resolve non-type: %s" % write_schema)
+
+
+@resolve.register(Schema)
+def _(write_schema: Schema, read_schema: Schema) -> Reader:
+    """Visit a Schema and starts resolving it by converting it to a struct"""
+    return resolve(write_schema.as_struct(), read_schema.as_struct())
+
+
+@resolve.register(StructType)
+def _(write_struct: StructType, read_struct: StructType) -> Reader:
+    """Iterates over the write schema, and checks if the field is in the read schema"""
+    results: List[Tuple[Optional[int], Reader]] = []
+
+    read_fields = {field.name: (pos, field) for pos, field in enumerate(read_struct.fields)}
+
+    for write_field in write_struct.fields:
+        if write_field.name in read_fields:
+            read_pos, read_field = read_fields[write_field.name]
+            result_reader = resolve(write_field.field_type, read_field.field_type)
+        else:
+            read_pos = None
+            result_reader = visit(write_field.field_type, ConstructReader())
+        result_reader = result_reader if write_field.required else OptionReader(result_reader)
+        results.append((read_pos, result_reader))
+
+    write_fields = {field.name: field for field in write_struct.fields}
+    for pos, read_field in enumerate(read_struct.fields):
+        if read_field.name not in write_fields:
+            if read_field.required:
+                raise ResolveException(f"{read_field} is in not in the write schema, and is required")
+            # Just set the new field to None
+            results.append((pos, NoneReader()))
+
+    return StructReader(tuple(results))
+
+
+@resolve.register(ListType)
+def _(write_list: ListType, read_list: ListType) -> Reader:
+    if not isinstance(read_list, ListType):
+        raise ResolveException(f"Cannot change {write_list} into {read_list}")
+    element_reader = resolve(write_list.element.field_type, read_list.element.field_type)
+    return ListReader(element_reader)
+
+
+@resolve.register(MapType)
+def _(write_map: MapType, read_map: MapType) -> Reader:
+    if not isinstance(read_map, MapType):
+        raise ResolveException(f"Cannot change {write_map} into {read_map}")
+    key_reader = resolve(write_map.key.field_type, read_map.key.field_type)
+    value_reader = resolve(write_map.value.field_type, read_map.value.field_type)
+
+    return MapReader(key_reader, value_reader)
+
+
+ALLOWED_PROMOTIONS: Dict[Type[PrimitiveType], Set[Type[PrimitiveType]]] = {
+    # For now we only support the binary compatible ones
+    IntegerType: {LongType},
+    StringType: {BinaryType},
+    BinaryType: {StringType},
+    # These are all allowed according to the Avro spec
+    # IntegerType: {LongType, FloatType, DoubleType},
+    # LongType: {FloatType, DoubleType},
+    # FloatType: {DoubleType},

Review Comment:
   Float-to-double promotion is allowed by the Iceberg spec, as is promotion from `DecimalType(P, S)` to `DecimalType(P2, S)` where `P2 > P`.
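
The promotion rules in the Iceberg spec (int to long, float to double, and decimal(P, S) to decimal(P2, S) with P2 > P) could be checked roughly as below. This is only a sketch: `Primitive`, `Dec`, `PRIMITIVE_PROMOTIONS` and `can_promote` are hypothetical stand-ins made up for illustration, not the classes or the `ALLOWED_PROMOTIONS` table from this PR.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Primitive:
    """Hypothetical stand-in for a parameterless Iceberg primitive type."""

    name: str


@dataclass(frozen=True)
class Dec:
    """Hypothetical stand-in for decimal(precision, scale)."""

    precision: int
    scale: int


INT, LONG, FLOAT, DOUBLE = (Primitive(n) for n in ("int", "long", "float", "double"))

# Widening promotions between parameterless primitive types.
PRIMITIVE_PROMOTIONS = {INT: {LONG}, FLOAT: {DOUBLE}}


def can_promote(write_type, read_type) -> bool:
    """Return True if data written as write_type may be read as read_type."""
    if write_type == read_type:
        return True
    if isinstance(write_type, Dec) and isinstance(read_type, Dec):
        # The precision may grow, the scale has to stay the same.
        return read_type.scale == write_type.scale and read_type.precision > write_type.precision
    return read_type in PRIMITIVE_PROMOTIONS.get(write_type, set())
```

Keeping the decimal rule separate from the lookup table is a design choice of this sketch: the decimal check depends on the type's parameters, so it cannot be expressed as a plain type-to-types mapping.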





[GitHub] [iceberg] rdblue commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r907854101


##########
python/tests/avro/test_resolver.py:
##########
@@ -0,0 +1,137 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pytest
+
+from iceberg.avro.reader import (
+    DoubleReader,
+    LongReader,
+    MapReader,
+    StringReader,
+    StructReader,
+)
+from iceberg.avro.resolver import ResolveException, resolve
+from iceberg.schema import Schema
+from iceberg.types import (
+    DoubleType,
+    IntegerType,
+    ListType,
+    LongType,
+    MapType,
+    NestedField,
+    StringType,
+    StructType,
+)
+
+
+def test_resolver():
+    write_schema = Schema(
+        NestedField(1, "id", LongType()),
+        NestedField(2, "data", StringType()),
+        NestedField(
+            3,
+            "location",
+            StructType(
+                NestedField(4, "lat", DoubleType()),
+                NestedField(5, "long", DoubleType()),
+            ),
+        ),
+        NestedField(6, "preferences", MapType(7, StringType(), 8, StringType())),
+        schema_id=1,
+    )
+    read_schema = Schema(
+        NestedField(
+            3,
+            "location",
+            StructType(
+                NestedField(4, "lat", DoubleType()),
+                NestedField(5, "long", DoubleType()),
+            ),
+        ),
+        NestedField(1, "id", LongType()),
+        NestedField(6, "preferences", MapType(7, StringType(), 8, StringType())),
+        schema_id=1,
+    )
+    read_tree = resolve(write_schema, read_schema)
+
+    assert read_tree == StructReader(
+        (
+            (1, LongReader()),
+            (None, StringReader()),
+            (
+                0,
+                StructReader(
+                    (
+                        (0, DoubleReader()),
+                        (1, DoubleReader()),
+                    )
+                ),
+            ),
+            (2, MapReader(StringReader(), StringReader())),
+        )
+    )
+
+
+def test_resolver_new_required_field():
+    write_schema = Schema(
+        NestedField(1, "id", LongType()),
+        schema_id=1,
+    )
+    read_schema = Schema(
+        NestedField(1, "id", LongType()),
+        NestedField(2, "data", StringType(), required=True),
+        schema_id=1,
+    )
+
+    with pytest.raises(ResolveException) as exc_info:
+        resolve(write_schema, read_schema)
+
+    assert "2: data: optional string is in not in the write schema, and is required" in str(exc_info.value)

Review Comment:
   Why does this say "2: data: optional string ... is required"? Should it say "required string" instead?





[GitHub] [iceberg] Fokko commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r908085208


##########
python/src/iceberg/avro/reader.py:
##########
@@ -211,26 +282,39 @@ class MapReader(Reader):
 
     def read(self, decoder: BinaryDecoder) -> dict:
         read_items = {}
-        block_count = decoder.read_long()
+        block_count = decoder.read_int()
         while block_count != 0:
             if block_count < 0:
                 block_count = -block_count
                 # We ignore the block size for now
-                _ = decoder.read_long()
+                _ = decoder.read_int()
             for _ in range(block_count):
                 key = self.key.read(decoder)
                 read_items[key] = self.value.read(decoder)
-            block_count = decoder.read_long()
+            block_count = decoder.read_int()
 
         return read_items
 
+    def skip(self, decoder: BinaryDecoder) -> None:
+        block_count = decoder.read_int()
+        while block_count != 0:
+            if block_count < 0:
+                block_count = -block_count
+                block_size = decoder.read_int()
+                decoder.skip(block_size)
+            else:
+                for _ in range(block_count):
+                    self.key.skip(decoder)
+                    self.value.skip(decoder)
+                block_count = decoder.read_int()

Review Comment:
   I've refactored this to a function:
   ```python
   def _skip_map_array(decoder: BinaryDecoder, skip_entry: Callable) -> None:
       """Skips over an array or map
   
       Both arrays and maps are encoded similarly, so we can reuse
       the skipping logic for both in an efficient way.
   
       From the Avro spec:
   
       Maps (and arrays) are encoded as a series of blocks.
       Each block consists of a long count value, followed by that many key/value pairs in the case of a map,
       and followed by that many array items in the case of an array. A block with count zero indicates the
       end of the map. Each item is encoded per the map's value schema.
   
       If a block's count is negative, its absolute value is used, and the count is followed immediately by a
       long block size indicating the number of bytes in the block. This block size permits fast skipping
       through data, e.g., when projecting a record to a subset of its fields.
   
       Args:
           decoder:
               The decoder that reads the types from the underlying data
           skip_entry:
               Function to skip over the underlying data, element in case of an array, and the
               key/value in the case of a map
       """
       block_count = decoder.read_int()
       while block_count != 0:
           if block_count < 0:
               # The block size in bytes is encoded, so we can skip over the whole block right away
               block_size = decoder.read_int()
               decoder.skip(block_size)
           else:
               for _ in range(block_count):
                   skip_entry()
           block_count = decoder.read_int()
   ```
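
To see the two block layouts from the Avro spec side by side, here is a self-contained sketch that does not use the `BinaryDecoder` from this PR. `read_long`, `skip_blocks`, `value_after_array` and the byte strings are illustrative assumptions; the stream is a plain `io.BytesIO`, and `seek` plays the role of `decoder.skip`.

```python
import io
from typing import Callable


def read_long(stream: io.BytesIO) -> int:
    """Decode one zigzag, variable-length integer (Avro int or long)."""
    shift, result = 0, 0
    while True:
        byte = stream.read(1)[0]
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear: last byte of this number
            break
        shift += 7
    return (result >> 1) ^ -(result & 1)


def skip_blocks(stream: io.BytesIO, skip_entry: Callable[[], None]) -> None:
    """Skip an Avro array or map, using the block size when it is present."""
    block_count = read_long(stream)
    while block_count != 0:
        if block_count < 0:
            # Negative count: the byte size of the block follows, jump over it.
            block_size = read_long(stream)
            stream.seek(block_size, io.SEEK_CUR)
        else:
            for _ in range(block_count):
                skip_entry()
        # Read the next count on both paths, otherwise the loop never ends.
        block_count = read_long(stream)


# The array [1, 2, 3] followed by the long 42, hand-encoded for this example.
PLAIN = bytes([0x06, 0x02, 0x04, 0x06, 0x00, 0x54])  # count 3, items, end marker, 42
SIZED = bytes([0x05, 0x06, 0x02, 0x04, 0x06, 0x00, 0x54])  # count -3, size 3, items, end marker, 42


def value_after_array(data: bytes) -> int:
    """Skip the array at the start of data and return the long that follows it."""
    stream = io.BytesIO(data)
    skip_blocks(stream, lambda: read_long(stream))
    return read_long(stream)
```

Both `value_after_array(PLAIN)` and `value_after_array(SIZED)` land on the trailing long, which is the property a projection that drops the array relies on.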





[GitHub] [iceberg] rdblue commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r907837529


##########
python/src/iceberg/avro/reader.py:
##########
@@ -192,17 +252,28 @@ class ListReader(Reader):
 
     def read(self, decoder: BinaryDecoder) -> list:
         read_items = []
-        block_count = decoder.read_long()
+        block_count = decoder.read_int()
         while block_count != 0:
             if block_count < 0:
                 block_count = -block_count
-                # We ignore the block size for now
-                _ = decoder.read_long()
+                _ = decoder.read_int()
             for _ in range(block_count):
                 read_items.append(self.element.read(decoder))
-            block_count = decoder.read_long()
+            block_count = decoder.read_int()
         return read_items
 
+    def skip(self, decoder: BinaryDecoder) -> None:
+        block_count = decoder.read_int()
+        while block_count != 0:
+            if block_count < 0:
+                block_count = -block_count
+                block_size = decoder.read_int()
+                decoder.skip(block_size)
+            else:
+                for _ in range(block_count):
+                    self.element.skip(decoder)
+                block_count = decoder.read_int()

Review Comment:
   I think this final `block_count = decoder.read_int()` should be aligned with the `if` / `else`. Otherwise, the `if` case never reads the next `block_count` and the loop runs forever.





[GitHub] [iceberg] rdblue commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r907834948


##########
python/src/iceberg/avro/reader.py:
##########
@@ -76,66 +76,102 @@ class Reader(Singleton):
     def read(self, decoder: BinaryDecoder) -> Any:
         ...
 
+    @abstractmethod
+    def skip(self, decoder: BinaryDecoder) -> None:
+        ...
+
 
 class NoneReader(Reader):
     def read(self, _: BinaryDecoder) -> None:
         return None
 
+    def skip(self, decoder: BinaryDecoder) -> None:
+        return None
+
 
 class BooleanReader(Reader):
     def read(self, decoder: BinaryDecoder) -> bool:
         return decoder.read_boolean()
 
+    def skip(self, decoder: BinaryDecoder) -> None:
+        decoder.skip_boolean()
+
 
 class IntegerReader(Reader):
     def read(self, decoder: BinaryDecoder) -> int:
         return decoder.read_int()
 
+    def skip(self, decoder: BinaryDecoder) -> None:
+        decoder.skip_int()
 
-class LongReader(Reader):
-    def read(self, decoder: BinaryDecoder) -> int:
-        return decoder.read_long()
+
+class LongReader(IntegerReader):
+    """Longs and ints are encoded the same way, and there is no long in Python"""

Review Comment:
   Are you just leaving this class in so that the reader tree is consistent with the schema?
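
For context on the docstring's claim: Avro writes both `int` and `long` as zigzag, variable-length integers, so the bytes on disk do not reveal which of the two the schema declared. A minimal sketch of that encoding follows; `encode_zigzag_varint` is a hypothetical helper, not part of this PR.

```python
def encode_zigzag_varint(value: int) -> bytes:
    """Encode a signed integer the way Avro encodes both int and long."""
    # Zigzag maps 0, -1, 1, -2, 2, ... to 0, 1, 2, 3, 4, ...
    zigzag = (value << 1) ^ (value >> 63)
    encoded = bytearray()
    while zigzag > 0x7F:
        # Seven payload bits per byte, high bit set means "more bytes follow".
        encoded.append((zigzag & 0x7F) | 0x80)
        zigzag >>= 7
    encoded.append(zigzag)
    return bytes(encoded)
```

The same function covers values inside and outside the 32-bit range: `encode_zigzag_varint(1)` is a single byte, while `encode_zigzag_varint(2**31)` simply produces more bytes in the same format.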





[GitHub] [iceberg] Fokko commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r908118833


##########
python/src/iceberg/avro/resolver.py:
##########
@@ -0,0 +1,150 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from functools import singledispatch
+from typing import (
+    Dict,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    Type,
+)
+
+from iceberg.avro.reader import (
+    ConstructReader,
+    ListReader,
+    MapReader,
+    NoneReader,
+    OptionReader,
+    Reader,
+    StructReader,
+    primitive_reader,
+)
+from iceberg.schema import Schema, visit
+from iceberg.types import (
+    BinaryType,
+    IcebergType,
+    IntegerType,
+    ListType,
+    LongType,
+    MapType,
+    PrimitiveType,
+    StringType,
+    StructType,
+)
+
+
+class ResolveException(Exception):
+    pass
+
+
+@singledispatch
+def resolve(write_schema, read_schema) -> Reader:
+    """This resolves the write and read schema
+
+    The function traverses the schema in post-order fashion
+
+     Args:
+         write_schema (Schema | IcebergType): The write schema of the Avro file
+         read_schema (Schema | IcebergType): The requested read schema which is equal or a subset of the write schema
+
+     Raises:
+         NotImplementedError: If attempting to resolve an unrecognized object type
+    """
+    raise NotImplementedError("Cannot resolve non-type: %s" % write_schema)
+
+
+@resolve.register(Schema)
+def _(write_schema: Schema, read_schema: Schema) -> Reader:
+    """Visit a Schema and starts resolving it by converting it to a struct"""
+    return resolve(write_schema.as_struct(), read_schema.as_struct())
+
+
+@resolve.register(StructType)
+def _(write_struct: StructType, read_struct: StructType) -> Reader:
+    """Iterates over the write schema, and checks if the field is in the read schema"""
+    results: List[Tuple[Optional[int], Reader]] = []
+
+    read_fields = {field.name: (pos, field) for pos, field in enumerate(read_struct.fields)}
+
+    for write_field in write_struct.fields:
+        if write_field.name in read_fields:
+            read_pos, read_field = read_fields[write_field.name]
+            result_reader = resolve(write_field.field_type, read_field.field_type)
+        else:
+            read_pos = None
+            result_reader = visit(write_field.field_type, ConstructReader())
+        result_reader = result_reader if write_field.required else OptionReader(result_reader)
+        results.append((read_pos, result_reader))
+
+    write_fields = {field.name: field for field in write_struct.fields}
+    for pos, read_field in enumerate(read_struct.fields):
+        if read_field.name not in write_fields:
+            if read_field.required:
+                raise ResolveException(f"{read_field} is in not in the write schema, and is required")
+            # Just set the new field to None
+            results.append((pos, NoneReader()))
+
+    return StructReader(tuple(results))
+
+
+@resolve.register(ListType)
+def _(write_list: ListType, read_list: ListType) -> Reader:
+    if not isinstance(read_list, ListType):
+        raise ResolveException(f"Cannot change {write_list} into {read_list}")
+    element_reader = resolve(write_list.element.field_type, read_list.element.field_type)
+    return ListReader(element_reader)
+
+
+@resolve.register(MapType)
+def _(write_map: MapType, read_map: MapType) -> Reader:
+    if not isinstance(read_map, MapType):
+        raise ResolveException(f"Cannot change {write_map} into {read_map}")
+    key_reader = resolve(write_map.key.field_type, read_map.key.field_type)
+    value_reader = resolve(write_map.value.field_type, read_map.value.field_type)
+
+    return MapReader(key_reader, value_reader)
+
+
+ALLOWED_PROMOTIONS: Dict[Type[PrimitiveType], Set[Type[PrimitiveType]]] = {
+    # For now we only support the binary compatible ones
+    IntegerType: {LongType},
+    StringType: {BinaryType},
+    BinaryType: {StringType},

Review Comment:
   I got this from the Avro schema evolution rules:
   
   ![Screenshot 2022-06-28 at 08 48 18](https://user-images.githubusercontent.com/1134248/176112107-fbe204bc-896a-4c7d-b2a5-b0d43eb8595b.png)
   
   Thanks for the elaborate background information. I agree that we should add it to the spec. As for the hashing, it seems to check out in Python:
   ```python
   ➜  python git:(fd-resolve-write-read-schemas) ✗ python3
   Python 3.9.13 (main, May 24 2022, 21:13:51) 
   [Clang 13.1.6 (clang-1316.0.21.2)] on darwin
   Type "help", "copyright", "credits" or "license" for more information.
   >>> hash("vo")
   292714697147949111
   >>> hash("vo".encode("utf8"))
   292714697147949111
   ```
   
   For Java, this does not seem to be the case:
   ```
   ➜  python git:(fd-resolve-write-read-schemas) ✗ scala
   Welcome to Scala 2.13.8 (OpenJDK 64-Bit Server VM, Java 18.0.1.1).
   Type in expressions for evaluation. Or try :help.
   
   scala> new java.lang.String("vo").hashCode
   val res1: Int = 3769
   
   scala> new java.lang.String("vo").getBytes()
   val res2: Array[Byte] = Array(118, 111)
   
   scala> new java.lang.String("vo").getBytes().hashCode()
   val res3: Int = 1479286669
   ```
   
   We should also make sure that the hashes are the same across languages. I think this is okay for now because we implement this using the MurmurHash library.
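
One caveat on the Java session above: Java arrays do not override `Object.hashCode`, so `getBytes().hashCode()` is an identity hash and not a hash of the contents. A content-based comparison would use `java.util.Arrays.hashCode`. The sketch below reimplements both documented Java algorithms in Python to show that string and byte-array hashes still differ for `"vo"`. The function names are made up for illustration, and the string variant assumes characters from the Basic Multilingual Plane only.

```python
def to_int32(value: int) -> int:
    """Wrap a Python int to Java's signed 32-bit int range."""
    value &= 0xFFFFFFFF
    return value - 0x1_0000_0000 if value >= 0x8000_0000 else value


def java_string_hash(text: str) -> int:
    """Mimic java.lang.String.hashCode (assumes BMP-only characters)."""
    result = 0
    for char in text:
        result = to_int32(31 * result + ord(char))
    return result


def java_arrays_hash(data: bytes) -> int:
    """Mimic java.util.Arrays.hashCode(byte[])."""
    result = 1
    for byte in data:
        signed = byte - 256 if byte > 127 else byte  # Java bytes are signed
        result = to_int32(31 * result + signed)
    return result
```

Here `java_string_hash("vo")` gives 3769, matching the Scala output above, while `java_arrays_hash(b"vo")` gives 4730, because the array variant starts from 1 instead of 0.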





[GitHub] [iceberg] Fokko commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r908489764


##########
python/src/iceberg/avro/resolver.py:
##########
@@ -0,0 +1,150 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from functools import singledispatch
+from typing import (
+    Dict,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    Type,
+)
+
+from iceberg.avro.reader import (
+    ConstructReader,
+    ListReader,
+    MapReader,
+    NoneReader,
+    OptionReader,
+    Reader,
+    StructReader,
+    primitive_reader,
+)
+from iceberg.schema import Schema, visit
+from iceberg.types import (
+    BinaryType,
+    IcebergType,
+    IntegerType,
+    ListType,
+    LongType,
+    MapType,
+    PrimitiveType,
+    StringType,
+    StructType,
+)
+
+
+class ResolveException(Exception):
+    pass
+
+
+@singledispatch
+def resolve(write_schema, read_schema) -> Reader:
+    """This resolves the write and read schema
+
+    The function traverses the schema in post-order fashion
+
+     Args:
+         write_schema (Schema | IcebergType): The write schema of the Avro file
+         read_schema (Schema | IcebergType): The requested read schema which is equal or a subset of the write schema
+
+     Raises:
+         NotImplementedError: If attempting to resolve an unrecognized object type
+    """
+    raise NotImplementedError("Cannot resolve non-type: %s" % write_schema)
+
+
+@resolve.register(Schema)
+def _(write_schema: Schema, read_schema: Schema) -> Reader:
+    """Visit a Schema and starts resolving it by converting it to a struct"""
+    return resolve(write_schema.as_struct(), read_schema.as_struct())
+
+
+@resolve.register(StructType)
+def _(write_struct: StructType, read_struct: StructType) -> Reader:
+    """Iterates over the write schema, and checks if the field is in the read schema"""
+    results: List[Tuple[Optional[int], Reader]] = []
+
+    read_fields = {field.name: (pos, field) for pos, field in enumerate(read_struct.fields)}
+
+    for write_field in write_struct.fields:
+        if write_field.name in read_fields:
+            read_pos, read_field = read_fields[write_field.name]
+            result_reader = resolve(write_field.field_type, read_field.field_type)
+        else:
+            read_pos = None
+            result_reader = visit(write_field.field_type, ConstructReader())
+        result_reader = result_reader if write_field.required else OptionReader(result_reader)
+        results.append((read_pos, result_reader))
+
+    write_fields = {field.name: field for field in write_struct.fields}
+    for pos, read_field in enumerate(read_struct.fields):
+        if read_field.name not in write_fields:
+            if read_field.required:
+                raise ResolveException(f"{read_field} is in not in the write schema, and is required")

Review Comment:
   I agree 👍🏻 





[GitHub] [iceberg] Fokko commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r908493225


##########
python/tests/avro/test_resolver.py:
##########
@@ -0,0 +1,137 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pytest
+
+from iceberg.avro.reader import (
+    DoubleReader,
+    LongReader,
+    MapReader,
+    StringReader,
+    StructReader,
+)
+from iceberg.avro.resolver import ResolveException, resolve
+from iceberg.schema import Schema
+from iceberg.types import (
+    DoubleType,
+    IntegerType,
+    ListType,
+    LongType,
+    MapType,
+    NestedField,
+    StringType,
+    StructType,
+)
+
+
+def test_resolver():
+    write_schema = Schema(
+        NestedField(1, "id", LongType()),
+        NestedField(2, "data", StringType()),
+        NestedField(
+            3,
+            "location",
+            StructType(
+                NestedField(4, "lat", DoubleType()),
+                NestedField(5, "long", DoubleType()),
+            ),
+        ),
+        NestedField(6, "preferences", MapType(7, StringType(), 8, StringType())),
+        schema_id=1,
+    )
+    read_schema = Schema(
+        NestedField(
+            3,
+            "location",
+            StructType(
+                NestedField(4, "lat", DoubleType()),
+                NestedField(5, "long", DoubleType()),
+            ),
+        ),
+        NestedField(1, "id", LongType()),
+        NestedField(6, "preferences", MapType(7, StringType(), 8, StringType())),
+        schema_id=1,
+    )
+    read_tree = resolve(write_schema, read_schema)
+
+    assert read_tree == StructReader(
+        (
+            (1, LongReader()),
+            (None, StringReader()),
+            (
+                0,
+                StructReader(
+                    (
+                        (0, DoubleReader()),
+                        (1, DoubleReader()),
+                    )
+                ),
+            ),
+            (2, MapReader(StringReader(), StringReader())),
+        )
+    )
+
+
+def test_resolver_new_required_field():
+    write_schema = Schema(
+        NestedField(1, "id", LongType()),
+        schema_id=1,
+    )
+    read_schema = Schema(
+        NestedField(1, "id", LongType()),
+        NestedField(2, "data", StringType(), required=True),
+        schema_id=1,
+    )
+
+    with pytest.raises(ResolveException) as exc_info:
+        resolve(write_schema, read_schema)
+
+    assert "2: data: optional string is in not in the write schema, and is required" in str(exc_info.value)
+
+
+def test_resolver_invalid_evolution():
+    write_schema = Schema(
+        NestedField(1, "id", LongType()),
+        schema_id=1,
+    )
+    read_schema = Schema(
+        NestedField(1, "id", IntegerType()),
+        schema_id=1,
+    )
+
+    with pytest.raises(ResolveException) as exc_info:
+        resolve(write_schema, read_schema)
+
+    assert "Promotion from int to long is not allowed" in str(exc_info.value)
+
+
+def test_resolver_change_type():
+    write_schema = Schema(
+        NestedField(1, "properties", ListType(2, StringType())),
+        schema_id=1,
+    )
+    read_schema = Schema(
+        NestedField(1, "properties", MapType(2, StringType(), 3, StringType())),
+        schema_id=1,
+    )
+
+    with pytest.raises(ResolveException) as exc_info:
+        resolve(write_schema, read_schema)
+
+    assert (
+        "Cannot change list<string> into MapType(key_id=2, key_type=StringType(), value_id=3, value_type=StringType(), value_required=True)"

Review Comment:
   This is actually fixed by https://github.com/apache/iceberg/pull/5011. The PR includes many tests around str/repr :)





[GitHub] [iceberg] rdblue commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r908635933


##########
python/src/iceberg/avro/resolver.py:
##########
@@ -0,0 +1,150 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from functools import singledispatch
+from typing import (
+    Dict,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    Type,
+)
+
+from iceberg.avro.reader import (
+    ConstructReader,
+    ListReader,
+    MapReader,
+    NoneReader,
+    OptionReader,
+    Reader,
+    StructReader,
+    primitive_reader,
+)
+from iceberg.schema import Schema, visit
+from iceberg.types import (
+    BinaryType,
+    IcebergType,
+    IntegerType,
+    ListType,
+    LongType,
+    MapType,
+    PrimitiveType,
+    StringType,
+    StructType,
+)
+
+
+class ResolveException(Exception):
+    pass
+
+
+@singledispatch
+def resolve(write_schema, read_schema) -> Reader:
+    """This resolves the write and read schema
+
+    The function traverses the schema in post-order fashion
+
+    Args:
+        write_schema (Schema | IcebergType): The write schema of the Avro file
+        read_schema (Schema | IcebergType): The requested read schema which is equal or a subset of the write schema
+
+    Raises:
+        NotImplementedError: If attempting to resolve an unrecognized object type
+    """
+    raise NotImplementedError("Cannot resolve non-type: %s" % write_schema)
+
+
+@resolve.register(Schema)
+def _(write_schema: Schema, read_schema: Schema) -> Reader:
+    """Visits a Schema and starts resolving it by converting it to a struct"""
+    return resolve(write_schema.as_struct(), read_schema.as_struct())
+
+
+@resolve.register(StructType)
+def _(write_struct: StructType, read_struct: StructType) -> Reader:
+    """Iterates over the write schema, and checks if the field is in the read schema"""
+    results: List[Tuple[Optional[int], Reader]] = []
+
+    read_fields = {field.name: (pos, field) for pos, field in enumerate(read_struct.fields)}
+
+    for write_field in write_struct.fields:
+        if write_field.name in read_fields:
+            read_pos, read_field = read_fields[write_field.name]
+            result_reader = resolve(write_field.field_type, read_field.field_type)
+        else:
+            read_pos = None
+            result_reader = visit(write_field.field_type, ConstructReader())
+        result_reader = result_reader if write_field.required else OptionReader(result_reader)
+        results.append((read_pos, result_reader))
+
+    write_fields = {field.name: field for field in write_struct.fields}
+    for pos, read_field in enumerate(read_struct.fields):
+        if read_field.name not in write_fields:
+            if read_field.required:
+                raise ResolveException(f"{read_field} is not in the write schema, and is required")
+            # Just set the new field to None
+            results.append((pos, NoneReader()))
+
+    return StructReader(tuple(results))
+
+
+@resolve.register(ListType)
+def _(write_list: ListType, read_list: ListType) -> Reader:
+    if not isinstance(read_list, ListType):
+        raise ResolveException(f"Cannot change {write_list} into {read_list}")
+    element_reader = resolve(write_list.element.field_type, read_list.element.field_type)
+    return ListReader(element_reader)
+
+
+@resolve.register(MapType)
+def _(write_map: MapType, read_map: MapType) -> Reader:
+    if not isinstance(read_map, MapType):
+        raise ResolveException(f"Cannot change {write_map} into {read_map}")
+    key_reader = resolve(write_map.key.field_type, read_map.key.field_type)
+    value_reader = resolve(write_map.value.field_type, read_map.value.field_type)
+
+    return MapReader(key_reader, value_reader)
+
+
+ALLOWED_PROMOTIONS: Dict[Type[PrimitiveType], Set[Type[PrimitiveType]]] = {
+    # For now we only support the binary compatible ones
+    IntegerType: {LongType},
+    StringType: {BinaryType},
+    BinaryType: {StringType},
+    # These are all allowed according to the Avro spec
+    # IntegerType: {LongType, FloatType, DoubleType},
+    # LongType: {FloatType, DoubleType},
+    # FloatType: {DoubleType},

Review Comment:
   Yes, that's correct. We would still need to use the reader that corresponds to the write schema. It should be possible to do that in the `PrimitiveType` function.
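
A minimal sketch of the idea in the comment above: check the promotion against the read type, but pick the reader from the write type, since that is how the bytes in the file were encoded. The type classes, the reader names, and `resolve_primitive` are hypothetical stand-ins for illustration, not the code in this PR.

```python
from typing import Dict, Set, Type


class PrimitiveType: ...
class IntegerType(PrimitiveType): ...
class LongType(PrimitiveType): ...
class StringType(PrimitiveType): ...
class BinaryType(PrimitiveType): ...


class ResolveException(Exception):
    pass


# Hypothetical mapping: reader name keyed by the type that was written.
READER_FOR_WRITE_TYPE: Dict[Type[PrimitiveType], str] = {
    IntegerType: "IntegerReader",
    LongType: "LongReader",
    StringType: "StringReader",
    BinaryType: "BinaryReader",
}

# Same shape as the ALLOWED_PROMOTIONS table in the diff: write type -> read types.
ALLOWED_PROMOTIONS: Dict[Type[PrimitiveType], Set[Type[PrimitiveType]]] = {
    IntegerType: {LongType},
    StringType: {BinaryType},
    BinaryType: {StringType},
}


def resolve_primitive(write_type: PrimitiveType, read_type: PrimitiveType) -> str:
    """Validate the promotion, then return the reader for the *write* type."""
    write_cls, read_cls = type(write_type), type(read_type)
    if write_cls is not read_cls and read_cls not in ALLOWED_PROMOTIONS.get(write_cls, set()):
        raise ResolveException(
            f"Promotion from {write_cls.__name__} to {read_cls.__name__} is not allowed"
        )
    # The data on disk follows the write schema, so decode it with that reader.
    return READER_FOR_WRITE_TYPE[write_cls]
```

With this shape, `resolve_primitive(IntegerType(), LongType())` selects the integer reader, while `resolve_primitive(LongType(), IntegerType())` raises `ResolveException`.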





[GitHub] [iceberg] rdblue commented on a diff in pull request #5116: Python: Resolve write/read schemas

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5116:
URL: https://github.com/apache/iceberg/pull/5116#discussion_r909920955


##########
python/src/iceberg/exceptions.py:
##########
@@ -33,4 +33,4 @@ class AlreadyExistsError(Exception):
 
 
 class ValidationError(Exception):
-    ...
+    """Raised when there is an issue with the schema"""

Review Comment:
   This is used in other places as well, but the comment is fine for now.


