You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/05/13 17:47:42 UTC

[GitHub] [iceberg] rdblue commented on a diff in pull request #4742: Python: Convert Avro to Iceberg schema

rdblue commented on code in PR #4742:
URL: https://github.com/apache/iceberg/pull/4742#discussion_r872638896


##########
python/src/iceberg/utils/schema_conversion.py:
##########
@@ -0,0 +1,274 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Util class for converting between Avro and Iceberg schemas
+
+"""
+import logging
+from typing import Any, Dict, List, Tuple, Union
+
+from iceberg.schema import Schema
+from iceberg.types import (
+    BinaryType,
+    BooleanType,
+    DateType,
+    DoubleType,
+    FloatType,
+    IntegerType,
+    LongType,
+    MapType,
+    NestedField,
+    PrimitiveType,
+    StringType,
+    StructType,
+    TimestampType,
+    TimeType,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class AvroSchemaConversion:
+    PRIMITIVE_FIELD_TYPE_MAP: Dict[str, PrimitiveType] = {
+        "boolean": BooleanType(),
+        "bytes": BinaryType(),
+        "date": DateType(),
+        "double": DoubleType(),
+        "float": FloatType(),
+        "int": IntegerType(),
+        "long": LongType(),
+        "string": StringType(),
+        "time-millis": TimeType(),
+        "timestamp-millis": TimestampType(),
+    }
+
+    def avro_to_iceberg(self, avro_schema: Dict[str, Any]) -> Schema:
+        """Converts an Apache Avro schema into the equivalent Apache Iceberg schema
+
+        This expects field IDs to be encoded in the Avro schema::
+
+            {
+                "type": "record",
+                "name": "manifest_file",
+                "fields": [
+                    {"name": "manifest_path", "type": "string", "doc": "Location URI with FS scheme", "field-id": 500},
+                    {"name": "manifest_length", "type": "long", "doc": "Total file size in bytes", "field-id": 501}
+                ]
+            }
+
+        Example:
+            This converts an Avro schema into an Iceberg schema:
+
+            >>> avro_schema = AvroSchemaConversion().avro_to_iceberg({
+            ...     "type": "record",
+            ...     "name": "manifest_file",
+            ...     "fields": [
+            ...         {"name": "manifest_path", "type": "string", "doc": "Location URI with FS scheme", "field-id": 500},
+            ...         {"name": "manifest_length", "type": "long", "doc": "Total file size in bytes", "field-id": 501}
+            ...     ]
+            ... })
+            >>> iceberg_schema = Schema(
+            ...     NestedField(
+            ...         field_id=500, name="manifest_path", field_type=StringType(), is_optional=False, doc="Location URI with FS scheme"
+            ...     ),
+            ...     NestedField(
+            ...         field_id=501, name="manifest_length", field_type=LongType(), is_optional=False, doc="Total file size in bytes"
+            ...     ),
+            ...     schema_id=1
+            ... )
+            >>> avro_schema == iceberg_schema
+            True
+
+        Args:
+            avro_schema (Dict[str, Any]): The JSON decoded Avro schema
+
+        Returns:
+            Equivalent Iceberg schema
+
+        Todo:
+            * Implement full support for unions
+            * Implement logical types
+        """
+        fields = self._parse_record(avro_schema)
+        return Schema(*fields.fields, schema_id=1)  # type: ignore
+
+    def _parse_record(self, avro_field: Dict[str, Any]) -> StructType:
+        return StructType(*[self._parse_field(field) for field in avro_field["fields"]])
+
+    def _resolve_union(self, type_union: Union[Dict, List, str]) -> Tuple[Union[str, dict], bool]:
+        """
+        Converts Unions into their type and resolves if the field is optional
+
+        Examples:
+            >>> AvroSchemaConversion()._resolve_union('str')
+            ('str', False)
+            >>> AvroSchemaConversion()._resolve_union(['null', 'str'])
+            ('str', True)
+            >>> AvroSchemaConversion()._resolve_union([{'type': 'str'}])
+            ({'type': 'str'}, False)
+            >>> AvroSchemaConversion()._resolve_union(['null', {'type': 'str'}])
+            ({'type': 'str'}, True)
+
+        Args:
+            type_union: The field, can be a string 'str', list ['null', 'str'], or dict {"type": 'str'}
+
+        Returns:
+            A tuple containing the type and nullability
+
+        """
+        avro_types: Union[Dict, List]
+        if isinstance(type_union, str):
+            avro_types = [type_union]
+        else:
+            avro_types = type_union
+
+        is_optional = "null" in avro_types
+
+        # Filter the null value, so we know the actual type
+        avro_types = list(filter(lambda t: t != "null", avro_types))
+
+        if len(avro_types) != 1:
+            raise ValueError("Support for unions is yet to be implemented")

Review Comment:
   Unions other than an optional type (`null` plus exactly one other type) are outside the Iceberg spec (although #4242 is proposing a way to handle them in Java).
   
   I think the error message can just state that the type is invalid because it is a non-option union.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org