You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by "Fokko (via GitHub)" <gi...@apache.org> on 2023/06/27 12:46:33 UTC

[GitHub] [iceberg] Fokko commented on a diff in pull request #7873: Python: Avro write

Fokko commented on code in PR #7873:
URL: https://github.com/apache/iceberg/pull/7873#discussion_r1243251097


##########
python/pyiceberg/avro/encoder.py:
##########
@@ -0,0 +1,179 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import decimal
+import struct
+from datetime import date, datetime, time
+
+from pyiceberg.avro import STRUCT_DOUBLE, STRUCT_FLOAT
+from pyiceberg.io import OutputStream
+from pyiceberg.utils.datetime import date_to_days, datetime_to_micros
+
+
+def _time_object_to_micros(t: time) -> int:

Review Comment:
   Let's add this one to `utils/datetime.py`



##########
python/pyiceberg/avro/resolver.py:
##########
@@ -249,7 +347,7 @@ def visit_time(self, time_type: TimeType, partner: Optional[IcebergType]) -> Rea
     def visit_timestamp(self, timestamp_type: TimestampType, partner: Optional[IcebergType]) -> Reader:
         return TimestampReader()
 
-    def visit_timestampz(self, timestamptz_type: TimestamptzType, partner: Optional[IcebergType]) -> Reader:
+    def visit_timestamptz(self, timestamptz_type: TimestamptzType, partner: Optional[IcebergType]) -> Reader:

Review Comment:
   Great catch



##########
python/tests/avro/test_file.py:
##########
@@ -58,3 +75,115 @@ def test_missing_schema() -> None:
         header.get_schema()
 
     assert "No schema found in Avro file headers" in str(exc_info.value)
+
+
+# helper function to serialize our objects to dicts to enable
+# direct comparison with the dicts returned by fastavro
+def todict(obj: Any) -> Any:
+    if isinstance(obj, dict):
+        data = []
+        for k, v in obj.items():
+            data.append({"key": k, "value": v})
+        return data
+    elif isinstance(obj, Enum):
+        return obj.value
+    elif hasattr(obj, "__iter__") and not isinstance(obj, str) and not isinstance(obj, bytes):
+        return [todict(v) for v in obj]
+    elif hasattr(obj, "__dict__"):
+        return {key: todict(value) for key, value in obj.__dict__.items() if not callable(value) and not key.startswith("_")}
+    else:
+        return obj
+
+
+def test_write_manifest_entry_with_iceberg_read_with_fastavro() -> None:

Review Comment:
   Love it



##########
python/tests/avro/test_file.py:
##########
@@ -58,3 +75,115 @@ def test_missing_schema() -> None:
         header.get_schema()
 
     assert "No schema found in Avro file headers" in str(exc_info.value)
+
+
+# helper function to serialize our objects to dicts to enable
+# direct comparison with the dicts returned by fastavro
+def todict(obj: Any) -> Any:
+    if isinstance(obj, dict):
+        data = []
+        for k, v in obj.items():
+            data.append({"key": k, "value": v})
+        return data
+    elif isinstance(obj, Enum):
+        return obj.value
+    elif hasattr(obj, "__iter__") and not isinstance(obj, str) and not isinstance(obj, bytes):
+        return [todict(v) for v in obj]
+    elif hasattr(obj, "__dict__"):
+        return {key: todict(value) for key, value in obj.__dict__.items() if not callable(value) and not key.startswith("_")}
+    else:
+        return obj
+
+
+def test_write_manifest_entry_with_iceberg_read_with_fastavro() -> None:
+    data_file = DataFile(
+        content=DataFileContent.DATA,
+        file_path="s3://some-path/some-file.parquet",
+        file_format=FileFormat.PARQUET,
+        partition=Record(),
+        record_count=131327,
+        file_size_in_bytes=220669226,
+        column_sizes={1: 220661854},
+        value_counts={1: 131327},
+        null_value_counts={1: 0},
+        nan_value_counts={},
+        lower_bounds={1: b"aaaaaaaaaaaaaaaa"},
+        upper_bounds={1: b"zzzzzzzzzzzzzzzz"},
+        key_metadata=b"\xde\xad\xbe\xef",
+        split_offsets=[4, 133697593],
+        equality_ids=[],
+        sort_order_id=4,
+        spec_id=3,
+    )
+    entry = ManifestEntry(
+        status=ManifestEntryStatus.ADDED,
+        snapshot_id=8638475580105682862,
+        data_sequence_number=0,
+        file_sequence_number=0,
+        data_file=data_file,
+    )
+
+    with TemporaryDirectory() as tmpdir:
+        tmp_avro_file = tmpdir + "/manifest_entry.avro"
+
+        with avro.AvroOutputFile[ManifestEntry](
+            PyArrowFileIO().new_output(tmp_avro_file), MANIFEST_ENTRY_SCHEMA, "manifest_entry"
+        ) as out:
+            out.write_block([entry])
+
+        schema = AvroSchemaConversion().iceberg_to_avro(MANIFEST_ENTRY_SCHEMA, schema_name="manifest_entry")
+
+        with open(tmp_avro_file, "rb") as fo:
+            r = reader(fo=fo, reader_schema=schema)

