You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/06/04 05:56:01 UTC

[GitHub] [iceberg] samredai commented on a diff in pull request #4920: Python: Add Avro read path

samredai commented on code in PR #4920:
URL: https://github.com/apache/iceberg/pull/4920#discussion_r889453366


##########
python/src/iceberg/avro/codec.py:
##########
@@ -0,0 +1,179 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+"""
+Contains Codecs for Python Avro.
+
+Note that the word "codecs" means "compression/decompression algorithms" in the
+Avro world (https://avro.apache.org/docs/current/spec.html#Object+Container+Files),
+so don't confuse it with the Python's "codecs", which is a package mainly for
+converting character sets (https://docs.python.org/3/library/codecs.html).
+"""
+
+import abc
+import binascii
+import io
+import struct
+import zlib
+from typing import Dict, Tuple, Type
+
+from iceberg.avro.decoder import BinaryDecoder
+from iceberg.io.memory import MemoryInputStream
+
+STRUCT_CRC32 = struct.Struct(">I")  # big-endian unsigned int
+
+
+def _check_crc32(bytes_: bytes, checksum: bytes) -> None:
+    if binascii.crc32(bytes_) & 0xFFFFFFFF != STRUCT_CRC32.unpack(checksum)[0]:
+        raise ValueError("Checksum failure")
+
+
+# bzip2
+try:
+    import bz2
+
+    has_bzip2 = True
+except ImportError:
+    has_bzip2 = False
+
+# snappy
+try:
+    import snappy
+
+    has_snappy = True
+except ImportError:

Review Comment:
   Should this just be `except Exception`? In other words if `import snappy` fails for some odd reason other than `ImportError`, would it make sense to just proceed without snappy? If we go that route, some info logging here would be useful.



##########
python/tests/utils/test_schema_conversion.py:
##########
@@ -229,3 +234,127 @@ def test_avro_list_required_record():
     iceberg_schema = AvroSchemaConversion().avro_to_iceberg(avro_schema)
 
     assert expected_iceberg_schema == iceberg_schema
+
+
+def test_resolve_union():
+    with pytest.raises(TypeError) as exc_info:
+        AvroSchemaConversion()._resolve_union(["null", "string", "long"])
+
+    assert "Non-optional types aren't part of the Iceberg specification" in str(exc_info.value)
+
+
+def test_nested_type():
+    # In the case a primitive field is nested
+    assert AvroSchemaConversion()._convert_schema({"type": {"type": "string"}}) == StringType()
+
+
+def test_map_type():
+    avro_type = {
+        "type": "map",
+        "values": ["long", "null"],
+        "key-id": 101,
+        "value-id": 102,
+    }
+    actual = AvroSchemaConversion()._convert_schema(avro_type)
+    expected = MapType(key_id=101, key_type=StringType(), value_id=102, value_type=LongType(), value_is_optional=True)
+    assert actual == expected
+
+
+def test_fixed_type():
+    avro_type = {"type": "fixed", "size": 22}
+    actual = AvroSchemaConversion()._convert_schema(avro_type)
+    expected = FixedType(22)
+    assert actual == expected
+
+
+def test_unknown_primitive():
+    with pytest.raises(TypeError) as exc_info:
+        avro_type = "UnknownType"
+        AvroSchemaConversion()._convert_schema(avro_type)
+    assert "Unknown type: UnknownType" in str(exc_info.value)
+
+
+def test_unknown_complex_type():
+    with pytest.raises(TypeError) as exc_info:
+        avro_type = {
+            "type": "UnknownType",
+        }
+        AvroSchemaConversion()._convert_schema(avro_type)
+    assert "Unknown type: {'type': 'UnknownType'}" in str(exc_info.value)
+
+
+def test_convert_field_without_field_id():
+    with pytest.raises(ValueError) as exc_info:
+        avro_field = {
+            "name": "contains_null",
+            "type": "boolean",
+        }
+        AvroSchemaConversion()._convert_field(avro_field)
+    assert "Cannot convert field, missing field-id" in str(exc_info.value)
+
+
+def test_convert_record_type_without_record():
+    with pytest.raises(ValueError) as exc_info:
+        avro_field = {"type": "non-record", "name": "avro_schema", "fields": []}
+        AvroSchemaConversion()._convert_record_type(avro_field)
+    assert "Expected record type, got" in str(exc_info.value)
+
+
+def test_avro_list_missing_element_id():
+    avro_type = {
+        "name": "array_with_string",
+        "type": {
+            "type": "array",
+            "items": "string",
+            "default": [],
+            # "element-id": 101,
+        },
+        "field-id": 100,
+    }
+
+    with pytest.raises(ValueError) as exc_info:
+        AvroSchemaConversion()._convert_array_type(avro_type)
+
+    assert "Cannot convert array-type, missing element-id:" in str(exc_info.value)
+
+
+def test_convert_decimal_type():
+    avro_decimal_type = {"type": "bytes", "logicalType": "decimal", "precision": 19, "scale": 25}
+    actual = AvroSchemaConversion()._convert_logical_type(avro_decimal_type)
+    expected = DecimalType(precision=19, scale=25)
+    assert actual == expected
+
+
+def test_convert_date_type():
+    avro_logical_type = {"type": "int", "logicalType": "date"}
+    actual = AvroSchemaConversion()._convert_logical_type(avro_logical_type)
+    assert actual == DateType()
+
+
+def test_unknown_logical_type():

