You are viewing a plain-text version of this content. (The canonical link to the original message was not preserved in this copy.)
Posted to commits@iceberg.apache.org by bl...@apache.org on 2023/06/02 22:54:35 UTC
[iceberg] branch master updated: Python: Add support for initial default (#7699)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 5f39174fc4 Python: Add support for initial default (#7699)
5f39174fc4 is described below
commit 5f39174fc4ea7404561e8ec44b7097c227d3dfc7
Author: Fokko Driesprong <fo...@apache.org>
AuthorDate: Sat Jun 3 00:54:28 2023 +0200
Python: Add support for initial default (#7699)
---
python/pyiceberg/avro/file.py | 6 +-
python/pyiceberg/avro/reader.py | 13 +++
python/pyiceberg/avro/resolver.py | 59 ++++++++++--
python/pyiceberg/manifest.py | 10 +-
python/pyiceberg/types.py | 3 +
python/tests/avro/test_resolver.py | 21 +++++
python/tests/conftest.py | 133 +++++++++++++++++++++++++--
python/tests/utils/test_manifest.py | 56 +++++++++--
python/tests/utils/test_schema_conversion.py | 4 +-
9 files changed, 274 insertions(+), 31 deletions(-)
diff --git a/python/pyiceberg/avro/file.py b/python/pyiceberg/avro/file.py
index 56163d7bc7..3968cdc42e 100644
--- a/python/pyiceberg/avro/file.py
+++ b/python/pyiceberg/avro/file.py
@@ -22,6 +22,7 @@ from __future__ import annotations
import json
from dataclasses import dataclass
+from enum import Enum
from types import TracebackType
from typing import (
Callable,
@@ -121,6 +122,7 @@ class AvroFile(Generic[D]):
input_file: InputFile
read_schema: Optional[Schema]
read_types: Dict[int, Callable[..., StructProtocol]]
+ read_enums: Dict[int, Callable[..., Enum]]
input_stream: InputStream
header: AvroFileHeader
schema: Schema
@@ -134,10 +136,12 @@ class AvroFile(Generic[D]):
input_file: InputFile,
read_schema: Optional[Schema] = None,
read_types: Dict[int, Callable[..., StructProtocol]] = EMPTY_DICT,
+ read_enums: Dict[int, Callable[..., Enum]] = EMPTY_DICT,
) -> None:
self.input_file = input_file
self.read_schema = read_schema
self.read_types = read_types
+ self.read_enums = read_enums
def __enter__(self) -> AvroFile[D]:
"""
@@ -154,7 +158,7 @@ class AvroFile(Generic[D]):
if not self.read_schema:
self.read_schema = self.schema
- self.reader = resolve(self.schema, self.read_schema, self.read_types)
+ self.reader = resolve(self.schema, self.read_schema, self.read_types, self.read_enums)
return self
diff --git a/python/pyiceberg/avro/reader.py b/python/pyiceberg/avro/reader.py
index 9156a08116..2a94d0c069 100644
--- a/python/pyiceberg/avro/reader.py
+++ b/python/pyiceberg/avro/reader.py
@@ -103,6 +103,19 @@ class NoneReader(Reader):
return None
+class DefaultReader(Reader):
+ default_value: Any
+
+ def __init__(self, default_value: Any) -> None:
+ self.default_value = default_value
+
+ def read(self, _: BinaryDecoder) -> Any:
+ return self.default_value
+
+ def skip(self, decoder: BinaryDecoder) -> None:
+ pass
+
+
class BooleanReader(Reader):
def read(self, decoder: BinaryDecoder) -> bool:
return decoder.read_boolean()
diff --git a/python/pyiceberg/avro/resolver.py b/python/pyiceberg/avro/resolver.py
index bacd942b33..e72430c644 100644
--- a/python/pyiceberg/avro/resolver.py
+++ b/python/pyiceberg/avro/resolver.py
@@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
# pylint: disable=arguments-renamed,unused-argument
+from enum import Enum
from typing import (
Callable,
Dict,
@@ -24,11 +25,13 @@ from typing import (
Union,
)
+from pyiceberg.avro.decoder import BinaryDecoder
from pyiceberg.avro.reader import (
BinaryReader,
BooleanReader,
DateReader,
DecimalReader,
+ DefaultReader,
DoubleReader,
FixedReader,
FloatReader,
@@ -77,6 +80,8 @@ from pyiceberg.types import (
UUIDType,
)
+STRUCT_ROOT = -1
+
def construct_reader(
file_schema: Union[Schema, IcebergType], read_types: Dict[int, Callable[..., StructProtocol]] = EMPTY_DICT
@@ -96,26 +101,53 @@ def resolve(
file_schema: Union[Schema, IcebergType],
read_schema: Union[Schema, IcebergType],
read_types: Dict[int, Callable[..., StructProtocol]] = EMPTY_DICT,
+ read_enums: Dict[int, Callable[..., Enum]] = EMPTY_DICT,
) -> Reader:
"""Resolves the file and read schema to produce a reader
Args:
file_schema (Schema | IcebergType): The schema of the Avro file
read_schema (Schema | IcebergType): The requested read schema which is equal, subset or superset of the file schema
- read_types (Dict[int, Callable[[Schema], StructProtocol]]): A dict of types to use for struct data
+ read_types (Dict[int, Callable[..., StructProtocol]]): A dict of types to use for struct data
+ read_enums (Dict[int, Callable[..., Enum]]): A dict of fields that have to be converted to an enum
Raises:
NotImplementedError: If attempting to resolve an unrecognized object type
"""
- return visit_with_partner(file_schema, read_schema, SchemaResolver(read_types), SchemaPartnerAccessor()) # type: ignore
+ return visit_with_partner(
+ file_schema, read_schema, SchemaResolver(read_types, read_enums), SchemaPartnerAccessor()
+ ) # type: ignore
+
+
+class EnumReader(Reader):
+    """Wraps another reader and converts each primitive value it decodes into an Enum"""
+
+    enum: Callable[..., Enum]
+    reader: Reader
+
+    def __init__(self, enum: Callable[..., Enum], reader: Reader) -> None:
+        self.enum = enum
+        self.reader = reader
+
+    def read(self, decoder: BinaryDecoder) -> Enum:
+        return self.enum(self.reader.read(decoder))
+
+    def skip(self, decoder: BinaryDecoder) -> None:
+        self.reader.skip(decoder)  # the wrapped value is still in the stream; consume it to keep the decoder aligned
class SchemaResolver(PrimitiveWithPartnerVisitor[IcebergType, Reader]):
read_types: Dict[int, Callable[..., StructProtocol]]
+ read_enums: Dict[int, Callable[..., Enum]]
context: List[int]
- def __init__(self, read_types: Dict[int, Callable[..., StructProtocol]] = EMPTY_DICT) -> None:
+ def __init__(
+ self,
+ read_types: Dict[int, Callable[..., StructProtocol]] = EMPTY_DICT,
+ read_enums: Dict[int, Callable[..., Enum]] = EMPTY_DICT,
+ ) -> None:
self.read_types = read_types
+ self.read_enums = read_enums
self.context = []
def schema(self, schema: Schema, expected_schema: Optional[IcebergType], result: Reader) -> Reader:
@@ -128,8 +160,7 @@ class SchemaResolver(PrimitiveWithPartnerVisitor[IcebergType, Reader]):
self.context.pop()
def struct(self, struct: StructType, expected_struct: Optional[IcebergType], field_readers: List[Reader]) -> Reader:
- # -1 indicates the struct root
- read_struct_id = self.context[-1] if len(self.context) > 0 else -1
+ read_struct_id = self.context[STRUCT_ROOT] if len(self.context) > 0 else STRUCT_ROOT
struct_callable = self.read_types.get(read_struct_id, Record)
if not expected_struct:
@@ -142,16 +173,26 @@ class SchemaResolver(PrimitiveWithPartnerVisitor[IcebergType, Reader]):
# first, add readers for the file fields that must be in order
results: List[Tuple[Optional[int], Reader]] = [
- (expected_positions.get(field.field_id), result_reader) for field, result_reader in zip(struct.fields, field_readers)
+ (
+ expected_positions.get(field.field_id),
+ # Check if we need to convert it to an Enum
+ result_reader if not (enum_type := self.read_enums.get(field.field_id)) else EnumReader(enum_type, result_reader),
+ )
+ for field, result_reader in zip(struct.fields, field_readers)
]
file_fields = {field.field_id: field for field in struct.fields}
for pos, read_field in enumerate(expected_struct.fields):
if read_field.field_id not in file_fields:
- if read_field.required:
+ if isinstance(read_field, NestedField) and read_field.initial_default is not None:
+ # The field is not in the file, but there is a default value
+ # and that one can be required
+ results.append((pos, DefaultReader(read_field.initial_default)))
+ elif read_field.required:
raise ResolveError(f"{read_field} is non-optional, and not part of the file schema")
- # Just set the new field to None
- results.append((pos, NoneReader()))
+ else:
+ # Just set the new field to None
+ results.append((pos, NoneReader()))
return StructReader(tuple(results), struct_callable, expected_struct)
diff --git a/python/pyiceberg/manifest.py b/python/pyiceberg/manifest.py
index 942c582b51..ea1d6a611d 100644
--- a/python/pyiceberg/manifest.py
+++ b/python/pyiceberg/manifest.py
@@ -229,9 +229,9 @@ MANIFEST_FILE_SCHEMA: Schema = Schema(
NestedField(500, "manifest_path", StringType(), required=True, doc="Location URI with FS scheme"),
NestedField(501, "manifest_length", LongType(), required=True),
NestedField(502, "partition_spec_id", IntegerType(), required=True),
- NestedField(517, "content", IntegerType(), required=False),
- NestedField(515, "sequence_number", LongType(), required=False),
- NestedField(516, "min_sequence_number", LongType(), required=False),
+ NestedField(517, "content", IntegerType(), required=False, initial_default=ManifestContent.DATA),
+ NestedField(515, "sequence_number", LongType(), required=False, initial_default=0),
+ NestedField(516, "min_sequence_number", LongType(), required=False, initial_default=0),
NestedField(503, "added_snapshot_id", LongType(), required=False),
NestedField(504, "added_files_count", IntegerType(), required=False),
NestedField(505, "existing_files_count", IntegerType(), required=False),
@@ -283,5 +283,7 @@ def files(input_file: InputFile) -> Iterator[DataFile]:
def read_manifest_list(input_file: InputFile) -> Iterator[ManifestFile]:
- with AvroFile[ManifestFile](input_file, MANIFEST_FILE_SCHEMA, {-1: ManifestFile, 508: PartitionFieldSummary}) as reader:
+ with AvroFile[ManifestFile](
+ input_file, MANIFEST_FILE_SCHEMA, {-1: ManifestFile, 508: PartitionFieldSummary}, {517: ManifestContent}
+ ) as reader:
yield from reader
diff --git a/python/pyiceberg/types.py b/python/pyiceberg/types.py
index 9b2eb015bc..3fd023ce9a 100644
--- a/python/pyiceberg/types.py
+++ b/python/pyiceberg/types.py
@@ -219,6 +219,7 @@ class NestedField(IcebergType):
field_type: IcebergType = Field(alias="type")
required: bool = Field(default=True)
doc: Optional[str] = Field(default=None, repr=False)
+ initial_default: Any = Field(alias="initial-default", repr=False)
def __init__(
self,
@@ -227,6 +228,7 @@ class NestedField(IcebergType):
field_type: Optional[IcebergType] = None,
required: bool = True,
doc: Optional[str] = None,
+ initial_default: Optional[Any] = None,
**data: Any,
):
# We need an init when we want to use positional arguments, but
@@ -236,6 +238,7 @@ class NestedField(IcebergType):
data["field_type"] = data["type"] if "type" in data else field_type
data["required"] = required
data["doc"] = doc
+ data["initial_default"] = initial_default
super().__init__(**data)
def __str__(self) -> str:
diff --git a/python/tests/avro/test_resolver.py b/python/tests/avro/test_resolver.py
index 42e9146c48..a302294755 100644
--- a/python/tests/avro/test_resolver.py
+++ b/python/tests/avro/test_resolver.py
@@ -24,6 +24,7 @@ from pydantic import Field
from pyiceberg.avro.file import AvroFile
from pyiceberg.avro.reader import (
DecimalReader,
+ DefaultReader,
DoubleReader,
FloatReader,
IntegerReader,
@@ -280,3 +281,23 @@ def test_column_assignment() -> None:
records = list(reader)
assert repr(records) == "[Ints[c=3, d=None]]"
+
+
+def test_resolver_initial_value() -> None:
+ write_schema = Schema(
+ NestedField(1, "name", StringType()),
+ schema_id=1,
+ )
+ read_schema = Schema(
+ NestedField(2, "something", StringType(), required=False, initial_default="vo"),
+ schema_id=2,
+ )
+
+ assert resolve(write_schema, read_schema) == StructReader(
+ (
+ (None, StringReader()), # The one we skip
+ (0, DefaultReader("vo")),
+ ),
+ Record,
+ read_schema.as_struct(),
+ )
diff --git a/python/tests/conftest.py b/python/tests/conftest.py
index b4db30597a..748d8fa76e 100644
--- a/python/tests/conftest.py
+++ b/python/tests/conftest.py
@@ -589,7 +589,7 @@ manifest_entry_records = [
},
]
-manifest_file_records = [
+manifest_file_records_v1 = [
{
"manifest_path": "/home/iceberg/warehouse/nyc/taxis_partitioned/metadata/0125c686-8aa6-4502-bdcc-b6d17ca41a3b-m0.avro",
"manifest_length": 7989,
@@ -607,9 +607,31 @@ manifest_file_records = [
}
]
+manifest_file_records_v2 = [
+ {
+ "manifest_path": "/home/iceberg/warehouse/nyc/taxis_partitioned/metadata/0125c686-8aa6-4502-bdcc-b6d17ca41a3b-m0.avro",
+ "manifest_length": 7989,
+ "partition_spec_id": 0,
+ "content": 1,
+ "sequence_number": None, # To be inherited
+ "min_sequence_number": None, # To be inherited
+ "added_snapshot_id": 9182715666859759686,
+ "added_files_count": 3,
+ "existing_files_count": 0,
+ "deleted_files_count": 0,
+ "added_rows_count": 237993,
+ "existing_rows_count": 0,
+ "deleted_rows_count": 0,
+ "partitions": [
+ {"contains_null": True, "contains_nan": False, "lower_bound": b"\x01\x00\x00\x00", "upper_bound": b"\x02\x00\x00\x00"}
+ ],
+ "key_metadata": b"\x19\x25",
+ }
+]
+
@pytest.fixture(scope="session")
-def avro_schema_manifest_file() -> Dict[str, Any]:
+def avro_schema_manifest_file_v1() -> Dict[str, Any]:
return {
"type": "record",
"name": "manifest_file",
@@ -710,6 +732,85 @@ def avro_schema_manifest_file() -> Dict[str, Any]:
}
+@pytest.fixture(scope="session")
+def avro_schema_manifest_file_v2() -> Dict[str, Any]:
+ return {
+ "type": "record",
+ "name": "manifest_file",
+ "fields": [
+ {"name": "manifest_path", "type": "string", "doc": "Location URI with FS scheme", "field-id": 500},
+ {"name": "manifest_length", "type": "long", "doc": "Total file size in bytes", "field-id": 501},
+ {"name": "partition_spec_id", "type": "int", "doc": "Spec ID used to write", "field-id": 502},
+ {"name": "content", "type": "int", "doc": "Contents of the manifest: 0=data, 1=deletes", "field-id": 517},
+ {
+ "name": "sequence_number",
+ "type": ["null", "long"],
+ "doc": "Sequence number when the manifest was added",
+ "field-id": 515,
+ },
+ {
+ "name": "min_sequence_number",
+ "type": ["null", "long"],
+ "doc": "Lowest sequence number in the manifest",
+ "field-id": 516,
+ },
+ {"name": "added_snapshot_id", "type": "long", "doc": "Snapshot ID that added the manifest", "field-id": 503},
+ {"name": "added_files_count", "type": "int", "doc": "Added entry count", "field-id": 504},
+ {"name": "existing_files_count", "type": "int", "doc": "Existing entry count", "field-id": 505},
+ {"name": "deleted_files_count", "type": "int", "doc": "Deleted entry count", "field-id": 506},
+ {"name": "added_rows_count", "type": "long", "doc": "Added rows count", "field-id": 512},
+ {"name": "existing_rows_count", "type": "long", "doc": "Existing rows count", "field-id": 513},
+ {"name": "deleted_rows_count", "type": "long", "doc": "Deleted rows count", "field-id": 514},
+ {
+ "name": "partitions",
+ "type": [
+ "null",
+ {
+ "type": "array",
+ "items": {
+ "type": "record",
+ "name": "r508",
+ "fields": [
+ {
+ "name": "contains_null",
+ "type": "boolean",
+ "doc": "True if any file has a null partition value",
+ "field-id": 509,
+ },
+ {
+ "name": "contains_nan",
+ "type": ["null", "boolean"],
+ "doc": "True if any file has a nan partition value",
+ "default": None,
+ "field-id": 518,
+ },
+ {
+ "name": "lower_bound",
+ "type": ["null", "bytes"],
+ "doc": "Partition lower bound for all files",
+ "default": None,
+ "field-id": 510,
+ },
+ {
+ "name": "upper_bound",
+ "type": ["null", "bytes"],
+ "doc": "Partition upper bound for all files",
+ "default": None,
+ "field-id": 511,
+ },
+ ],
+ },
+ "element-id": 508,
+ },
+ ],
+ "doc": "Summary for each partition",
+ "default": None,
+ "field-id": 507,
+ },
+ ],
+ }
+
+
@pytest.fixture(scope="session")
def avro_schema_manifest_entry() -> Dict[str, Any]:
return {
@@ -969,20 +1070,38 @@ def generated_manifest_entry_file(avro_schema_manifest_entry: Dict[str, Any]) ->
@pytest.fixture(scope="session")
-def generated_manifest_file_file(
- avro_schema_manifest_file: Dict[str, Any], generated_manifest_entry_file: str
+def generated_manifest_file_file_v1(
+ avro_schema_manifest_file_v1: Dict[str, Any], generated_manifest_entry_file: str
+) -> Generator[str, None, None]:
+ from fastavro import parse_schema, writer
+
+ parsed_schema = parse_schema(avro_schema_manifest_file_v1)
+
+ # Make sure that a valid manifest_path is set
+ manifest_file_records_v1[0]["manifest_path"] = generated_manifest_entry_file
+
+ with TemporaryDirectory() as tmpdir:
+ tmp_avro_file = tmpdir + "/manifest.avro"
+ with open(tmp_avro_file, "wb") as out:
+ writer(out, parsed_schema, manifest_file_records_v1)
+ yield tmp_avro_file
+
+
+@pytest.fixture(scope="session")
+def generated_manifest_file_file_v2(
+ avro_schema_manifest_file_v2: Dict[str, Any], generated_manifest_entry_file: str
) -> Generator[str, None, None]:
from fastavro import parse_schema, writer
- parsed_schema = parse_schema(avro_schema_manifest_file)
+ parsed_schema = parse_schema(avro_schema_manifest_file_v2)
# Make sure that a valid manifest_path is set
- manifest_file_records[0]["manifest_path"] = generated_manifest_entry_file
+ manifest_file_records_v2[0]["manifest_path"] = generated_manifest_entry_file
with TemporaryDirectory() as tmpdir:
tmp_avro_file = tmpdir + "/manifest.avro"
with open(tmp_avro_file, "wb") as out:
- writer(out, parsed_schema, manifest_file_records)
+ writer(out, parsed_schema, manifest_file_records_v2)
yield tmp_avro_file
diff --git a/python/tests/utils/test_manifest.py b/python/tests/utils/test_manifest.py
index 548d94cb8f..29905ce55d 100644
--- a/python/tests/utils/test_manifest.py
+++ b/python/tests/utils/test_manifest.py
@@ -20,6 +20,7 @@ from pyiceberg.io.pyarrow import PyArrowFileIO
from pyiceberg.manifest import (
DataFile,
FileFormat,
+ ManifestContent,
ManifestEntryStatus,
PartitionFieldSummary,
read_manifest_entry,
@@ -150,8 +151,8 @@ def test_read_manifest_entry(generated_manifest_entry_file: str) -> None:
assert data_file.sort_order_id == 0
-def test_read_manifest_list(generated_manifest_file_file: str) -> None:
- input_file = PyArrowFileIO().new_input(generated_manifest_file_file)
+def test_read_manifest_list(generated_manifest_file_file_v1: str) -> None:
+ input_file = PyArrowFileIO().new_input(generated_manifest_file_file_v1)
manifest_list = list(read_manifest_list(input_file))[0]
assert manifest_list.manifest_length == 7989
@@ -176,14 +177,14 @@ def test_read_manifest_list(generated_manifest_file_file: str) -> None:
assert manifest_list.deleted_rows_count == 0
-def test_read_manifest(generated_manifest_file_file: str) -> None:
- io = load_file_io({})
+def test_read_manifest_v1(generated_manifest_file_file_v1: str) -> None:
+ io = load_file_io()
snapshot = Snapshot(
snapshot_id=25,
parent_snapshot_id=19,
timestamp_ms=1602638573590,
- manifest_list=generated_manifest_file_file,
+ manifest_list=generated_manifest_file_file_v1,
summary=Summary(Operation.APPEND),
schema_id=3,
)
@@ -191,9 +192,48 @@ def test_read_manifest(generated_manifest_file_file: str) -> None:
assert manifest_list.manifest_length == 7989
assert manifest_list.partition_spec_id == 0
- assert manifest_list.content is None
- assert manifest_list.sequence_number is None
- assert manifest_list.min_sequence_number is None
+ assert manifest_list.content == ManifestContent.DATA
+ assert manifest_list.sequence_number == 0
+ assert manifest_list.min_sequence_number == 0
+ assert manifest_list.added_snapshot_id == 9182715666859759686
+ assert manifest_list.added_files_count == 3
+ assert manifest_list.existing_files_count == 0
+ assert manifest_list.deleted_files_count == 0
+ assert manifest_list.added_rows_count == 237993
+ assert manifest_list.existing_rows_count == 0
+ assert manifest_list.deleted_rows_count == 0
+ assert manifest_list.key_metadata is None
+
+ assert isinstance(manifest_list.partitions, list)
+
+ partition = manifest_list.partitions[0]
+
+ assert isinstance(partition, PartitionFieldSummary)
+
+ assert partition.contains_null is True
+ assert partition.contains_nan is False
+ assert partition.lower_bound == b"\x01\x00\x00\x00"
+ assert partition.upper_bound == b"\x02\x00\x00\x00"
+
+
+def test_read_manifest_v2(generated_manifest_file_file_v2: str) -> None:
+ io = load_file_io()
+
+ snapshot = Snapshot(
+ snapshot_id=25,
+ parent_snapshot_id=19,
+ timestamp_ms=1602638573590,
+ manifest_list=generated_manifest_file_file_v2,
+ summary=Summary(Operation.APPEND),
+ schema_id=3,
+ )
+ manifest_list = snapshot.manifests(io)[0]
+
+ assert manifest_list.manifest_length == 7989
+ assert manifest_list.partition_spec_id == 0
+ assert manifest_list.content == ManifestContent.DELETES
+ assert manifest_list.sequence_number is None # inheritance
+ assert manifest_list.min_sequence_number is None # inheritance
assert manifest_list.added_snapshot_id == 9182715666859759686
assert manifest_list.added_files_count == 3
assert manifest_list.existing_files_count == 0
diff --git a/python/tests/utils/test_schema_conversion.py b/python/tests/utils/test_schema_conversion.py
index 9c5df9ef07..6a8c5a28c7 100644
--- a/python/tests/utils/test_schema_conversion.py
+++ b/python/tests/utils/test_schema_conversion.py
@@ -37,8 +37,8 @@ from pyiceberg.types import (
from pyiceberg.utils.schema_conversion import AvroSchemaConversion
-def test_iceberg_to_avro(avro_schema_manifest_file: Dict[str, Any]) -> None:
- iceberg_schema = AvroSchemaConversion().avro_to_iceberg(avro_schema_manifest_file)
+def test_iceberg_to_avro(avro_schema_manifest_file_v1: Dict[str, Any]) -> None:
+ iceberg_schema = AvroSchemaConversion().avro_to_iceberg(avro_schema_manifest_file_v1)
expected_iceberg_schema = Schema(
NestedField(
field_id=500, name="manifest_path", field_type=StringType(), required=True, doc="Location URI with FS scheme"