Review Comment:
   Do you need the schema here? The schema is in the file header.



##########
python/tests/avro/test_encoder.py:
##########
@@ -0,0 +1,230 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import datetime
+import io
+import struct
+from decimal import Decimal
+
+from pyiceberg.avro.encoder import BinaryEncoder
+
+
+def zigzag_encode(datum: int) -> bytes:
+    result = []
+    datum = (datum << 1) ^ (datum >> 63)
+    while (datum & ~0x7F) != 0:
+        result.append(struct.pack("B", (datum & 0x7F) | 0x80))
+        datum >>= 7
+    result.append(struct.pack("B", datum))
+    return b"".join(result)
+
+
+def test_write() -> None:
+    output = io.BytesIO()
+    encoder = BinaryEncoder(output)
+
+    _input = b"\x12\x34\x56"
+
+    encoder.write(_input)
+
+    assert output.getbuffer() == _input
+
+
+def test_write_boolean() -> None:
+    output = io.BytesIO()
+    encoder = BinaryEncoder(output)
+
+    encoder.write_boolean(True)
+    encoder.write_boolean(False)
+
+    assert output.getbuffer() == struct.pack("??", True, False)
+
+
+def test_write_int() -> None:
+    output = io.BytesIO()
+    encoder = BinaryEncoder(output)
+
+    _1byte_input = 2
+    _2byte_input = 7466
+    _3byte_input = 523490
+    _4byte_input = 86561570
+    _5byte_input = 2510416930
+    _6byte_input = 734929016866
+    _7byte_input = 135081528772642
+    _8byte_input = 35124861473277986
+
+    encoder.write_int(_1byte_input)
+    encoder.write_int(_2byte_input)
+    encoder.write_int(_3byte_input)
+    encoder.write_int(_4byte_input)
+    encoder.write_int(_5byte_input)
+    encoder.write_int(_6byte_input)
+    encoder.write_int(_7byte_input)
+    encoder.write_int(_8byte_input)
+
+    buffer = output.getbuffer()
+
+    assert buffer[0:1] == zigzag_encode(_1byte_input)

Review Comment:
   Instead of using this `zigzag_encode` helper, I think it makes more sense to assert against the actual bytes. The `zigzag_encode` helper is very similar to the actual encoding code, so a passing test doesn't tell us whether the encoding itself is correct.



##########
python/pyiceberg/avro/resolver.py:
##########
@@ -97,6 +119,82 @@ def construct_reader(
     return resolve(file_schema, file_schema, read_types)
 
 
+def construct_writer(file_schema: Union[Schema, IcebergType]) -> Writer:
+    """Constructs a writer from a file schema.
+
+    Args:
+        file_schema (Schema | IcebergType): The schema of the Avro file.
+
+    Raises:
+        NotImplementedError: If attempting to resolve an unrecognized object type.
+    """
+    return visit(file_schema, ConstructWriter())
+
+
+class ConstructWriter(SchemaVisitorPerPrimitiveType[Writer]):
+    """Constructs a writer tree from an Iceberg schema."""
+
+    def schema(self, schema: Schema, struct_result: Writer) -> Writer:
+        return struct_result
+
+    def struct(self, struct: StructType, field_results: List[Writer]) -> Writer:
+        return StructWriter(tuple(field_results))
+
+    def field(self, field: NestedField, field_result: Writer) -> Writer:
+        if field.required:

Review Comment:
   Just a suggestion, I like to inline this kind of statement:
   ```suggestion
           return field_result if field.required else OptionWriter(field_result)
   ```
   Feel free to ignore this



##########
python/pyiceberg/avro/writer.py:
##########
@@ -0,0 +1,202 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Classes for building the Reader tree.
+
+Constructing a reader tree from the schema makes it easy
+to decouple the reader implementation from the schema.
+
+The reader tree can be changed in such a way that the
+read schema is different, while respecting the read schema.
+"""
+from __future__ import annotations
+
+from abc import abstractmethod
+from dataclasses import dataclass
+from dataclasses import field as dataclassfield
+from datetime import datetime, time
+from typing import (
+    Any,
+    Dict,
+    List,
+    Tuple,
+)
+from uuid import UUID
+
+from pyiceberg.avro.encoder import BinaryEncoder
+from pyiceberg.types import StructType
+from pyiceberg.utils.singleton import Singleton
+
+
+class Writer(Singleton):
+    @abstractmethod
+    def write(self, encoder: BinaryEncoder, val: Any) -> Any:
+        ...
+
+    def __repr__(self) -> str:
+        return f"{self.__class__.__name__}()"
+
+
+class NoneWriter(Writer):
+    def write(self, _: BinaryEncoder, __: Any) -> None:
+        return None

Review Comment:
   ```suggestion
           pass
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org