Review Comment:
   nit: One-liner docstrings may be helpful, especially since they'll come up in IDE searches. Here for example I'd add something like:
   ```py
   def test_unknown_logical_type():
       """Test raising a ValueError when converting an unknown logical type as part of an Avro schema conversion"""
       ...
   ```



##########
python/src/iceberg/avro/codec.py:
##########
@@ -0,0 +1,179 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+"""
+Contains Codecs for Python Avro.
+
+Note that the word "codecs" means "compression/decompression algorithms" in the
+Avro world (https://avro.apache.org/docs/current/spec.html#Object+Container+Files),
+so don't confuse it with the Python's "codecs", which is a package mainly for
+converting character sets (https://docs.python.org/3/library/codecs.html).
+"""
+
+import abc
+import binascii
+import io
+import struct
+import zlib
+from typing import Dict, Tuple, Type
+
+from iceberg.avro.decoder import BinaryDecoder
+from iceberg.io.memory import MemoryInputStream
+
+STRUCT_CRC32 = struct.Struct(">I")  # big-endian unsigned int
+
+
+def _check_crc32(bytes_: bytes, checksum: bytes) -> None:
+    if binascii.crc32(bytes_) & 0xFFFFFFFF != STRUCT_CRC32.unpack(checksum)[0]:
+        raise ValueError("Checksum failure")
+
+
+# bzip2
+try:
+    import bz2
+
+    has_bzip2 = True
+except ImportError:
+    has_bzip2 = False
+
+# snappy
+try:
+    import snappy
+
+    has_snappy = True
+except ImportError:
+    has_snappy = False
+
+# z-standard

Review Comment:
   nit: Are these comments needed?



##########
python/src/iceberg/avro/reader.py:
##########
@@ -0,0 +1,284 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# pylint: disable=W0621
+"""
+Avro reader for reading Avro files
+"""
+from __future__ import annotations
+
+import json
+from dataclasses import dataclass, field
+from io import SEEK_SET
+from typing import (
+    Any,
+    Dict,
+    List,
+    Type,
+    Union,
+)
+
+from iceberg.avro.codec import KNOWN_CODECS, Codec, NullCodec
+from iceberg.avro.decoder import BinaryDecoder
+from iceberg.files import StructProtocol
+from iceberg.io.base import InputFile, InputStream
+from iceberg.schema import Schema, SchemaVisitor, visit
+from iceberg.types import (
+    BinaryType,
+    BooleanType,
+    DateType,
+    DecimalType,
+    DoubleType,
+    FixedType,
+    FloatType,
+    IntegerType,
+    ListType,
+    LongType,
+    MapType,
+    NestedField,
+    PrimitiveType,
+    StringType,
+    StructType,
+    TimestampType,
+    TimestamptzType,
+    TimeType,
+)
+from iceberg.utils.schema_conversion import AvroSchemaConversion
+
+
+@dataclass(frozen=True)
+class AvroStructProtocol(StructProtocol):
+    _data: List[Union[Any, StructProtocol]] = field(default_factory=list)
+
+    def set(self, pos: int, value: Any) -> None:
+        self._data[pos] = value
+
+    def get(self, pos: int) -> Any:
+        return self._data[pos]
+
+
+class _AvroReader(SchemaVisitor[Union[AvroStructProtocol, Any]]):
+    _skip: bool = False
+
+    def __init__(self, decoder: BinaryDecoder):
+        self._decoder = decoder
+
+    def schema(self, schema: Schema, struct_result: Union[AvroStructProtocol, Any]) -> Union[AvroStructProtocol, Any]:
+        return struct_result
+
+    def struct(self, struct: StructType, field_results: List[Union[AvroStructProtocol, Any]]) -> Union[AvroStructProtocol, Any]:
+        return AvroStructProtocol(field_results)
+
+    def before_field(self, field: NestedField) -> None:
+        if field.is_optional:
+            pos = self._decoder.read_long()
+            # We now assume that null is first (which is often the case)
+            if int(pos) == 0:
+                self._skip = True
+
+    def field(self, field: NestedField, field_result: Union[AvroStructProtocol, Any]) -> Union[AvroStructProtocol, Any]:
+        return field_result
+
+    def before_list_element(self, element: NestedField) -> None:
+        self._skip = True
+
+    def list(self, list_type: ListType, element_result: Union[AvroStructProtocol, Any]) -> Union[AvroStructProtocol, Any]:
+        read_items = []
+        block_count = self._decoder.read_long()
+        while block_count != 0:
+            if block_count < 0:
+                block_count = -block_count
+                # We ignore the block size for now
+                _ = self._decoder.read_long()
+            for _ in range(block_count):
+                read_items.append(visit(list_type.element_type, self))
+            block_count = self._decoder.read_long()
+        return read_items
+
+    def before_map_key(self, key: NestedField) -> None:
+        self._skip = True
+
+    def before_map_value(self, value: NestedField) -> None:
+        self._skip = True
+
+    def map(
+        self, map_type: MapType, key_result: Union[AvroStructProtocol, Any], value_result: Union[AvroStructProtocol, Any]
+    ) -> Union[AvroStructProtocol, Any]:
+        read_items = {}
+
+        block_count = self._decoder.read_long()
+        if block_count < 0:
+            block_count = -block_count
+            # We ignore the block size for now
+            _ = self._decoder.read_long()
+
+        # The Iceberg non-string implementation with an array of records:
+        while block_count != 0:
+            for _ in range(block_count):
+                key = visit(map_type.key_type, self)
+                read_items[key] = visit(map_type.value_type, self)
+            block_count = self._decoder.read_long()
+
+        return read_items
+
+    def primitive(self, primitive: PrimitiveType) -> Union[AvroStructProtocol, Any]:

