You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/10/21 20:06:30 UTC

[iceberg] branch master updated: Python: Visitor to convert Iceberg to PyArrow schema (#5949)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 677d914013 Python: Visitor to convert Iceberg to PyArrow schema (#5949)
677d914013 is described below

commit 677d91401309a4fdb345a1065cc0ee31b7df7d61
Author: Fokko Driesprong <fo...@apache.org>
AuthorDate: Fri Oct 21 22:06:25 2022 +0200

    Python: Visitor to convert Iceberg to PyArrow schema (#5949)
    
    This is required for manually specifying datasets:
    https://arrow.apache.org/docs/python/dataset.html#manual-specification-of-the-dataset
    
    From the PyArrow documentation:
    
    The dataset() function allows easy creation of a Dataset viewing a directory,
    crawling all subdirectories for files and partitioning information.
    However sometimes discovery is not required and the dataset’s files and
    partitions are already known (for example, when this information is stored in metadata).
    In this case it is possible to create a Dataset explicitly without any automatic discovery or inference.
---
 python/pyiceberg/io/pyarrow.py  | 134 +++++++++++++++++++++++++++++++++++-
 python/tests/io/test_pyarrow.py | 149 +++++++++++++++++++++++++++++++++++++++-
 2 files changed, 280 insertions(+), 3 deletions(-)

diff --git a/python/pyiceberg/io/pyarrow.py b/python/pyiceberg/io/pyarrow.py
index dd82bbfb77..248f3603a3 100644
--- a/python/pyiceberg/io/pyarrow.py
+++ b/python/pyiceberg/io/pyarrow.py
@@ -23,10 +23,16 @@ with the pyarrow library.
 """
 
 import os
-from functools import lru_cache
-from typing import Callable, Tuple, Union
+from functools import lru_cache, singledispatch
+from typing import (
+    Callable,
+    List,
+    Tuple,
+    Union,
+)
 from urllib.parse import urlparse
 
+import pyarrow as pa
 from pyarrow.fs import (
     FileInfo,
     FileSystem,
@@ -41,7 +47,28 @@ from pyiceberg.io import (
     OutputFile,
     OutputStream,
 )
+from pyiceberg.schema import Schema, SchemaVisitor, visit
 from pyiceberg.typedef import EMPTY_DICT, Properties
+from pyiceberg.types import (
+    BinaryType,
+    BooleanType,
+    DateType,
+    DecimalType,
+    DoubleType,
+    FixedType,
+    FloatType,
+    IntegerType,
+    ListType,
+    LongType,
+    MapType,
+    NestedField,
+    PrimitiveType,
+    StringType,
+    StructType,
+    TimestampType,
+    TimestamptzType,
+    TimeType,
+)
 
 
 class PyArrowFile(InputFile, OutputFile):
@@ -239,3 +266,106 @@ class PyArrowFileIO(FileIO):
             elif e.errno == 13 or "AWS Error [code 15]" in str(e):
                 raise PermissionError(f"Cannot delete file, access denied: {location}") from e
             raise  # pragma: no cover - If some other kind of OSError, raise the raw error
+
+
+def schema_to_pyarrow(schema: Schema) -> pa.Schema:
+    """Convert an Iceberg schema into a PyArrow schema.
+
+    Args:
+        schema: The Iceberg schema to convert.
+
+    Returns:
+        The result of walking ``schema`` with a ``_ConvertToArrowSchema`` visitor.
+    """
+    return visit(schema, _ConvertToArrowSchema())
+
+
+class _ConvertToArrowSchema(SchemaVisitor[pa.DataType]):
+    """Schema visitor that maps each Iceberg schema node to a PyArrow construct.
+
+    Every callback receives the already-converted result(s) of its children
+    and wraps them in the matching PyArrow factory call.
+    """
+
+    def schema(self, _: Schema, struct_result: pa.StructType) -> pa.Schema:
+        """Build a PyArrow schema from the members of the converted top-level struct."""
+        return pa.schema(list(struct_result))
+
+    def struct(self, _: StructType, field_results: List[pa.DataType]) -> pa.DataType:
+        """Combine the converted fields into a PyArrow struct type."""
+        return pa.struct(field_results)
+
+    def field(self, field: NestedField, field_result: pa.DataType) -> pa.Field:
+        """Create a PyArrow field with the Iceberg field's name and nullability.
+
+        The resulting field is nullable exactly when the Iceberg field is not
+        required.
+        """
+        # NOTE(review): metadata (including the field "id") is attached only
+        # when the field has a doc; a field without a doc gets empty metadata,
+        # so its id is not carried over. Confirm whether the id should always
+        # be set.
+        return pa.field(
+            name=field.name,
+            type=field_result,
+            nullable=not field.required,
+            metadata={"doc": field.doc, "id": str(field.field_id)} if field.doc else {},
+        )
+
+    def list(self, _: ListType, element_result: pa.DataType) -> pa.DataType:
+        """Wrap the converted element type in a PyArrow list type.
+
+        The Iceberg list type argument itself is not consulted.
+        """
+        return pa.list_(value_type=element_result)
+
+    def map(self, _: MapType, key_result: pa.DataType, value_result: pa.DataType) -> pa.DataType:
+        """Build a PyArrow map type from the converted key and value types.
+
+        The Iceberg map type argument itself is not consulted.
+        """
+        return pa.map_(key_type=key_result, item_type=value_result)
+
+    def primitive(self, primitive: PrimitiveType) -> pa.DataType:
+        """Convert a primitive type through the ``_iceberg_to_pyarrow_type`` dispatcher."""
+        return _iceberg_to_pyarrow_type(primitive)
+
+
+# Single-dispatch converter from an Iceberg primitive type to a PyArrow type.
+# This base implementation is the fallback used when no converter is
+# registered for the primitive's class; it rejects the type.
+@singledispatch
+def _iceberg_to_pyarrow_type(primitive: PrimitiveType) -> pa.DataType:
+    raise ValueError(f"Unknown type: {primitive}")
+
+
+@_iceberg_to_pyarrow_type.register
+def _(primitive: FixedType) -> pa.DataType:
+    # Binary type sized by the Iceberg fixed type's length
+    return pa.binary(primitive.length)
+
+
+@_iceberg_to_pyarrow_type.register
+def _(primitive: DecimalType) -> pa.DataType:
+    # Precision and scale are passed through unchanged to a 128-bit decimal
+    return pa.decimal128(primitive.precision, primitive.scale)
+
+
+@_iceberg_to_pyarrow_type.register
+def _(_: BooleanType) -> pa.DataType:
+    # Iceberg boolean -> PyArrow bool
+    return pa.bool_()
+
+
+@_iceberg_to_pyarrow_type.register
+def _(_: IntegerType) -> pa.DataType:
+    # Iceberg integer -> 32-bit integer
+    return pa.int32()
+
+
+@_iceberg_to_pyarrow_type.register
+def _(_: LongType) -> pa.DataType:
+    # Iceberg long -> 64-bit integer
+    return pa.int64()
+
+
+@_iceberg_to_pyarrow_type.register
+def _(_: FloatType) -> pa.DataType:
+    # Iceberg float -> float32 (32-bit IEEE 754 floating point)
+    return pa.float32()
+
+
+@_iceberg_to_pyarrow_type.register
+def _(_: DoubleType) -> pa.DataType:
+    # Iceberg double -> float64 (64-bit IEEE 754 floating point)
+    return pa.float64()
+
+
+@_iceberg_to_pyarrow_type.register
+def _(_: DateType) -> pa.DataType:
+    # Date encoded as a 32-bit int
+    return pa.date32()
+
+
+@_iceberg_to_pyarrow_type.register
+def _(_: TimeType) -> pa.DataType:
+    # 64-bit time type with microsecond ("us") unit
+    return pa.time64("us")
+
+
+@_iceberg_to_pyarrow_type.register
+def _(_: TimestampType) -> pa.DataType:
+    # Timestamp without a time zone, in millisecond ("ms") units.
+    # NOTE(review): the time converter in this file uses "us" while this one
+    # uses "ms"; check the required precision against the Iceberg spec, since
+    # a coarser unit would lose sub-millisecond detail — TODO confirm.
+    return pa.timestamp(unit="ms")
+
+
+@_iceberg_to_pyarrow_type.register
+def _(_: TimestamptzType) -> pa.DataType:
+    # Timestamp in millisecond ("ms") units with a fixed "+00:00" offset as
+    # its time zone.
+    # NOTE(review): the time converter in this file uses "us" while this one
+    # uses "ms"; check the required precision against the Iceberg spec —
+    # TODO confirm.
+    return pa.timestamp(unit="ms", tz="+00:00")
+
+
+@_iceberg_to_pyarrow_type.register
+def _(_: StringType) -> pa.DataType:
+    # Iceberg string -> PyArrow string
+    return pa.string()
+
+
+@_iceberg_to_pyarrow_type.register
+def _(_: BinaryType) -> pa.DataType:
+    # No length is passed, so the binary type is variable length by default
+    return pa.binary()
diff --git a/python/tests/io/test_pyarrow.py b/python/tests/io/test_pyarrow.py
index 1b0b07f671..c199c711d1 100644
--- a/python/tests/io/test_pyarrow.py
+++ b/python/tests/io/test_pyarrow.py
@@ -20,11 +20,35 @@ import os
 import tempfile
 from unittest.mock import MagicMock, patch
 
+import pyarrow as pa
 import pytest
 from pyarrow.fs import FileType
 
 from pyiceberg.io import InputStream, OutputStream
-from pyiceberg.io.pyarrow import PyArrowFile, PyArrowFileIO
+from pyiceberg.io.pyarrow import (
+    PyArrowFile,
+    PyArrowFileIO,
+    _ConvertToArrowSchema,
+    schema_to_pyarrow,
+)
+from pyiceberg.schema import Schema, visit
+from pyiceberg.types import (
+    BinaryType,
+    BooleanType,
+    DateType,
+    DecimalType,
+    DoubleType,
+    FixedType,
+    FloatType,
+    IntegerType,
+    ListType,
+    LongType,
+    MapType,
+    StringType,
+    TimestampType,
+    TimestamptzType,
+    TimeType,
+)
 
 
 def test_pyarrow_input_file():
@@ -264,3 +288,126 @@ def test_deleting_s3_file_not_found():
             PyArrowFileIO().delete("s3://foo/bar.txt")
 
         assert "Cannot delete file, does not exist:" in str(exc_info.value)
+
+
+def test_schema_to_pyarrow_schema(table_schema_nested: Schema):
+    """The converted nested table schema renders as the expected schema text."""
+    expected = """foo: string
+bar: int32 not null
+baz: bool
+qux: list<item: string> not null
+  child 0, item: string
+quux: map<string, map<string, int32>> not null
+  child 0, entries: struct<key: string not null, value: map<string, int32>> not null
+      child 0, key: string not null
+      child 1, value: map<string, int32>
+          child 0, entries: struct<key: string not null, value: int32> not null
+              child 0, key: string not null
+              child 1, value: int32
+location: list<item: struct<latitude: float, longitude: float>> not null
+  child 0, item: struct<latitude: float, longitude: float>
+      child 0, latitude: float
+      child 1, longitude: float
+person: struct<name: string, age: int32 not null>
+  child 0, name: string
+  child 1, age: int32 not null"""
+    # Compare against the textual rendering of the converted schema.
+    assert repr(schema_to_pyarrow(table_schema_nested)) == expected
+
+
+def test_fixed_type_to_pyarrow():
+    """A fixed type converts to a binary type created with the same length."""
+    width = 22
+    assert visit(FixedType(width), _ConvertToArrowSchema()) == pa.binary(width)
+
+
+def test_decimal_type_to_pyarrow():
+    """A decimal type keeps its precision and scale when converted."""
+    digits, fraction_digits = 25, 19
+    converted = visit(DecimalType(digits, fraction_digits), _ConvertToArrowSchema())
+    assert converted == pa.decimal128(digits, fraction_digits)
+
+
+def test_boolean_type_to_pyarrow():
+    """A boolean type converts to the PyArrow bool type."""
+    converted = visit(BooleanType(), _ConvertToArrowSchema())
+    assert converted == pa.bool_()
+
+
+def test_integer_type_to_pyarrow():
+    """An integer type converts to int32."""
+    converted = visit(IntegerType(), _ConvertToArrowSchema())
+    assert converted == pa.int32()
+
+
+def test_long_type_to_pyarrow():
+    """A long type converts to int64."""
+    converted = visit(LongType(), _ConvertToArrowSchema())
+    assert converted == pa.int64()
+
+
+def test_float_type_to_pyarrow():
+    """A float type converts to float32."""
+    converted = visit(FloatType(), _ConvertToArrowSchema())
+    assert converted == pa.float32()
+
+
+def test_double_type_to_pyarrow():
+    """A double type converts to float64."""
+    converted = visit(DoubleType(), _ConvertToArrowSchema())
+    assert converted == pa.float64()
+
+
+def test_date_type_to_pyarrow():
+    """A date type converts to date32."""
+    converted = visit(DateType(), _ConvertToArrowSchema())
+    assert converted == pa.date32()
+
+
+def test_time_type_to_pyarrow():
+    """A time type converts to time64 with microsecond unit."""
+    converted = visit(TimeType(), _ConvertToArrowSchema())
+    assert converted == pa.time64("us")
+
+
+def test_timestamp_type_to_pyarrow():
+    """A timestamp type converts to a millisecond timestamp without a time zone."""
+    converted = visit(TimestampType(), _ConvertToArrowSchema())
+    assert converted == pa.timestamp(unit="ms")
+
+
+def test_timestamptz_type_to_pyarrow():
+    """A timestamptz type converts to a millisecond timestamp with a "+00:00" zone."""
+    converted = visit(TimestamptzType(), _ConvertToArrowSchema())
+    assert converted == pa.timestamp(unit="ms", tz="+00:00")
+
+
+def test_string_type_to_pyarrow():
+    """A string type converts to the PyArrow string type."""
+    converted = visit(StringType(), _ConvertToArrowSchema())
+    assert converted == pa.string()
+
+
+def test_binary_type_to_pyarrow():
+    """A binary type converts to the PyArrow binary type created without a length."""
+    converted = visit(BinaryType(), _ConvertToArrowSchema())
+    assert converted == pa.binary()
+
+
+def test_struct_type_to_pyarrow(table_schema_simple: Schema):
+    """The simple table schema's struct converts to the matching PyArrow struct."""
+    # NOTE(review): presumably `==` between PyArrow types does not compare
+    # field metadata, in which case the "id" entries below are not actually
+    # verified by this assertion — confirm.
+    expected_fields = [
+        pa.field("foo", pa.string(), nullable=True, metadata={"id": "1"}),
+        pa.field("bar", pa.int32(), nullable=False, metadata={"id": "2"}),
+        pa.field("baz", pa.bool_(), nullable=True, metadata={"id": "3"}),
+    ]
+    expected = pa.struct(expected_fields)
+    converted = visit(table_schema_simple.as_struct(), _ConvertToArrowSchema())
+    assert converted == expected
+
+
+def test_map_type_to_pyarrow():
+    """A map of integer keys to required string values converts to a PyArrow map."""
+    source_type = MapType(
+        key_id=1,
+        key_type=IntegerType(),
+        value_id=2,
+        value_type=StringType(),
+        value_required=True,
+    )
+    converted = visit(source_type, _ConvertToArrowSchema())
+    assert converted == pa.map_(pa.int32(), pa.string())
+
+
+def test_list_type_to_pyarrow():
+    """A list of required integers converts to a PyArrow list of int32."""
+    # The local is named for what it holds: a ListType, not a map.
+    iceberg_list = ListType(
+        element_id=1,
+        element_type=IntegerType(),
+        element_required=True,
+    )
+    assert visit(iceberg_list, _ConvertToArrowSchema()) == pa.list_(pa.int32())