Review Comment:
   Can we use `singledispatchmethod` here? [Here's](https://github.com/apache/iceberg/blob/master/python/src/iceberg/expressions/literals.py#L164-L208) an example from the literals logic.



##########
python/src/iceberg/avro/decoder.py:
##########
@@ -0,0 +1,197 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import datetime
+import decimal
+import struct
+
+from iceberg.io.base import InputStream
+
+STRUCT_FLOAT = struct.Struct("<f")  # big-endian float

Review Comment:
   ```suggestion
   STRUCT_FLOAT = struct.Struct("<f")  # little-endian float
   ```



##########
python/src/iceberg/avro/codec.py:
##########
@@ -0,0 +1,179 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+"""
+Contains Codecs for Python Avro.
+
+Note that the word "codecs" means "compression/decompression algorithms" in the
+Avro world (https://avro.apache.org/docs/current/spec.html#Object+Container+Files),
+so don't confuse it with the Python's "codecs", which is a package mainly for
+converting character sets (https://docs.python.org/3/library/codecs.html).
+"""
+
+import abc

Review Comment:
   For consistency with other areas, should we just do `from abc import ABC, abstractmethod`?



##########
python/src/iceberg/avro/codec.py:
##########
@@ -0,0 +1,179 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+"""
+Contains Codecs for Python Avro.
+
+Note that the word "codecs" means "compression/decompression algorithms" in the
+Avro world (https://avro.apache.org/docs/current/spec.html#Object+Container+Files),
+so don't confuse it with the Python's "codecs", which is a package mainly for
+converting character sets (https://docs.python.org/3/library/codecs.html).
+"""
+
+import abc
+import binascii
+import io
+import struct
+import zlib
+from typing import Dict, Tuple, Type
+
+from iceberg.avro.decoder import BinaryDecoder
+from iceberg.io.memory import MemoryInputStream
+
+STRUCT_CRC32 = struct.Struct(">I")  # big-endian unsigned int
+
+
+def _check_crc32(bytes_: bytes, checksum: bytes) -> None:

Review Comment:
   ```suggestion
   def _check_crc32(bytes_: bytes, checksum: bytes) -> None:
       """Incrementally compute CRC-32 from bytes and compare to a checksum
       
       Args:
         bytes_ (bytes): The bytes to check against `checksum`
         checksum (bytes): Byte representation of a checksum
       
       Raises:
         ValueError: If the computed CRC-32 does not match the checksum
       """
   ```



##########
python/tests/avro/test_reader.py:
##########
@@ -0,0 +1,162 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# pylint: disable=W0212
+import pathlib
+
+from iceberg.avro.reader import AvroStructProtocol, DataFileReader
+from tests.io.test_io_base import LocalInputFile
+
+
+def test_read_header():
+    test_file_path = str(pathlib.Path(__file__).parent.resolve()) + "/manifest.avro"

Review Comment:
   What do you think of adding a `tests/avro/data` directory, and then using a `pytest.parametrize` decorator for this test. We could then include a collection of manifest files and test different combinations of `filename`, `expected_header_magic`, `expected_header_meta`, and `expected_header_sync`.
   
   We could do this for `test_reader_data(...)` as well.



##########
python/src/iceberg/avro/decoder.py:
##########
@@ -0,0 +1,197 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import datetime
+import decimal
+import struct
+
+from iceberg.io.base import InputStream
+
+STRUCT_FLOAT = struct.Struct("<f")  # big-endian float
+STRUCT_DOUBLE = struct.Struct("<d")  # big-endian double

Review Comment:
   ```suggestion
   STRUCT_DOUBLE = struct.Struct("<d")  # little-endian double
   ```



##########
python/tests/utils/test_schema_conversion.py:
##########
@@ -229,3 +234,127 @@ def test_avro_list_required_record():
     iceberg_schema = AvroSchemaConversion().avro_to_iceberg(avro_schema)
 
     assert expected_iceberg_schema == iceberg_schema
+
+
+def test_resolve_union():
+    with pytest.raises(TypeError) as exc_info:
+        AvroSchemaConversion()._resolve_union(["null", "string", "long"])
+
+    assert "Non-optional types aren't part of the Iceberg specification" in str(exc_info.value)
+
+
+def test_nested_type():
+    # In the case a primitive field is nested
+    assert AvroSchemaConversion()._convert_schema({"type": {"type": "string"}}) == StringType()
+
+
+def test_map_type():
+    avro_type = {
+        "type": "map",
+        "values": ["long", "null"],
+        "key-id": 101,
+        "value-id": 102,
+    }
+    actual = AvroSchemaConversion()._convert_schema(avro_type)
+    expected = MapType(key_id=101, key_type=StringType(), value_id=102, value_type=LongType(), value_is_optional=True)
+    assert actual == expected
+
+
+def test_fixed_type():
+    avro_type = {"type": "fixed", "size": 22}
+    actual = AvroSchemaConversion()._convert_schema(avro_type)
+    expected = FixedType(22)
+    assert actual == expected
+
+
+def test_unknown_primitive():
+    with pytest.raises(TypeError) as exc_info:
+        avro_type = "UnknownType"
+        AvroSchemaConversion()._convert_schema(avro_type)
+    assert "Unknown type: UnknownType" in str(exc_info.value)
+
+
+def test_unknown_complex_type():
+    with pytest.raises(TypeError) as exc_info:
+        avro_type = {
+            "type": "UnknownType",
+        }
+        AvroSchemaConversion()._convert_schema(avro_type)
+    assert "Unknown type: {'type': 'UnknownType'}" in str(exc_info.value)
+
+
+def test_convert_field_without_field_id():
+    with pytest.raises(ValueError) as exc_info:
+        avro_field = {
+            "name": "contains_null",
+            "type": "boolean",
+        }
+        AvroSchemaConversion()._convert_field(avro_field)
+    assert "Cannot convert field, missing field-id" in str(exc_info.value)
+
+
+def test_convert_record_type_without_record():
+    with pytest.raises(ValueError) as exc_info:
+        avro_field = {"type": "non-record", "name": "avro_schema", "fields": []}
+        AvroSchemaConversion()._convert_record_type(avro_field)
+    assert "Expected record type, got" in str(exc_info.value)
+
+
+def test_avro_list_missing_element_id():
+    avro_type = {
+        "name": "array_with_string",
+        "type": {
+            "type": "array",
+            "items": "string",
+            "default": [],
+            # "element-id": 101,
+        },
+        "field-id": 100,
+    }
+
+    with pytest.raises(ValueError) as exc_info:
+        AvroSchemaConversion()._convert_array_type(avro_type)
+
+    assert "Cannot convert array-type, missing element-id:" in str(exc_info.value)
+
+
+def test_convert_decimal_type():
+    avro_decimal_type = {"type": "bytes", "logicalType": "decimal", "precision": 19, "scale": 25}
+    actual = AvroSchemaConversion()._convert_logical_type(avro_decimal_type)
+    expected = DecimalType(precision=19, scale=25)
+    assert actual == expected
+
+
+def test_convert_date_type():
+    avro_logical_type = {"type": "int", "logicalType": "date"}
+    actual = AvroSchemaConversion()._convert_logical_type(avro_logical_type)
+    assert actual == DateType()
+
+
+def test_unknown_logical_type():
+    avro_logical_type = {"type": "bytes", "logicalType": "date"}
+    with pytest.raises(ValueError) as exc_info:
+        AvroSchemaConversion()._convert_logical_type(avro_logical_type)
+
+    assert "Unknown logical/physical type combination:" in str(exc_info.value)
+
+
+def test_logical_map_with_invalid_fields():

Review Comment:
   I see a pattern here so we may be able to consolidate these into a handful of parametrized tests, making them easily extendable too. How about making a single function per method that's being tested?
   
   - `AvroSchemaConversion()._convert_logical_type`
   - `AvroSchemaConversion()._convert_logical_map_type`
   - `AvroSchemaConversion()._convert_schema`
   - `AvroSchemaConversion()._convert_field`
   - `AvroSchemaConversion()._convert_record_type`
   - `AvroSchemaConversion()._convert_array_type`
   
   As an example, for `AvroSchemaConversion()._convert_logical_type`:
   ```py
   @pytest.mark.parametrize(
       "avro_logical_type,expected",
       [
           ({"type": "int", "logicalType": "date"}, DateType()),
           (
               {"type": "bytes", "logicalType": "decimal", "precision": 19, "scale": 25},
               DecimalType(precision=19, scale=25),
           ),
           ...,
       ],
   )
   def test_schema_conversion_convert_logical_type(avro_logical_type, expected):
       assert AvroSchemaConversion()._convert_logical_type(avro_logical_type) == expected
   ```



##########
python/src/iceberg/avro/reader.py:
##########
@@ -0,0 +1,284 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# pylint: disable=W0621
+"""
+Avro reader for reading Avro files
+"""
+from __future__ import annotations
+
+import json
+from dataclasses import dataclass, field
+from io import SEEK_SET
+from typing import (
+    Any,
+    Dict,
+    List,
+    Type,
+    Union,
+)
+
+from iceberg.avro.codec import KNOWN_CODECS, Codec, NullCodec
+from iceberg.avro.decoder import BinaryDecoder
+from iceberg.files import StructProtocol
+from iceberg.io.base import InputFile, InputStream
+from iceberg.schema import Schema, SchemaVisitor, visit
+from iceberg.types import (
+    BinaryType,
+    BooleanType,
+    DateType,
+    DecimalType,
+    DoubleType,
+    FixedType,
+    FloatType,
+    IntegerType,
+    ListType,
+    LongType,
+    MapType,
+    NestedField,
+    PrimitiveType,
+    StringType,
+    StructType,
+    TimestampType,
+    TimestamptzType,
+    TimeType,
+)
+from iceberg.utils.schema_conversion import AvroSchemaConversion
+
+
+@dataclass(frozen=True)
+class AvroStructProtocol(StructProtocol):
+    _data: List[Union[Any, StructProtocol]] = field(default_factory=list)
+
+    def set(self, pos: int, value: Any) -> None:
+        self._data[pos] = value
+
+    def get(self, pos: int) -> Any:
+        return self._data[pos]
+
+
+class _AvroReader(SchemaVisitor[Union[AvroStructProtocol, Any]]):
+    _skip: bool = False
+
+    def __init__(self, decoder: BinaryDecoder):
+        self._decoder = decoder
+
+    def schema(self, schema: Schema, struct_result: Union[AvroStructProtocol, Any]) -> Union[AvroStructProtocol, Any]:
+        return struct_result
+
+    def struct(self, struct: StructType, field_results: List[Union[AvroStructProtocol, Any]]) -> Union[AvroStructProtocol, Any]:
+        return AvroStructProtocol(field_results)
+
+    def before_field(self, field: NestedField) -> None:
+        if field.is_optional:
+            pos = self._decoder.read_long()
+            # We now assume that null is first (which is often the case)
+            if int(pos) == 0:
+                self._skip = True
+
+    def field(self, field: NestedField, field_result: Union[AvroStructProtocol, Any]) -> Union[AvroStructProtocol, Any]:
+        return field_result
+
+    def before_list_element(self, element: NestedField) -> None:
+        self._skip = True
+
+    def list(self, list_type: ListType, element_result: Union[AvroStructProtocol, Any]) -> Union[AvroStructProtocol, Any]:
+        read_items = []
+        block_count = self._decoder.read_long()
+        while block_count != 0:
+            if block_count < 0:
+                block_count = -block_count
+                # We ignore the block size for now
+                _ = self._decoder.read_long()
+            for _ in range(block_count):
+                read_items.append(visit(list_type.element_type, self))
+            block_count = self._decoder.read_long()
+        return read_items
+
+    def before_map_key(self, key: NestedField) -> None:
+        self._skip = True
+
+    def before_map_value(self, value: NestedField) -> None:
+        self._skip = True
+
+    def map(
+        self, map_type: MapType, key_result: Union[AvroStructProtocol, Any], value_result: Union[AvroStructProtocol, Any]
+    ) -> Union[AvroStructProtocol, Any]:
+        read_items = {}
+
+        block_count = self._decoder.read_long()
+        if block_count < 0:
+            block_count = -block_count
+            # We ignore the block size for now
+            _ = self._decoder.read_long()
+
+        # The Iceberg non-string implementation with an array of records:
+        while block_count != 0:
+            for _ in range(block_count):
+                key = visit(map_type.key_type, self)
+                read_items[key] = visit(map_type.value_type, self)
+            block_count = self._decoder.read_long()
+
+        return read_items
+
+    def primitive(self, primitive: PrimitiveType) -> Union[AvroStructProtocol, Any]:
+        if self._skip:
+            self._skip = False
+            return None
+
+        if isinstance(primitive, FixedType):
+            return self._decoder.read(primitive.length)
+        elif isinstance(primitive, DecimalType):
+            return self._decoder.read_decimal_from_bytes(primitive.scale, primitive.precision)
+        elif isinstance(primitive, BooleanType):
+            return self._decoder.read_boolean()
+        elif isinstance(primitive, IntegerType):
+            return self._decoder.read_int()
+        elif isinstance(primitive, LongType):
+            return self._decoder.read_long()
+        elif isinstance(primitive, FloatType):
+            return self._decoder.read_float()
+        elif isinstance(primitive, DoubleType):
+            return self._decoder.read_double()
+        elif isinstance(primitive, DateType):
+            return self._decoder.read_date_from_int()
+        elif isinstance(primitive, TimeType):
+            return self._decoder.read_time_micros_from_long()
+        elif isinstance(primitive, TimestampType):
+            return self._decoder.read_timestamp_micros_from_long()
+        elif isinstance(primitive, TimestamptzType):
+            return self._decoder.read_timestamp_micros_from_long()
+        elif isinstance(primitive, StringType):
+            return self._decoder.read_utf8()
+        elif isinstance(primitive, BinaryType):
+            return self._decoder.read_bytes()
+        else:
+            raise ValueError(f"Unknown type: {primitive}")
+
+
+VERSION = 1
+MAGIC = bytes(b"Obj" + bytearray([VERSION]))
+MAGIC_SIZE = len(MAGIC)
+SYNC_SIZE = 16
+META_SCHEMA = StructType(
+    NestedField(name="magic", field_id=100, field_type=FixedType(length=MAGIC_SIZE), is_optional=False),
+    NestedField(
+        field_id=200,
+        name="meta",
+        field_type=MapType(key_id=201, key_type=StringType(), value_id=202, value_type=BinaryType()),
+        is_optional=False,
+    ),
+    NestedField(field_id=300, name="sync", field_type=FixedType(length=SYNC_SIZE), is_optional=False),
+)

Review Comment:
   Can these constants be moved to the top?



##########
python/src/iceberg/avro/decoder.py:
##########
@@ -0,0 +1,197 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import datetime
+import decimal
+import struct
+
+from iceberg.io.base import InputStream
+
+STRUCT_FLOAT = struct.Struct("<f")  # big-endian float
+STRUCT_DOUBLE = struct.Struct("<d")  # big-endian double
+STRUCT_SIGNED_SHORT = struct.Struct(">h")  # big-endian signed short
+STRUCT_SIGNED_INT = struct.Struct(">i")  # big-endian signed int
+STRUCT_SIGNED_LONG = struct.Struct(">q")  # big-endian signed long
+
+
+class BinaryDecoder:
+    """Read leaf values."""
+
+    _input_stream: InputStream
+
+    def __init__(self, input_stream: InputStream) -> None:
+        """
+        reader is a Python object on which we can call read, seek, and tell.
+        """
+        self._input_stream = input_stream
+
+    def read(self, n: int) -> bytes:
+        """
+        Read n bytes.
+        """
+        if n < 0:
+            raise ValueError(f"Requested {n} bytes to read, expected positive integer.")
+        read_bytes = self._input_stream.read(n)
+        if len(read_bytes) != n:
+            raise ValueError(f"Read {len(read_bytes)} bytes, expected {n} bytes")
+        return read_bytes
+
+    def read_boolean(self) -> bool:
+        """
+        a boolean is written as a single byte
+        whose value is either 0 (false) or 1 (true).
+        """
+        return ord(self.read(1)) == 1
+
+    def read_int(self) -> int:
+        """
+        int and long values are written using variable-length, zigzag coding.
+        """
+        return self.read_long()
+
+    def read_long(self) -> int:
+        """
+        int and long values are written using variable-length, zigzag coding.

Review Comment:
   super nit: I'd just mention long in this docstring and just mention int in the `read_int` method's docstring. Also it could all be a one-liner with the `"""`'s on the same line.



##########
python/tests/avro/test_reader.py:
##########
@@ -0,0 +1,162 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# pylint: disable=W0212
+import pathlib
+
+from iceberg.avro.reader import AvroStructProtocol, DataFileReader

Review Comment:
   Style nit/opinion: I like to fully import the module that's being directly tested in a test file. I feel like it helps easily identify what's being tested with much less cognitive load (just look for `reader.*`. So what do you think of making this `from iceberg.avro import reader` and then using `reader.AvroStructProtocol` and `reader.DataFileReader` down below?



##########
python/tests/avro/test_reader.py:
##########
@@ -0,0 +1,162 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# pylint: disable=W0212
+import pathlib
+
+from iceberg.avro.reader import AvroStructProtocol, DataFileReader
+from tests.io.test_io_base import LocalInputFile
+
+
+def test_read_header():
+    test_file_path = str(pathlib.Path(__file__).parent.resolve()) + "/manifest.avro"
+    test_file = LocalInputFile(test_file_path)
+    with DataFileReader(test_file) as reader:
+        header = reader._read_header()
+
+    assert header.magic == b"Obj\x01"
+    assert header.meta == {
+        "schema": '{"type":"struct","schema-id":0,"fields":[{"id":1,"name":"VendorID","required":false,"type":"int"},{"id":2,"name":"tpep_pickup_datetime","required":false,"type":"string"},{"id":3,"name":"tpep_dropoff_datetime","required":false,"type":"string"},{"id":4,"name":"passenger_count","required":false,"type":"int"},{"id":5,"name":"RatecodeID","required":false,"type":"int"},{"id":6,"name":"store_and_fwd_flag","required":false,"type":"string"},{"id":7,"name":"PULocationID","required":false,"type":"int"},{"id":8,"name":"DOLocationID","required":false,"type":"int"},{"id":9,"name":"payment_type","required":false,"type":"int"},{"id":10,"name":"fare","required":false,"type":"double"},{"id":11,"name":"distance","required":false,"type":"double","doc":"The elapsed trip distance in miles reported by the taximeter."},{"id":12,"name":"fare_per_distance_unit","required":false,"type":"float"},{"id":13,"name":"extra","required":false,"type":"double"},{"id":14,"name":"mta_tax","required":fa
 lse,"type":"double"},{"id":15,"name":"tip_amount","required":false,"type":"double"},{"id":16,"name":"tolls_amount","required":false,"type":"double"},{"id":17,"name":"improvement_surcharge","required":false,"type":"double"},{"id":18,"name":"total_amount","required":false,"type":"double"},{"id":19,"name":"congestion_surcharge","required":false,"type":"double"}]}',
+        "avro.schema": '{"type":"record","name":"manifest_entry","fields":[{"name":"status","type":"int","field-id":0},{"name":"snapshot_id","type":["null","long"],"default":null,"field-id":1},{"name":"data_file","type":{"type":"record","name":"r2","fields":[{"name":"file_path","type":"string","doc":"Location URI with FS scheme","field-id":100},{"name":"file_format","type":"string","doc":"File format name: avro, orc, or parquet","field-id":101},{"name":"partition","type":{"type":"record","name":"r102","fields":[{"name":"VendorID","type":["null","int"],"default":null,"field-id":1000}]},"field-id":102},{"name":"record_count","type":"long","doc":"Number of records in the file","field-id":103},{"name":"file_size_in_bytes","type":"long","doc":"Total file size in bytes","field-id":104},{"name":"block_size_in_bytes","type":"long","field-id":105},{"name":"column_sizes","type":["null",{"type":"array","items":{"type":"record","name":"k117_v118","fields":[{"name":"key","type":"int","field-id":
 117},{"name":"value","type":"long","field-id":118}]},"logicalType":"map"}],"doc":"Map of column id to total size on disk","default":null,"field-id":108},{"name":"value_counts","type":["null",{"type":"array","items":{"type":"record","name":"k119_v120","fields":[{"name":"key","type":"int","field-id":119},{"name":"value","type":"long","field-id":120}]},"logicalType":"map"}],"doc":"Map of column id to total count, including null and NaN","default":null,"field-id":109},{"name":"null_value_counts","type":["null",{"type":"array","items":{"type":"record","name":"k121_v122","fields":[{"name":"key","type":"int","field-id":121},{"name":"value","type":"long","field-id":122}]},"logicalType":"map"}],"doc":"Map of column id to null value count","default":null,"field-id":110},{"name":"nan_value_counts","type":["null",{"type":"array","items":{"type":"record","name":"k138_v139","fields":[{"name":"key","type":"int","field-id":138},{"name":"value","type":"long","field-id":139}]},"logicalType":"map"}],"
 doc":"Map of column id to number of NaN values in the column","default":null,"field-id":137},{"name":"lower_bounds","type":["null",{"type":"array","items":{"type":"record","name":"k126_v127","fields":[{"name":"key","type":"int","field-id":126},{"name":"value","type":"bytes","field-id":127}]},"logicalType":"map"}],"doc":"Map of column id to lower bound","default":null,"field-id":125},{"name":"upper_bounds","type":["null",{"type":"array","items":{"type":"record","name":"k129_v130","fields":[{"name":"key","type":"int","field-id":129},{"name":"value","type":"bytes","field-id":130}]},"logicalType":"map"}],"doc":"Map of column id to upper bound","default":null,"field-id":128},{"name":"key_metadata","type":["null","bytes"],"doc":"Encryption key metadata blob","default":null,"field-id":131},{"name":"split_offsets","type":["null",{"type":"array","items":"long","element-id":133}],"doc":"Splittable offsets","default":null,"field-id":132},{"name":"sort_order_id","type":["null","int"],"doc":"Sor
 t order ID","default":null,"field-id":140}]},"field-id":2}]}',
+        "avro.codec": "deflate",
+        "format-version": "1",
+        "partition-spec-id": "0",
+        "iceberg.schema": '{"type":"struct","schema-id":0,"fields":[{"id":0,"name":"status","required":true,"type":"int"},{"id":1,"name":"snapshot_id","required":false,"type":"long"},{"id":2,"name":"data_file","required":true,"type":{"type":"struct","fields":[{"id":100,"name":"file_path","required":true,"type":"string","doc":"Location URI with FS scheme"},{"id":101,"name":"file_format","required":true,"type":"string","doc":"File format name: avro, orc, or parquet"},{"id":102,"name":"partition","required":true,"type":{"type":"struct","fields":[{"id":1000,"name":"VendorID","required":false,"type":"int"}]}},{"id":103,"name":"record_count","required":true,"type":"long","doc":"Number of records in the file"},{"id":104,"name":"file_size_in_bytes","required":true,"type":"long","doc":"Total file size in bytes"},{"id":105,"name":"block_size_in_bytes","required":true,"type":"long"},{"id":108,"name":"column_sizes","required":false,"type":{"type":"map","key-id":117,"key":"int","value-id":118,"v
 alue":"long","value-required":true},"doc":"Map of column id to total size on disk"},{"id":109,"name":"value_counts","required":false,"type":{"type":"map","key-id":119,"key":"int","value-id":120,"value":"long","value-required":true},"doc":"Map of column id to total count, including null and NaN"},{"id":110,"name":"null_value_counts","required":false,"type":{"type":"map","key-id":121,"key":"int","value-id":122,"value":"long","value-required":true},"doc":"Map of column id to null value count"},{"id":137,"name":"nan_value_counts","required":false,"type":{"type":"map","key-id":138,"key":"int","value-id":139,"value":"long","value-required":true},"doc":"Map of column id to number of NaN values in the column"},{"id":125,"name":"lower_bounds","required":false,"type":{"type":"map","key-id":126,"key":"int","value-id":127,"value":"binary","value-required":true},"doc":"Map of column id to lower bound"},{"id":128,"name":"upper_bounds","required":false,"type":{"type":"map","key-id":129,"key":"int"
 ,"value-id":130,"value":"binary","value-required":true},"doc":"Map of column id to upper bound"},{"id":131,"name":"key_metadata","required":false,"type":"binary","doc":"Encryption key metadata blob"},{"id":132,"name":"split_offsets","required":false,"type":{"type":"list","element-id":133,"element":"long","element-required":true},"doc":"Splittable offsets"},{"id":140,"name":"sort_order_id","required":false,"type":"int","doc":"Sort order ID"}]}}]}',
+        "partition-spec": '[{"name":"VendorID","transform":"identity","source-id":1,"field-id":1000}]',
+    }
+    assert header.sync == b"B\x9e\xb0\xc3\x04,vG\xd1O\x8e\xf5\x9c\xdd\x17\x8d"

Review Comment:
   Should we also test `header.get_schema()`?
   
   ```suggestion
       assert header.sync == b"B\x9e\xb0\xc3\x04,vG\xd1O\x8e\xf5\x9c\xdd\x17\x8d"
       assert header.get_schema() == Schema(
           NestedField(field_id=0, name="status", field_type=IntegerType(), is_optional=False),
           NestedField(
               field_id=1, name="snapshot_id", field_type=LongType(), is_optional=True
           ),
           NestedField(
               field_id=2,
               name="data_file",
               field_type=StructType(
                   fields=(
                       NestedField(
                           field_id=100,
                           name="file_path",
                           field_type=StringType(),
                           is_optional=False,
                       ),
                       NestedField(
                           field_id=101,
                           name="file_format",
                           field_type=StringType(),
                           is_optional=False,
                       ),
                       NestedField(
                           field_id=102,
                           name="partition",
                           field_type=StructType(
                               fields=(
                                   NestedField(
                                       field_id=1000,
                                       name="VendorID",
                                       field_type=IntegerType(),
                                       is_optional=True,
                                   ),
                               )
                           ),
                           is_optional=False,
                       ),
                       NestedField(
                           field_id=103,
                           name="record_count",
                           field_type=LongType(),
                           is_optional=False,
                       ),
                       NestedField(
                           field_id=104,
                           name="file_size_in_bytes",
                           field_type=LongType(),
                           is_optional=False,
                       ),
                       NestedField(
                           field_id=105,
                           name="block_size_in_bytes",
                           field_type=LongType(),
                           is_optional=False,
                       ),
                       NestedField(
                           field_id=108,
                           name="column_sizes",
                           field_type=MapType(
                               key_id=117,
                               key_type=IntegerType(),
                               value_id=118,
                               value_type=LongType(),
                               value_is_optional=False,
                           ),
                           is_optional=True,
                       ),
                       NestedField(
                           field_id=109,
                           name="value_counts",
                           field_type=MapType(
                               key_id=119,
                               key_type=IntegerType(),
                               value_id=120,
                               value_type=LongType(),
                               value_is_optional=False,
                           ),
                           is_optional=True,
                       ),
                       NestedField(
                           field_id=110,
                           name="null_value_counts",
                           field_type=MapType(
                               key_id=121,
                               key_type=IntegerType(),
                               value_id=122,
                               value_type=LongType(),
                               value_is_optional=False,
                           ),
                           is_optional=True,
                       ),
                       NestedField(
                           field_id=137,
                           name="nan_value_counts",
                           field_type=MapType(
                               key_id=138,
                               key_type=IntegerType(),
                               value_id=139,
                               value_type=LongType(),
                               value_is_optional=False,
                           ),
                           is_optional=True,
                       ),
                       NestedField(
                           field_id=125,
                           name="lower_bounds",
                           field_type=MapType(
                               key_id=126,
                               key_type=IntegerType(),
                               value_id=127,
                               value_type=BinaryType(),
                               value_is_optional=False,
                           ),
                           is_optional=True,
                       ),
                       NestedField(
                           field_id=128,
                           name="upper_bounds",
                           field_type=MapType(
                               key_id=129,
                               key_type=IntegerType(),
                               value_id=130,
                               value_type=BinaryType(),
                               value_is_optional=False,
                           ),
                           is_optional=True,
                       ),
                       NestedField(
                           field_id=131,
                           name="key_metadata",
                           field_type=BinaryType(),
                           is_optional=True,
                       ),
                       NestedField(
                           field_id=132,
                           name="split_offsets",
                           field_type=ListType(
                               element_id=133,
                               element_type=LongType(),
                               element_is_optional=False,
                           ),
                           is_optional=True,
                       ),
                       NestedField(
                           field_id=140,
                           name="sort_order_id",
                           field_type=IntegerType(),
                           is_optional=True,
                       ),
                   )
               ),
               is_optional=False,
           ),
           schema_id=1,
           identifier_field_ids=[],
       )
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org