You are viewing a plain-text version of this content; the hyperlink to the canonical version was lost in the plain-text conversion.
Posted to commits@iceberg.apache.org by fo...@apache.org on 2023/01/14 12:01:54 UTC
[iceberg] branch master updated: Python: Refactor loading manifests (#6525)
This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 24211aa274 Python: Refactor loading manifests (#6525)
24211aa274 is described below
commit 24211aa2741f29eda14f21781138119a461ce289
Author: Fokko Driesprong <fo...@apache.org>
AuthorDate: Sat Jan 14 13:01:48 2023 +0100
Python: Refactor loading manifests (#6525)
* Add ManifestFile StructProtocol implementation.
* Python: Make it work
* Python: Refactor the reading of the manifest
* Comments
* Fix named record
* Revert some stuff
* Extend the test
* Always pass in the struct
* Refactor the Records into a single one
- And remove Pydantic as a base class
* Comments
* Cleanup
* Revert some changes and cleanup
* Make CI happy
* Fix passing in the struct
* It’s easier to ask for forgiveness than permission
* Add some more tests
* Python: Minor changes to Record to support old test cases. (#351)
Co-authored-by: Ryan Blue <bl...@apache.org>
---
python/pyiceberg/avro/file.py | 34 +-
python/pyiceberg/avro/reader.py | 69 +-
python/pyiceberg/avro/resolver.py | 37 +-
python/pyiceberg/catalog/rest.py | 3 +-
python/pyiceberg/cli/output.py | 3 +-
python/pyiceberg/manifest.py | 311 +++++----
python/pyiceberg/partitioning.py | 2 +-
python/pyiceberg/schema.py | 10 +-
python/pyiceberg/table/__init__.py | 25 +-
python/pyiceberg/table/metadata.py | 3 +-
python/pyiceberg/table/refs.py | 2 +-
python/pyiceberg/table/snapshots.py | 2 +-
python/pyiceberg/table/sorting.py | 2 +-
python/pyiceberg/transforms.py | 3 +-
python/pyiceberg/typedef.py | 88 ++-
python/pyiceberg/types.py | 5 +-
python/pyiceberg/utils/iceberg_base_model.py | 62 --
python/tests/avro/test_file.py | 22 +-
python/tests/avro/test_reader.py | 242 ++-----
python/tests/avro/test_resolver.py | 69 +-
python/tests/expressions/test_evaluator.py | 28 +-
python/tests/expressions/test_expressions.py | 60 +-
python/tests/expressions/test_visitors.py | 8 +-
python/tests/table/test_snapshots.py | 39 --
python/tests/test_schema.py | 4 +-
python/tests/test_transforms.py | 2 +-
python/tests/test_typedef.py | 51 +-
python/tests/test_types.py | 2 +-
python/tests/utils/test_manifest.py | 929 +++++----------------------
29 files changed, 785 insertions(+), 1332 deletions(-)
diff --git a/python/pyiceberg/avro/file.py b/python/pyiceberg/avro/file.py
index 5d469a27d7..56163d7bc7 100644
--- a/python/pyiceberg/avro/file.py
+++ b/python/pyiceberg/avro/file.py
@@ -26,8 +26,10 @@ from types import TracebackType
from typing import (
Callable,
Dict,
+ Generic,
Optional,
Type,
+ TypeVar,
)
from pyiceberg.avro.codecs import KNOWN_CODECS, Codec
@@ -66,10 +68,9 @@ _CODEC_KEY = "avro.codec"
_SCHEMA_KEY = "avro.schema"
-@dataclass(frozen=True)
-class AvroFileHeader:
+class AvroFileHeader(Record):
magic: bytes
- meta: dict[str, str]
+ meta: Dict[str, str]
sync: bytes
def compression_codec(self) -> Optional[Type[Codec]]:
@@ -93,49 +94,52 @@ class AvroFileHeader:
raise ValueError("No schema found in Avro file headers")
+D = TypeVar("D", bound=StructProtocol)
+
+
@dataclass
-class Block:
+class Block(Generic[D]):
reader: Reader
block_records: int
block_decoder: BinaryDecoder
position: int = 0
- def __iter__(self) -> Block:
+ def __iter__(self) -> Block[D]:
return self
def has_next(self) -> bool:
return self.position < self.block_records
- def __next__(self) -> Record:
+ def __next__(self) -> D:
if self.has_next():
self.position += 1
return self.reader.read(self.block_decoder)
raise StopIteration
-class AvroFile:
+class AvroFile(Generic[D]):
input_file: InputFile
read_schema: Optional[Schema]
- read_types: Dict[int, Callable[[Schema], StructProtocol]]
+ read_types: Dict[int, Callable[..., StructProtocol]]
input_stream: InputStream
header: AvroFileHeader
schema: Schema
reader: Reader
decoder: BinaryDecoder
- block: Optional[Block] = None
+ block: Optional[Block[D]] = None
def __init__(
self,
input_file: InputFile,
read_schema: Optional[Schema] = None,
- read_types: Dict[int, Callable[[Schema], StructProtocol]] = EMPTY_DICT,
+ read_types: Dict[int, Callable[..., StructProtocol]] = EMPTY_DICT,
) -> None:
self.input_file = input_file
self.read_schema = read_schema
self.read_types = read_types
- def __enter__(self) -> AvroFile:
+ def __enter__(self) -> AvroFile[D]:
"""
Opens the file and reads the header and generates
a reader tree to start reading the payload
@@ -159,7 +163,7 @@ class AvroFile:
) -> None:
self.input_stream.close()
- def __iter__(self) -> AvroFile:
+ def __iter__(self) -> AvroFile[D]:
return self
def _read_block(self) -> int:
@@ -180,7 +184,7 @@ class AvroFile:
)
return block_records
- def __next__(self) -> Record:
+ def __next__(self) -> D:
if self.block and self.block.has_next():
return next(self.block)
@@ -194,6 +198,4 @@ class AvroFile:
raise StopIteration
def _read_header(self) -> AvroFileHeader:
- reader = construct_reader(META_SCHEMA)
- _header = reader.read(self.decoder)
- return AvroFileHeader(magic=_header.get(0), meta=_header.get(1), sync=_header.get(2))
+ return construct_reader(META_SCHEMA, {-1: AvroFileHeader}).read(self.decoder)
diff --git a/python/pyiceberg/avro/reader.py b/python/pyiceberg/avro/reader.py
index f7a0194a9a..8cc0c4813e 100644
--- a/python/pyiceberg/avro/reader.py
+++ b/python/pyiceberg/avro/reader.py
@@ -41,7 +41,8 @@ from typing import (
from uuid import UUID
from pyiceberg.avro.decoder import BinaryDecoder
-from pyiceberg.typedef import Record, StructProtocol
+from pyiceberg.typedef import StructProtocol
+from pyiceberg.types import StructType
from pyiceberg.utils.singleton import Singleton
@@ -90,6 +91,9 @@ class Reader(Singleton):
def skip(self, decoder: BinaryDecoder) -> None:
...
+ def __repr__(self) -> str:
+ return f"{self.__class__.__name__}()"
+
class NoneReader(Reader):
def read(self, _: BinaryDecoder) -> None:
@@ -194,6 +198,9 @@ class FixedReader(Reader):
def __len__(self) -> int:
return self._len
+ def __repr__(self) -> str:
+ return f"FixedReader({self._len})"
+
class BinaryReader(Reader):
def read(self, decoder: BinaryDecoder) -> bytes:
@@ -214,6 +221,9 @@ class DecimalReader(Reader):
def skip(self, decoder: BinaryDecoder) -> None:
decoder.skip_bytes()
+ def __repr__(self) -> str:
+ return f"DecimalReader({self.precision}, {self.scale})"
+
@dataclass(frozen=True)
class OptionReader(Reader):
@@ -238,41 +248,58 @@ class OptionReader(Reader):
return self.option.skip(decoder)
-class StructProtocolReader(Reader):
- create_struct: Callable[[], StructProtocol]
- fields: Tuple[Tuple[Optional[int], Reader], ...]
+class StructReader(Reader):
+ field_readers: Tuple[Tuple[Optional[int], Reader], ...]
+ create_struct: Callable[..., StructProtocol]
+ struct: StructType
- def __init__(self, fields: Tuple[Tuple[Optional[int], Reader], ...], create_struct: Callable[[], StructProtocol]):
+ def __init__(
+ self,
+ field_readers: Tuple[Tuple[Optional[int], Reader], ...],
+ create_struct: Callable[..., StructProtocol],
+ struct: StructType,
+ ) -> None:
+ self.field_readers = field_readers
self.create_struct = create_struct
- self.fields = fields
-
- def create_or_reuse(self, reuse: Optional[StructProtocol]) -> StructProtocol:
- if reuse:
- return reuse
- else:
- return self.create_struct()
+ self.struct = struct
+
+ def read(self, decoder: BinaryDecoder) -> StructProtocol:
+ try:
+ # Try initializing the struct, first with the struct keyword argument
+ struct = self.create_struct(struct=self.struct)
+ except TypeError as e:
+ if "'struct' is an invalid keyword argument for" in str(e):
+ struct = self.create_struct()
+ else:
+ raise ValueError(f"Unable to initialize struct: {self.create_struct}") from e
- def read(self, decoder: BinaryDecoder) -> Any:
- struct = self.create_or_reuse(None)
+ if not isinstance(struct, StructProtocol):
+ raise ValueError(f"Incompatible with StructProtocol: {self.create_struct}")
- for (pos, field) in self.fields:
+ for (pos, field) in self.field_readers:
if pos is not None:
- struct.set(pos, field.read(decoder)) # later: pass reuse in here
+ struct[pos] = field.read(decoder) # later: pass reuse in here
else:
field.skip(decoder)
return struct
def skip(self, decoder: BinaryDecoder) -> None:
- for _, field in self.fields:
+ for _, field in self.field_readers:
field.skip(decoder)
+ def __eq__(self, other: Any) -> bool:
+ return (
+ self.field_readers == other.field_readers and self.create_struct == other.create_struct
+ if isinstance(other, StructReader)
+ else False
+ )
-class StructReader(StructProtocolReader):
- fields: Tuple[Tuple[Optional[int], Reader], ...]
+ def __repr__(self) -> str:
+ return f"StructReader(({','.join(repr(field) for field in self.field_readers)}), {repr(self.create_struct)})"
- def __init__(self, fields: Tuple[Tuple[Optional[int], Reader], ...]):
- super().__init__(fields, lambda: Record.of(len(fields)))
+ def __hash__(self) -> int:
+ return hash(self.field_readers)
@dataclass(frozen=True)
diff --git a/python/pyiceberg/avro/resolver.py b/python/pyiceberg/avro/resolver.py
index a53693f415..bacd942b33 100644
--- a/python/pyiceberg/avro/resolver.py
+++ b/python/pyiceberg/avro/resolver.py
@@ -53,7 +53,7 @@ from pyiceberg.schema import (
promote,
visit_with_partner,
)
-from pyiceberg.typedef import EMPTY_DICT, StructProtocol
+from pyiceberg.typedef import EMPTY_DICT, Record, StructProtocol
from pyiceberg.types import (
BinaryType,
BooleanType,
@@ -78,7 +78,9 @@ from pyiceberg.types import (
)
-def construct_reader(file_schema: Union[Schema, IcebergType]) -> Reader:
+def construct_reader(
+ file_schema: Union[Schema, IcebergType], read_types: Dict[int, Callable[..., StructProtocol]] = EMPTY_DICT
+) -> Reader:
"""Constructs a reader from a file schema
Args:
@@ -87,13 +89,13 @@ def construct_reader(file_schema: Union[Schema, IcebergType]) -> Reader:
Raises:
NotImplementedError: If attempting to resolve an unrecognized object type
"""
- return resolve(file_schema, file_schema)
+ return resolve(file_schema, file_schema, read_types)
def resolve(
file_schema: Union[Schema, IcebergType],
read_schema: Union[Schema, IcebergType],
- read_types: Dict[int, Callable[[Schema], StructProtocol]] = EMPTY_DICT,
+ read_types: Dict[int, Callable[..., StructProtocol]] = EMPTY_DICT,
) -> Reader:
"""Resolves the file and read schema to produce a reader
@@ -109,28 +111,39 @@ def resolve(
class SchemaResolver(PrimitiveWithPartnerVisitor[IcebergType, Reader]):
- read_types: Optional[Dict[int, Callable[[Schema], StructProtocol]]]
+ read_types: Dict[int, Callable[..., StructProtocol]]
+ context: List[int]
- def __init__(self, read_types: Optional[Dict[int, Callable[[Schema], StructProtocol]]]):
+ def __init__(self, read_types: Dict[int, Callable[..., StructProtocol]] = EMPTY_DICT) -> None:
self.read_types = read_types
+ self.context = []
def schema(self, schema: Schema, expected_schema: Optional[IcebergType], result: Reader) -> Reader:
return result
+ def before_field(self, field: NestedField, field_partner: Optional[NestedField]) -> None:
+ self.context.append(field.field_id)
+
+ def after_field(self, field: NestedField, field_partner: Optional[NestedField]) -> None:
+ self.context.pop()
+
def struct(self, struct: StructType, expected_struct: Optional[IcebergType], field_readers: List[Reader]) -> Reader:
+ # -1 indicates the struct root
+ read_struct_id = self.context[-1] if len(self.context) > 0 else -1
+ struct_callable = self.read_types.get(read_struct_id, Record)
+
if not expected_struct:
- return StructReader(tuple(enumerate(field_readers)))
+ return StructReader(tuple(enumerate(field_readers)), struct_callable, struct)
if not isinstance(expected_struct, StructType):
raise ResolveError(f"File/read schema are not aligned for struct, got {expected_struct}")
- results: List[Tuple[Optional[int], Reader]] = []
expected_positions: Dict[int, int] = {field.field_id: pos for pos, field in enumerate(expected_struct.fields)}
# first, add readers for the file fields that must be in order
- for field, result_reader in zip(struct.fields, field_readers):
- read_pos = expected_positions.get(field.field_id)
- results.append((read_pos, result_reader))
+ results: List[Tuple[Optional[int], Reader]] = [
+ (expected_positions.get(field.field_id), result_reader) for field, result_reader in zip(struct.fields, field_readers)
+ ]
file_fields = {field.field_id: field for field in struct.fields}
for pos, read_field in enumerate(expected_struct.fields):
@@ -140,7 +153,7 @@ class SchemaResolver(PrimitiveWithPartnerVisitor[IcebergType, Reader]):
# Just set the new field to None
results.append((pos, NoneReader()))
- return StructReader(tuple(results))
+ return StructReader(tuple(results), struct_callable, expected_struct)
def field(self, field: NestedField, expected_field: Optional[IcebergType], field_reader: Reader) -> Reader:
return field_reader if field.required else OptionReader(field_reader)
diff --git a/python/pyiceberg/catalog/rest.py b/python/pyiceberg/catalog/rest.py
index f7bb6b346c..2f9eb5df11 100644
--- a/python/pyiceberg/catalog/rest.py
+++ b/python/pyiceberg/catalog/rest.py
@@ -57,8 +57,7 @@ from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import Table, TableMetadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
-from pyiceberg.typedef import EMPTY_DICT
-from pyiceberg.utils.iceberg_base_model import IcebergBaseModel
+from pyiceberg.typedef import EMPTY_DICT, IcebergBaseModel
ICEBERG_REST_SPEC_VERSION = "0.14.1"
diff --git a/python/pyiceberg/cli/output.py b/python/pyiceberg/cli/output.py
index 5bb61d456b..44c94f6b4c 100644
--- a/python/pyiceberg/cli/output.py
+++ b/python/pyiceberg/cli/output.py
@@ -26,8 +26,7 @@ from rich.tree import Tree
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import Table, TableMetadata
-from pyiceberg.typedef import Identifier, Properties
-from pyiceberg.utils.iceberg_base_model import IcebergBaseModel
+from pyiceberg.typedef import IcebergBaseModel, Identifier, Properties
class Output(ABC):
diff --git a/python/pyiceberg/manifest.py b/python/pyiceberg/manifest.py
index 9f7f394159..757f3bd016 100644
--- a/python/pyiceberg/manifest.py
+++ b/python/pyiceberg/manifest.py
@@ -15,30 +15,29 @@
# specific language governing permissions and limitations
# under the License.
from enum import Enum
-from functools import singledispatch
from typing import (
Any,
Dict,
Iterator,
List,
Optional,
- Union,
)
-from pydantic import Field
-
from pyiceberg.avro.file import AvroFile
from pyiceberg.io import FileIO, InputFile
from pyiceberg.schema import Schema
from pyiceberg.typedef import Record
from pyiceberg.types import (
- IcebergType,
+ BinaryType,
+ BooleanType,
+ IntegerType,
ListType,
+ LongType,
MapType,
- PrimitiveType,
+ NestedField,
+ StringType,
StructType,
)
-from pyiceberg.utils.iceberg_base_model import IcebergBaseModel
class DataFileContent(int, Enum):
@@ -76,137 +75,207 @@ class FileFormat(str, Enum):
return f"FileFormat.{self.name}"
-class DataFile(IcebergBaseModel):
- content: DataFileContent = Field(default=DataFileContent.DATA)
- file_path: str = Field()
- file_format: FileFormat = Field()
- partition: Dict[str, Any] = Field()
- record_count: int = Field()
- file_size_in_bytes: int = Field()
- block_size_in_bytes: Optional[int] = Field()
- column_sizes: Optional[Dict[int, int]] = Field()
- value_counts: Optional[Dict[int, int]] = Field()
- null_value_counts: Optional[Dict[int, int]] = Field()
- nan_value_counts: Optional[Dict[int, int]] = Field()
- distinct_counts: Optional[Dict[int, int]] = Field()
- lower_bounds: Optional[Dict[int, bytes]] = Field()
- upper_bounds: Optional[Dict[int, bytes]] = Field()
- key_metadata: Optional[bytes] = Field()
- split_offsets: Optional[List[int]] = Field()
- equality_ids: Optional[List[int]] = Field()
- sort_order_id: Optional[int] = Field()
-
-
-class ManifestEntry(IcebergBaseModel):
- status: ManifestEntryStatus = Field()
- snapshot_id: Optional[int] = Field()
- sequence_number: Optional[int] = Field()
- data_file: DataFile = Field()
-
-
-class PartitionFieldSummary(IcebergBaseModel):
- contains_null: bool = Field()
- contains_nan: Optional[bool] = Field()
- lower_bound: Optional[bytes] = Field()
- upper_bound: Optional[bytes] = Field()
-
-
-class ManifestFile(IcebergBaseModel):
- manifest_path: str = Field()
- manifest_length: int = Field()
- partition_spec_id: int = Field()
- content: ManifestContent = Field(default=ManifestContent.DATA)
- sequence_number: int = Field(default=0)
- min_sequence_number: int = Field(default=0)
- added_snapshot_id: Optional[int] = Field()
- added_data_files_count: Optional[int] = Field()
- existing_data_files_count: Optional[int] = Field()
- deleted_data_files_count: Optional[int] = Field()
- added_rows_count: Optional[int] = Field()
- existing_rows_counts: Optional[int] = Field()
- deleted_rows_count: Optional[int] = Field()
- partitions: Optional[List[PartitionFieldSummary]] = Field()
- key_metadata: Optional[bytes] = Field()
-
- def fetch_manifest_entry(self, io: FileIO) -> List[ManifestEntry]:
- file = io.new_input(self.manifest_path)
- return list(read_manifest_entry(file))
-
+DATA_FILE_TYPE = StructType(
+ NestedField(
+ field_id=134,
+ name="content",
+ field_type=IntegerType(),
+ required=False,
+ doc="Contents of the file: 0=data, 1=position deletes, 2=equality deletes",
+ ),
+ NestedField(field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme"),
+ NestedField(
+ field_id=101, name="file_format", field_type=StringType(), required=True, doc="File format name: avro, orc, or parquet"
+ ),
+ NestedField(
+ field_id=102,
+ name="partition",
+ field_type=StructType(),
+ required=True,
+ doc="Partition data tuple, schema based on the partition spec",
+ ),
+ NestedField(field_id=103, name="record_count", field_type=LongType(), required=True, doc="Number of records in the file"),
+ NestedField(field_id=104, name="file_size_in_bytes", field_type=LongType(), required=True, doc="Total file size in bytes"),
+ NestedField(
+ field_id=108,
+ name="column_sizes",
+ field_type=MapType(key_id=117, key_type=IntegerType(), value_id=118, value_type=LongType()),
+ required=True,
+ doc="Map of column id to total size on disk",
+ ),
+ NestedField(
+ field_id=109,
+ name="value_counts",
+ field_type=MapType(key_id=119, key_type=IntegerType(), value_id=120, value_type=LongType()),
+ required=True,
+ doc="Map of column id to total count, including null and NaN",
+ ),
+ NestedField(
+ field_id=110,
+ name="null_value_counts",
+ field_type=MapType(key_id=121, key_type=IntegerType(), value_id=122, value_type=LongType()),
+ required=False,
+ doc="Map of column id to null value count",
+ ),
+ NestedField(
+ field_id=137,
+ name="nan_value_counts",
+ field_type=MapType(key_id=138, key_type=IntegerType(), value_id=139, value_type=LongType()),
+ required=False,
+ doc="Map of column id to number of NaN values in the column",
+ ),
+ NestedField(
+ field_id=125,
+ name="lower_bounds",
+ field_type=MapType(key_id=126, key_type=IntegerType(), value_id=127, value_type=BinaryType()),
+ required=False,
+ doc="Map of column id to lower bound",
+ ),
+ NestedField(
+ field_id=128,
+ name="upper_bounds",
+ field_type=MapType(key_id=129, key_type=IntegerType(), value_id=130, value_type=BinaryType()),
+ required=False,
+ doc="Map of column id to upper bound",
+ ),
+ NestedField(field_id=131, name="key_metadata", field_type=BinaryType(), required=False, doc="Encryption key metadata blob"),
+ NestedField(
+ field_id=132,
+ name="split_offsets",
+ field_type=ListType(element_id=133, element_type=LongType(), element_required=True),
+ required=False,
+ doc="Splittable offsets",
+ ),
+ NestedField(
+ field_id=135,
+ name="equality_ids",
+ field_type=ListType(element_id=136, element_type=LongType(), element_required=True),
+ required=False,
+ doc="Equality comparison field IDs",
+ ),
+ NestedField(field_id=140, name="sort_order_id", field_type=IntegerType(), required=False, doc="Sort order ID"),
+ NestedField(field_id=141, name="spec_id", field_type=IntegerType(), required=False, doc="Partition spec ID"),
+)
-def read_manifest_entry(input_file: InputFile) -> Iterator[ManifestEntry]:
- with AvroFile(input_file) as reader:
- schema = reader.schema
- for record in reader:
- dict_repr = _convert_pos_to_dict(schema, record)
- yield ManifestEntry(**dict_repr)
+class DataFile(Record):
+ content: Optional[DataFileContent]
+ file_path: str
+ file_format: FileFormat
+ partition: Record
+ record_count: int
+ file_size_in_bytes: int
+ column_sizes: Dict[int, int]
+ value_counts: Dict[int, int]
+ null_value_counts: Dict[int, int]
+ nan_value_counts: Dict[int, int]
+ lower_bounds: Dict[int, bytes]
+ upper_bounds: Dict[int, bytes]
+ key_metadata: Optional[bytes]
+ split_offsets: Optional[List[int]]
+ equality_ids: Optional[List[int]]
+ sort_order_id: Optional[int]
+ spec_id: Optional[int]
+
+ def __init__(self, *data: Any, **named_data: Any) -> None:
+ super().__init__(*data, **{"struct": DATA_FILE_TYPE, **named_data})
+
+
+MANIFEST_ENTRY_SCHEMA = Schema(
+ NestedField(0, "status", IntegerType(), required=True),
+ NestedField(1, "snapshot_id", LongType(), required=False),
+ NestedField(3, "sequence_number", LongType(), required=False),
+ NestedField(4, "file_sequence_number", LongType(), required=False),
+ NestedField(2, "data_file", DATA_FILE_TYPE, required=False),
+)
-def live_entries(input_file: InputFile) -> Iterator[ManifestEntry]:
- return (entry for entry in read_manifest_entry(input_file) if entry.status != ManifestEntryStatus.DELETED)
+class ManifestEntry(Record):
+ status: ManifestEntryStatus
+ snapshot_id: Optional[int]
+ sequence_number: Optional[int]
+ file_sequence_number: Optional[int]
+ data_file: DataFile
-def files(input_file: InputFile) -> Iterator[DataFile]:
- return (entry.data_file for entry in live_entries(input_file))
+ def __init__(self, *data: Any, **named_data: Any) -> None:
+ super().__init__(*data, **{"struct": MANIFEST_ENTRY_SCHEMA.as_struct(), **named_data})
-def read_manifest_list(input_file: InputFile) -> Iterator[ManifestFile]:
- with AvroFile(input_file) as reader:
- schema = reader.schema
- for record in reader:
- dict_repr = _convert_pos_to_dict(schema, record)
- yield ManifestFile(**dict_repr)
-
-
-@singledispatch
-def _convert_pos_to_dict(schema: Union[Schema, IcebergType], struct: Record) -> Dict[str, Any]:
- """Converts the positions in the field names
+PARTITION_FIELD_SUMMARY_TYPE = StructType(
+ NestedField(509, "contains_null", BooleanType(), required=True),
+ NestedField(518, "contains_nan", BooleanType(), required=False),
+ NestedField(510, "lower_bound", BinaryType(), required=False),
+ NestedField(511, "upper_bound", BinaryType(), required=False),
+)
- This makes it easy to map it onto a Pydantic model. Might change later on depending on the performance
- Args:
- schema (Schema | IcebergType): The schema of the file
- struct (Record): The struct containing the data by positions
+class PartitionFieldSummary(Record):
+ contains_null: bool
+ contains_nan: Optional[bool]
+ lower_bound: Optional[bytes]
+ upper_bound: Optional[bytes]
+
+ def __init__(self, *data: Any, **named_data: Any) -> None:
+ super().__init__(*data, **{"struct": PARTITION_FIELD_SUMMARY_TYPE, **named_data})
+
+
+MANIFEST_FILE_SCHEMA: Schema = Schema(
+ NestedField(500, "manifest_path", StringType(), required=True, doc="Location URI with FS scheme"),
+ NestedField(501, "manifest_length", LongType(), required=True),
+ NestedField(502, "partition_spec_id", IntegerType(), required=True),
+ NestedField(517, "content", IntegerType(), required=False),
+ NestedField(515, "sequence_number", LongType(), required=False),
+ NestedField(516, "min_sequence_number", LongType(), required=False),
+ NestedField(503, "added_snapshot_id", LongType(), required=False),
+ NestedField(504, "added_files_count", IntegerType(), required=False),
+ NestedField(505, "existing_files_count", IntegerType(), required=False),
+ NestedField(506, "deleted_files_count", IntegerType(), required=False),
+ NestedField(512, "added_rows_count", LongType(), required=False),
+ NestedField(513, "existing_rows_count", LongType(), required=False),
+ NestedField(514, "deleted_rows_count", LongType(), required=False),
+ NestedField(507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False),
+ NestedField(519, "key_metadata", BinaryType(), required=False),
+)
- Raises:
- NotImplementedError: If attempting to handle an unknown type in the schema
- """
- raise NotImplementedError(f"Cannot traverse non-type: {schema}")
+class ManifestFile(Record):
+ manifest_path: str
+ manifest_length: int
+ partition_spec_id: int
+ content: Optional[ManifestContent]
+ sequence_number: Optional[int]
+ min_sequence_number: Optional[int]
+ added_snapshot_id: Optional[int]
+ added_files_count: Optional[int]
+ existing_files_count: Optional[int]
+ deleted_files_count: Optional[int]
+ added_rows_count: Optional[int]
+ existing_rows_count: Optional[int]
+ deleted_rows_count: Optional[int]
+ partitions: Optional[List[PartitionFieldSummary]]
+ key_metadata: Optional[bytes]
+
+ def __init__(self, *data: Any, **named_data: Any) -> None:
+ super().__init__(*data, **{"struct": MANIFEST_FILE_SCHEMA.as_struct(), **named_data})
-@_convert_pos_to_dict.register
-def _(schema: Schema, struct: Record) -> Dict[str, Any]:
- return _convert_pos_to_dict(schema.as_struct(), struct)
+ def fetch_manifest_entry(self, io: FileIO) -> List[ManifestEntry]:
+ file = io.new_input(self.manifest_path)
+ return list(read_manifest_entry(file))
-@_convert_pos_to_dict.register
-def _(struct_type: StructType, values: Record) -> Dict[str, Any]:
- """Iterates over all the fields in the dict, and gets the data from the struct"""
- return (
- {field.name: _convert_pos_to_dict(field.field_type, values.get(pos)) for pos, field in enumerate(struct_type.fields)}
- if values is not None
- else None
- )
+def read_manifest_entry(input_file: InputFile) -> Iterator[ManifestEntry]:
+ with AvroFile[ManifestEntry](input_file, MANIFEST_ENTRY_SCHEMA, {-1: ManifestEntry, 2: DataFile}) as reader:
+ yield from reader
-@_convert_pos_to_dict.register
-def _(list_type: ListType, values: List[Any]) -> Any:
- """In the case of a list, we'll go over the elements in the list to handle complex types"""
- return [_convert_pos_to_dict(list_type.element_type, value) for value in values] if values is not None else None
+def live_entries(input_file: InputFile) -> Iterator[ManifestEntry]:
+ return (entry for entry in read_manifest_entry(input_file) if entry.status != ManifestEntryStatus.DELETED)
-@_convert_pos_to_dict.register
-def _(map_type: MapType, values: Dict[Any, Any]) -> Dict[Any, Any]:
- """In the case of a map, we both traverse over the key and value to handle complex types"""
- return (
- {
- _convert_pos_to_dict(map_type.key_type, key): _convert_pos_to_dict(map_type.value_type, value)
- for key, value in values.items()
- }
- if values is not None
- else None
- )
+def files(input_file: InputFile) -> Iterator[DataFile]:
+ return (entry.data_file for entry in live_entries(input_file))
-@_convert_pos_to_dict.register
-def _(_: PrimitiveType, value: Any) -> Any:
- return value
+def read_manifest_list(input_file: InputFile) -> Iterator[ManifestFile]:
+ with AvroFile[ManifestFile](input_file, MANIFEST_FILE_SCHEMA, {-1: ManifestFile, 508: PartitionFieldSummary}) as reader:
+ yield from reader
diff --git a/python/pyiceberg/partitioning.py b/python/pyiceberg/partitioning.py
index aca79bfa76..9313b1606c 100644
--- a/python/pyiceberg/partitioning.py
+++ b/python/pyiceberg/partitioning.py
@@ -27,8 +27,8 @@ from pydantic import Field
from pyiceberg.schema import Schema
from pyiceberg.transforms import Transform
+from pyiceberg.typedef import IcebergBaseModel
from pyiceberg.types import NestedField, StructType
-from pyiceberg.utils.iceberg_base_model import IcebergBaseModel
INITIAL_PARTITION_SPEC_ID = 0
_PARTITION_DATA_ID_START: int = 1000
diff --git a/python/pyiceberg/schema.py b/python/pyiceberg/schema.py
index 1a85cb4baa..45e9ffe26a 100644
--- a/python/pyiceberg/schema.py
+++ b/python/pyiceberg/schema.py
@@ -36,7 +36,7 @@ from typing import (
from pydantic import Field, PrivateAttr
from pyiceberg.exceptions import ResolveError
-from pyiceberg.typedef import EMPTY_DICT, StructProtocol
+from pyiceberg.typedef import EMPTY_DICT, IcebergBaseModel, StructProtocol
from pyiceberg.types import (
BinaryType,
BooleanType,
@@ -59,7 +59,6 @@ from pyiceberg.types import (
TimeType,
UUIDType,
)
-from pyiceberg.utils.iceberg_base_model import IcebergBaseModel
T = TypeVar("T")
P = TypeVar("P")
@@ -94,6 +93,9 @@ class Schema(IcebergBaseModel):
def __repr__(self) -> str:
return f"Schema({', '.join(repr(column) for column in self.columns)}, schema_id={self.schema_id}, identifier_field_ids={self.identifier_field_ids})"
+ def __len__(self) -> int:
+ return len(self.fields)
+
def __eq__(self, other: Any) -> bool:
if not other:
return False
@@ -684,11 +686,11 @@ class Accessor:
Any: The value at position `self.position` in the container
"""
pos = self.position
- val = container.get(pos)
+ val = container[pos]
inner = self
while inner.inner:
inner = inner.inner
- val = val.get(inner.position)
+ val = val[inner.position]
return val
diff --git a/python/pyiceberg/table/__init__.py b/python/pyiceberg/table/__init__.py
index 3fb0702889..c4c23100c9 100644
--- a/python/pyiceberg/table/__init__.py
+++ b/python/pyiceberg/table/__init__.py
@@ -54,9 +54,7 @@ from pyiceberg.typedef import (
Identifier,
KeyDefaultDict,
Properties,
- StructProtocol,
)
-from pyiceberg.types import StructType
if TYPE_CHECKING:
import pandas as pd
@@ -259,24 +257,6 @@ class FileScanTask(ScanTask):
self.length = length or data_file.file_size_in_bytes
-class _DictAsStruct(StructProtocol):
- pos_to_name: Dict[int, str]
- wrapped: Dict[str, Any]
-
- def __init__(self, partition_type: StructType):
- self.pos_to_name = {pos: field.name for pos, field in enumerate(partition_type.fields)}
-
- def wrap(self, to_wrap: Dict[str, Any]) -> _DictAsStruct:
- self.wrapped = to_wrap
- return self
-
- def get(self, pos: int) -> Any:
- return self.wrapped[self.pos_to_name[pos]]
-
- def set(self, pos: int, value: Any) -> None:
- raise NotImplementedError("Cannot set values in DictAsStruct")
-
-
class DataScan(TableScan["DataScan"]):
def __init__(
self,
@@ -307,11 +287,8 @@ class DataScan(TableScan["DataScan"]):
partition_schema = Schema(*partition_type.fields)
partition_expr = self.partition_filters[spec_id]
- # TODO: remove the dict to struct wrapper by using a StructProtocol record # pylint: disable=W0511
- wrapper = _DictAsStruct(partition_type)
evaluator = visitors.expression_evaluator(partition_schema, partition_expr, self.case_sensitive)
-
- return lambda data_file: evaluator(wrapper.wrap(data_file.partition))
+ return lambda data_file: evaluator(data_file.partition)
def plan_files(self) -> Iterator[FileScanTask]:
snapshot = self.snapshot()
diff --git a/python/pyiceberg/table/metadata.py b/python/pyiceberg/table/metadata.py
index 5704222d2c..31e74e7084 100644
--- a/python/pyiceberg/table/metadata.py
+++ b/python/pyiceberg/table/metadata.py
@@ -39,9 +39,8 @@ from pyiceberg.table.sorting import (
SortOrder,
assign_fresh_sort_order_ids,
)
-from pyiceberg.typedef import EMPTY_DICT, Properties
+from pyiceberg.typedef import EMPTY_DICT, IcebergBaseModel, Properties
from pyiceberg.utils.datetime import datetime_to_micros
-from pyiceberg.utils.iceberg_base_model import IcebergBaseModel
CURRENT_SNAPSHOT_ID = "current_snapshot_id"
CURRENT_SCHEMA_ID = "current_schema_id"
diff --git a/python/pyiceberg/table/refs.py b/python/pyiceberg/table/refs.py
index 58f6121ed9..09cbad27f1 100644
--- a/python/pyiceberg/table/refs.py
+++ b/python/pyiceberg/table/refs.py
@@ -19,7 +19,7 @@ from typing import Optional
from pydantic import Field
-from pyiceberg.utils.iceberg_base_model import IcebergBaseModel
+from pyiceberg.typedef import IcebergBaseModel
MAIN_BRANCH = "main"
diff --git a/python/pyiceberg/table/snapshots.py b/python/pyiceberg/table/snapshots.py
index af17fd4b4e..83dd3f66a7 100644
--- a/python/pyiceberg/table/snapshots.py
+++ b/python/pyiceberg/table/snapshots.py
@@ -27,7 +27,7 @@ from pydantic import Field, PrivateAttr, root_validator
from pyiceberg.io import FileIO
from pyiceberg.manifest import ManifestFile, read_manifest_list
-from pyiceberg.utils.iceberg_base_model import IcebergBaseModel
+from pyiceberg.typedef import IcebergBaseModel
OPERATION = "operation"
diff --git a/python/pyiceberg/table/sorting.py b/python/pyiceberg/table/sorting.py
index 67f27d82eb..7ca64399d6 100644
--- a/python/pyiceberg/table/sorting.py
+++ b/python/pyiceberg/table/sorting.py
@@ -29,8 +29,8 @@ from pydantic import Field, root_validator
from pyiceberg.schema import Schema
from pyiceberg.transforms import IdentityTransform, Transform
+from pyiceberg.typedef import IcebergBaseModel
from pyiceberg.types import IcebergType
-from pyiceberg.utils.iceberg_base_model import IcebergBaseModel
class SortDirection(Enum):
diff --git a/python/pyiceberg/transforms.py b/python/pyiceberg/transforms.py
index b163c02569..e0197f0639 100644
--- a/python/pyiceberg/transforms.py
+++ b/python/pyiceberg/transforms.py
@@ -60,7 +60,7 @@ from pyiceberg.expressions.literals import (
TimestampLiteral,
literal,
)
-from pyiceberg.typedef import L
+from pyiceberg.typedef import IcebergBaseModel, L
from pyiceberg.types import (
BinaryType,
DateType,
@@ -77,7 +77,6 @@ from pyiceberg.types import (
)
from pyiceberg.utils import datetime
from pyiceberg.utils.decimal import decimal_to_bytes, truncate_decimal
-from pyiceberg.utils.iceberg_base_model import IcebergBaseModel
from pyiceberg.utils.parsing import ParseNumberFromBrackets
from pyiceberg.utils.singleton import Singleton
diff --git a/python/pyiceberg/typedef.py b/python/pyiceberg/typedef.py
index 228b9a927c..bdf467101c 100644
--- a/python/pyiceberg/typedef.py
+++ b/python/pyiceberg/typedef.py
@@ -18,12 +18,15 @@ from __future__ import annotations
from abc import abstractmethod
from decimal import Decimal
+from functools import cached_property
from typing import (
+ TYPE_CHECKING,
Any,
Callable,
Dict,
- List,
+ Optional,
Protocol,
+ Set,
Tuple,
TypeVar,
Union,
@@ -31,6 +34,11 @@ from typing import (
)
from uuid import UUID
+from pydantic import BaseModel
+
+if TYPE_CHECKING:
+ from pyiceberg.types import StructType
+
class FrozenDict(Dict[Any, Any]):
def __setitem__(self, instance: Any, value: Any) -> None:
@@ -42,7 +50,6 @@ class FrozenDict(Dict[Any, Any]):
EMPTY_DICT = FrozenDict()
-
K = TypeVar("K")
V = TypeVar("V")
@@ -72,34 +79,79 @@ class StructProtocol(Protocol): # pragma: no cover
"""A generic protocol used by accessors to get and set at positions of an object"""
@abstractmethod
- def get(self, pos: int) -> Any:
+ def __getitem__(self, pos: int) -> Any:
...
@abstractmethod
- def set(self, pos: int, value: Any) -> None:
+ def __setitem__(self, pos: int, value: Any) -> None:
...
+class IcebergBaseModel(BaseModel):
+ """
+ This class extends the Pydantic BaseModel to set default values by overriding them.
+
+ This is because we always want to set by_alias to True. In Python, the dash can't
+ be used in variable names, and this is used throughout the Iceberg spec.
+
+ The same goes for exclude_none, if a field is None we want to omit it from
+ serialization, for example, the doc attribute on the NestedField object.
+ Default non-null values will be serialized.
+
+ This is recommended by Pydantic:
+ https://pydantic-docs.helpmanual.io/usage/model_config/#change-behaviour-globally
+ """
+
+ class Config:
+ keep_untouched = (cached_property,)
+ allow_population_by_field_name = True
+ frozen = True
+
+ def _exclude_private_properties(self, exclude: Optional[Set[str]] = None) -> Set[str]:
+ # A small trick to exclude private properties. Properties are serialized by pydantic,
+ # regardless if they start with an underscore.
+ # This will look at the dict, and find the fields and exclude them
+ return set.union(
+ {field for field in self.__dict__ if field.startswith("_") and not field == "__root__"}, exclude or set()
+ )
+
+ def dict(self, exclude_none: bool = True, exclude: Optional[Set[str]] = None, **kwargs: Any) -> Dict[str, Any]:
+ return super().dict(exclude_none=exclude_none, exclude=self._exclude_private_properties(exclude), **kwargs)
+
+ def json(self, exclude_none: bool = True, exclude: Optional[Set[str]] = None, by_alias: bool = True, **kwargs: Any) -> str:
+ return super().json(
+ exclude_none=exclude_none, exclude=self._exclude_private_properties(exclude), by_alias=by_alias, **kwargs
+ )
+
+
class Record(StructProtocol):
- _data: List[Union[Any, StructProtocol]]
+ _position_to_field_name: Dict[int, str]
+
+ def __init__(self, *data: Any, struct: Optional[StructType] = None, **named_data: Any) -> None:
+ if struct is not None:
+ self._position_to_field_name = {idx: field.name for idx, field in enumerate(struct.fields)}
+ elif named_data:
+ # Order of named_data is preserved (PEP 468) so this can be used to generate the position dict
+ self._position_to_field_name = {idx: name for idx, name in enumerate(named_data.keys())}
+ else:
+ self._position_to_field_name = {idx: f"field{idx + 1}" for idx in range(len(data))}
- @staticmethod
- def of(num_fields: int) -> Record:
- return Record(*([None] * num_fields))
+ for idx, d in enumerate(data):
+ self[idx] = d
- def __init__(self, *data: Union[Any, StructProtocol]) -> None:
- self._data = list(data)
+ for field_name, d in named_data.items():
+ self.__setattr__(field_name, d)
- def set(self, pos: int, value: Any) -> None:
- print(f"set({pos}, {repr(value)})")
- self._data[pos] = value
+ def __setitem__(self, pos: int, value: Any) -> None:
+ self.__setattr__(self._position_to_field_name[pos], value)
- def get(self, pos: int) -> Any:
- return self._data[pos]
+ def __getitem__(self, pos: int) -> Any:
+ return self.__getattribute__(self._position_to_field_name[pos])
def __eq__(self, other: Any) -> bool:
- # For testing
- return True if isinstance(other, Record) and other._data == self._data else False
+ if not isinstance(other, Record):
+ return False
+ return self.__dict__ == other.__dict__
def __repr__(self) -> str:
- return f"{self.__class__.__name__}[" + ", ".join([repr(e) for e in self._data]) + "]"
+ return f"{self.__class__.__name__}[{', '.join(f'{key}={repr(value)}' for key, value in self.__dict__.items() if not key.startswith('_'))}]"
diff --git a/python/pyiceberg/types.py b/python/pyiceberg/types.py
index 6a736ed19d..e11f6138c9 100644
--- a/python/pyiceberg/types.py
+++ b/python/pyiceberg/types.py
@@ -45,7 +45,7 @@ from typing import (
from pydantic import Field, PrivateAttr
from pydantic.typing import AnyCallable
-from pyiceberg.utils.iceberg_base_model import IcebergBaseModel
+from pyiceberg.typedef import IcebergBaseModel
from pyiceberg.utils.parsing import ParseNumberFromBrackets
from pyiceberg.utils.singleton import Singleton
@@ -280,6 +280,9 @@ class StructType(IcebergType):
def __repr__(self) -> str:
return f"StructType(fields=({', '.join(map(repr, self.fields))},))"
+ def __len__(self) -> int:
+ return len(self.fields)
+
class ListType(IcebergType):
"""A list type in Iceberg
diff --git a/python/pyiceberg/utils/iceberg_base_model.py b/python/pyiceberg/utils/iceberg_base_model.py
deleted file mode 100644
index 8c5bc94f68..0000000000
--- a/python/pyiceberg/utils/iceberg_base_model.py
+++ /dev/null
@@ -1,62 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from functools import cached_property
-from typing import (
- Any,
- Dict,
- Optional,
- Set,
-)
-
-from pydantic import BaseModel
-
-
-class IcebergBaseModel(BaseModel):
- """
- This class extends the Pydantic BaseModel to set default values by overriding them.
-
- This is because we always want to set by_alias to True. In Python, the dash can't
- be used in variable names, and this is used throughout the Iceberg spec.
-
- The same goes for exclude_none, if a field is None we want to omit it from
- serialization, for example, the doc attribute on the NestedField object.
- Default non-null values will be serialized.
-
- This is recommended by Pydantic:
- https://pydantic-docs.helpmanual.io/usage/model_config/#change-behaviour-globally
- """
-
- class Config:
- keep_untouched = (cached_property,)
- allow_population_by_field_name = True
- frozen = True
-
- def _exclude_private_properties(self, exclude: Optional[Set[str]] = None) -> Set[str]:
- # A small trick to exclude private properties. Properties are serialized by pydantic,
- # regardless if they start with an underscore.
- # This will look at the dict, and find the fields and exclude them
- return set.union(
- {field for field in self.__dict__ if field.startswith("_") and not field == "__root__"}, exclude or set()
- )
-
- def dict(self, exclude_none: bool = True, exclude: Optional[Set[str]] = None, **kwargs: Any) -> Dict[str, Any]:
- return super().dict(exclude_none=exclude_none, exclude=self._exclude_private_properties(exclude), **kwargs)
-
- def json(self, exclude_none: bool = True, exclude: Optional[Set[str]] = None, by_alias: bool = True, **kwargs: Any) -> str:
- return super().json(
- exclude_none=exclude_none, exclude=self._exclude_private_properties(exclude), by_alias=by_alias, **kwargs
- )
diff --git a/python/tests/avro/test_file.py b/python/tests/avro/test_file.py
index fc46963375..cdb973a395 100644
--- a/python/tests/avro/test_file.py
+++ b/python/tests/avro/test_file.py
@@ -17,21 +17,30 @@
import pytest
from pyiceberg.avro.codecs import DeflateCodec
-from pyiceberg.avro.file import AvroFileHeader
+from pyiceberg.avro.file import META_SCHEMA, AvroFileHeader
def get_deflate_compressor() -> None:
- header = AvroFileHeader(bytes(0), {"avro.codec": "deflate"}, bytes(16))
+ header = AvroFileHeader(struct=META_SCHEMA)
+ header[0] = bytes(0)
+ header[1] = {"avro.codec": "deflate"}
+ header[2] = bytes(16)
assert header.compression_codec() == DeflateCodec
def get_null_compressor() -> None:
- header = AvroFileHeader(bytes(0), {"avro.codec": "null"}, bytes(16))
+ header = AvroFileHeader(struct=META_SCHEMA)
+ header[0] = bytes(0)
+ header[1] = {"avro.codec": "null"}
+ header[2] = bytes(16)
assert header.compression_codec() is None
def test_unknown_codec() -> None:
- header = AvroFileHeader(bytes(0), {"avro.codec": "unknown"}, bytes(16))
+ header = AvroFileHeader(struct=META_SCHEMA)
+ header[0] = bytes(0)
+ header[1] = {"avro.codec": "unknown"}
+ header[2] = bytes(16)
with pytest.raises(ValueError) as exc_info:
header.compression_codec()
@@ -40,7 +49,10 @@ def test_unknown_codec() -> None:
def test_missing_schema() -> None:
- header = AvroFileHeader(bytes(0), {}, bytes(16))
+ header = AvroFileHeader(struct=META_SCHEMA)
+ header[0] = bytes(0)
+ header[1] = {}
+ header[2] = bytes(16)
with pytest.raises(ValueError) as exc_info:
header.get_schema()
diff --git a/python/tests/avro/test_reader.py b/python/tests/avro/test_reader.py
index f8ec3326ca..b8736dd69b 100644
--- a/python/tests/avro/test_reader.py
+++ b/python/tests/avro/test_reader.py
@@ -19,6 +19,7 @@ import json
import pytest
+from pyiceberg.avro.decoder import BinaryDecoder
from pyiceberg.avro.file import AvroFile
from pyiceberg.avro.reader import (
BinaryReader,
@@ -30,14 +31,16 @@ from pyiceberg.avro.reader import (
FloatReader,
IntegerReader,
StringReader,
+ StructReader,
TimeReader,
TimestampReader,
TimestamptzReader,
UUIDReader,
)
from pyiceberg.avro.resolver import construct_reader
+from pyiceberg.io.memory import MemoryInputStream
from pyiceberg.io.pyarrow import PyArrowFileIO
-from pyiceberg.manifest import _convert_pos_to_dict
+from pyiceberg.manifest import MANIFEST_ENTRY_SCHEMA, DataFile, ManifestEntry
from pyiceberg.schema import Schema
from pyiceberg.typedef import Record
from pyiceberg.types import (
@@ -49,9 +52,7 @@ from pyiceberg.types import (
FixedType,
FloatType,
IntegerType,
- ListType,
LongType,
- MapType,
NestedField,
PrimitiveType,
StringType,
@@ -64,7 +65,11 @@ from pyiceberg.types import (
def test_read_header(generated_manifest_entry_file: str, iceberg_manifest_entry_schema: Schema) -> None:
- with AvroFile(PyArrowFileIO().new_input(generated_manifest_entry_file)) as reader:
+ with AvroFile[ManifestEntry](
+ PyArrowFileIO().new_input(generated_manifest_entry_file),
+ MANIFEST_ENTRY_SCHEMA,
+ {-1: ManifestEntry, 2: DataFile},
+ ) as reader:
header = reader.header
assert header.magic == b"Obj\x01"
@@ -264,187 +269,6 @@ def test_read_header(generated_manifest_entry_file: str, iceberg_manifest_entry_
assert header.get_schema() == iceberg_manifest_entry_schema
-def test_read_manifest_entry_file(generated_manifest_entry_file: str) -> None:
- with AvroFile(PyArrowFileIO().new_input(generated_manifest_entry_file)) as reader:
- # Consume the generator
- records = list(reader)
-
- assert len(records) == 2, f"Expected 2 records, got {len(records)}"
- assert records[0] == Record(
- 1,
- 8744736658442914487,
- Record(
- "/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet",
- "PARQUET",
- Record(1, 1925),
- 19513,
- 388872,
- 67108864,
- {
- 1: 53,
- 2: 98153,
- 3: 98693,
- 4: 53,
- 5: 53,
- 6: 53,
- 7: 17425,
- 8: 18528,
- 9: 53,
- 10: 44788,
- 11: 35571,
- 12: 53,
- 13: 1243,
- 14: 2355,
- 15: 12750,
- 16: 4029,
- 17: 110,
- 18: 47194,
- 19: 2948,
- },
- {
- 1: 19513,
- 2: 19513,
- 3: 19513,
- 4: 19513,
- 5: 19513,
- 6: 19513,
- 7: 19513,
- 8: 19513,
- 9: 19513,
- 10: 19513,
- 11: 19513,
- 12: 19513,
- 13: 19513,
- 14: 19513,
- 15: 19513,
- 16: 19513,
- 17: 19513,
- 18: 19513,
- 19: 19513,
- },
- {
- 1: 19513,
- 2: 0,
- 3: 0,
- 4: 19513,
- 5: 19513,
- 6: 19513,
- 7: 0,
- 8: 0,
- 9: 19513,
- 10: 0,
- 11: 0,
- 12: 19513,
- 13: 0,
- 14: 0,
- 15: 0,
- 16: 0,
- 17: 0,
- 18: 0,
- 19: 0,
- },
- {16: 0, 17: 0, 18: 0, 19: 0, 10: 0, 11: 0, 12: 0, 13: 0, 14: 0, 15: 0},
- {
- 2: b"2020-04-01 00:00",
- 3: b"2020-04-01 00:12",
- 7: b"\x03\x00\x00\x00",
- 8: b"\x01\x00\x00\x00",
- 10: b"\xf6(\\\x8f\xc2\x05S\xc0",
- 11: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- 13: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- 14: b"\x00\x00\x00\x00\x00\x00\xe0\xbf",
- 15: b")\\\x8f\xc2\xf5(\x08\xc0",
- 16: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- 17: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- 18: b"\xf6(\\\x8f\xc2\xc5S\xc0",
- 19: b"\x00\x00\x00\x00\x00\x00\x04\xc0",
- },
- {
- 2: b"2020-04-30 23:5:",
- 3: b"2020-05-01 00:41",
- 7: b"\t\x01\x00\x00",
- 8: b"\t\x01\x00\x00",
- 10: b"\xcd\xcc\xcc\xcc\xcc,_@",
- 11: b"\x1f\x85\xebQ\\\xe2\xfe@",
- 13: b"\x00\x00\x00\x00\x00\x00\x12@",
- 14: b"\x00\x00\x00\x00\x00\x00\xe0?",
- 15: b"q=\n\xd7\xa3\xf01@",
- 16: b"\x00\x00\x00\x00\x00`B@",
- 17: b"333333\xd3?",
- 18: b"\x00\x00\x00\x00\x00\x18b@",
- 19: b"\x00\x00\x00\x00\x00\x00\x04@",
- },
- None,
- [4],
- 0,
- ),
- )
-
-
-def test_read_manifest_file_file(generated_manifest_file_file: str) -> None:
- with AvroFile(PyArrowFileIO().new_input(generated_manifest_file_file)) as reader:
- # Consume the generator
- records = list(reader)
-
- assert len(records) == 1, f"Expected 1 records, got {len(records)}"
- actual = records[0]
- expected = Record(
- actual.get(0),
- 7989,
- 0,
- 9182715666859759686,
- 3,
- 0,
- 0,
- [Record(True, False, b"\x01\x00\x00\x00", b"\x02\x00\x00\x00")],
- 237993,
- 0,
- 0,
- )
- assert actual == expected
-
-
-def test_null_list_convert_pos_to_dict() -> None:
- data = _convert_pos_to_dict(
- Schema(
- NestedField(name="field", field_id=1, field_type=ListType(element_id=2, element=StringType(), element_required=False))
- ),
- Record(None),
- )
- assert data["field"] is None
-
-
-def test_null_dict_convert_pos_to_dict() -> None:
- data = _convert_pos_to_dict(
- Schema(
- NestedField(
- name="field",
- field_id=1,
- field_type=MapType(key_id=2, key_type=StringType(), value_id=3, value_type=StringType(), value_required=False),
- )
- ),
- Record(None),
- )
- assert data["field"] is None
-
-
-def test_null_struct_convert_pos_to_dict() -> None:
- data = _convert_pos_to_dict(
- Schema(
- NestedField(
- name="field",
- field_id=1,
- field_type=StructType(
- NestedField(2, "required_field", StringType(), True), NestedField(3, "optional_field", IntegerType())
- ),
- required=False,
- )
- ),
- Record(None),
- )
- assert data["field"] is None
-
-
def test_fixed_reader() -> None:
assert construct_reader(FixedType(22)) == FixedReader(22)
@@ -509,3 +333,51 @@ def test_unknown_type() -> None:
def test_uuid_reader() -> None:
assert construct_reader(UUIDType()) == UUIDReader()
+
+
+def test_read_struct() -> None:
+ mis = MemoryInputStream(b"\x18")
+ decoder = BinaryDecoder(mis)
+
+ struct = StructType(NestedField(1, "id", IntegerType(), required=True))
+ result = StructReader(((0, IntegerReader()),), Record, struct).read(decoder)
+ assert repr(result) == "Record[id=12]"
+
+
+def test_read_struct_lambda() -> None:
+ mis = MemoryInputStream(b"\x18")
+ decoder = BinaryDecoder(mis)
+
+ struct = StructType(NestedField(1, "id", IntegerType(), required=True))
+ # You can also pass in an arbitrary function that returns a struct
+ result = StructReader(
+ ((0, IntegerReader()),), lambda struct: Record(struct=struct), struct # pylint: disable=unnecessary-lambda
+ ).read(decoder)
+ assert repr(result) == "Record[id=12]"
+
+
+def test_read_not_struct_type() -> None:
+ mis = MemoryInputStream(b"\x18")
+ decoder = BinaryDecoder(mis)
+
+ struct = StructType(NestedField(1, "id", IntegerType(), required=True))
+ with pytest.raises(ValueError) as exc_info:
+ _ = StructReader(((0, IntegerReader()),), str, struct).read(decoder) # type: ignore
+
+ assert "Incompatible with StructProtocol: <class 'str'>" in str(exc_info.value)
+
+
+def test_read_struct_exception_handling() -> None:
+ mis = MemoryInputStream(b"\x18")
+ decoder = BinaryDecoder(mis)
+
+ def raise_err(struct: StructType) -> None:
+ raise TypeError("boom")
+
+ struct = StructType(NestedField(1, "id", IntegerType(), required=True))
+ # You can also pass in an arbitrary function that returns a struct
+
+ with pytest.raises(ValueError) as exc_info:
+ _ = StructReader(((0, IntegerReader()),), raise_err, struct).read(decoder) # type: ignore
+
+ assert "Unable to initialize struct:" in str(exc_info.value)
diff --git a/python/tests/avro/test_resolver.py b/python/tests/avro/test_resolver.py
index c36b76922a..42e9146c48 100644
--- a/python/tests/avro/test_resolver.py
+++ b/python/tests/avro/test_resolver.py
@@ -14,8 +14,14 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+
+from tempfile import TemporaryDirectory
+from typing import Optional
+
import pytest
+from pydantic import Field
+from pyiceberg.avro.file import AvroFile
from pyiceberg.avro.reader import (
DecimalReader,
DoubleReader,
@@ -26,7 +32,9 @@ from pyiceberg.avro.reader import (
StructReader,
)
from pyiceberg.avro.resolver import ResolveError, resolve
+from pyiceberg.io.pyarrow import PyArrowFileIO
from pyiceberg.schema import Schema
+from pyiceberg.typedef import Record
from pyiceberg.types import (
BinaryType,
DecimalType,
@@ -57,14 +65,16 @@ def test_resolver() -> None:
NestedField(6, "preferences", MapType(7, StringType(), 8, StringType())),
schema_id=1,
)
+
+ location_struct = StructType(
+ NestedField(4, "lat", DoubleType()),
+ NestedField(5, "long", DoubleType()),
+ )
read_schema = Schema(
NestedField(
3,
"location",
- StructType(
- NestedField(4, "lat", DoubleType()),
- NestedField(5, "long", DoubleType()),
- ),
+ location_struct,
),
NestedField(1, "id", LongType()),
NestedField(6, "preferences", MapType(7, StringType(), 8, StringType())),
@@ -82,11 +92,15 @@ def test_resolver() -> None:
(
(0, DoubleReader()),
(1, DoubleReader()),
- )
+ ),
+ Record,
+ location_struct,
),
),
(2, MapReader(StringReader(), StringReader())),
- )
+ ),
+ Record,
+ read_schema.as_struct(),
)
@@ -223,3 +237,46 @@ def test_resolve_decimal_to_decimal_reduce_precision() -> None:
_ = resolve(DecimalType(19, 25), DecimalType(10, 25)) == DecimalReader(22, 25)
assert "Cannot reduce precision from decimal(19, 25) to decimal(10, 25)" in str(exc_info.value)
+
+
+def test_column_assignment() -> None:
+ int_schema = {
+ "type": "record",
+ "name": "ints",
+ "fields": [
+ {"name": "a", "type": "int", "field-id": 1},
+ {"name": "b", "type": "int", "field-id": 2},
+ {"name": "c", "type": "int", "field-id": 3},
+ ],
+ }
+
+ from fastavro import parse_schema, writer
+
+ parsed_schema = parse_schema(int_schema)
+
+ int_records = [
+ {
+ "a": 1,
+ "b": 2,
+ "c": 3,
+ }
+ ]
+
+ with TemporaryDirectory() as tmpdir:
+ tmp_avro_file = tmpdir + "/manifest.avro"
+ with open(tmp_avro_file, "wb") as out:
+ writer(out, parsed_schema, int_records)
+
+ class Ints(Record):
+ c: int = Field()
+ d: Optional[int] = Field()
+
+ MANIFEST_ENTRY_SCHEMA = Schema(
+ NestedField(3, "c", IntegerType(), required=True),
+ NestedField(4, "d", IntegerType(), required=False),
+ )
+
+ with AvroFile[Ints](PyArrowFileIO().new_input(tmp_avro_file), MANIFEST_ENTRY_SCHEMA, {-1: Ints}) as reader:
+ records = list(reader)
+
+ assert repr(records) == "[Ints[c=3, d=None]]"
diff --git a/python/tests/expressions/test_evaluator.py b/python/tests/expressions/test_evaluator.py
index 95766b0024..b57ede62f2 100644
--- a/python/tests/expressions/test_evaluator.py
+++ b/python/tests/expressions/test_evaluator.py
@@ -14,6 +14,8 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+from typing import Optional
+
from pyiceberg.expressions import (
AlwaysFalse,
AlwaysTrue,
@@ -52,6 +54,20 @@ FLOAT_SCHEMA = Schema(
)
+def _record_simple(id: int, data: Optional[str]) -> Record: # pylint: disable=redefined-builtin
+ r = Record(struct=SIMPLE_SCHEMA.as_struct())
+ r[0] = id
+ r[1] = data
+ return r
+
+
+def _record_float(id: float, f: float) -> Record: # pylint: disable=redefined-builtin
+ r = Record(struct=FLOAT_SCHEMA.as_struct())
+ r[0] = id
+ r[1] = f
+ return r
+
+
def test_true() -> None:
evaluate = expression_evaluator(SIMPLE_SCHEMA, AlwaysTrue(), case_sensitive=True)
assert evaluate(Record(1, "a"))
@@ -131,16 +147,16 @@ def test_not_null() -> None:
def test_is_nan() -> None:
evaluate = expression_evaluator(FLOAT_SCHEMA, IsNaN("f"), case_sensitive=True)
- assert not evaluate(Record(2, 0.0))
- assert not evaluate(Record(3, float("infinity")))
- assert evaluate(Record(4, float("nan")))
+ assert not evaluate(_record_float(2, f=0.0))
+ assert not evaluate(_record_float(3, f=float("infinity")))
+ assert evaluate(_record_float(4, f=float("nan")))
def test_not_nan() -> None:
evaluate = expression_evaluator(FLOAT_SCHEMA, NotNaN("f"), case_sensitive=True)
- assert evaluate(Record(2, 0.0))
- assert evaluate(Record(3, float("infinity")))
- assert not evaluate(Record(4, float("nan")))
+ assert evaluate(_record_float(2, f=0.0))
+ assert evaluate(_record_float(3, f=float("infinity")))
+ assert not evaluate(_record_float(4, f=float("nan")))
def test_not() -> None:
diff --git a/python/tests/expressions/test_expressions.py b/python/tests/expressions/test_expressions.py
index 7b26d45a28..5b82c14744 100644
--- a/python/tests/expressions/test_expressions.py
+++ b/python/tests/expressions/test_expressions.py
@@ -63,6 +63,9 @@ from pyiceberg.expressions.visitors import _from_byte_buffer
from pyiceberg.schema import Accessor, Schema
from pyiceberg.typedef import Record
from pyiceberg.types import (
+ BinaryType,
+ BooleanType,
+ DecimalType,
DoubleType,
FloatType,
IntegerType,
@@ -70,6 +73,8 @@ from pyiceberg.types import (
LongType,
NestedField,
StringType,
+ StructType,
+ UUIDType,
)
from pyiceberg.utils.singleton import Singleton
@@ -605,22 +610,37 @@ def test_invert_always() -> None:
def test_accessor_base_class() -> None:
"""Test retrieving a value at a position of a container using an accessor"""
- struct = Record(*([None] * 12))
+ struct = Record(
+ struct=StructType(
+ NestedField(1, "a", StringType()),
+ NestedField(2, "b", StringType()),
+ NestedField(3, "c", StringType()),
+ NestedField(4, "d", IntegerType()),
+ NestedField(5, "e", IntegerType()),
+ NestedField(6, "f", IntegerType()),
+ NestedField(7, "g", FloatType()),
+ NestedField(8, "h", DecimalType(8, 4)),
+ NestedField(9, "i", UUIDType()),
+ NestedField(10, "j", BooleanType()),
+ NestedField(11, "k", BooleanType()),
+ NestedField(12, "l", BinaryType()),
+ )
+ )
uuid_value = uuid.uuid4()
- struct.set(0, "foo")
- struct.set(1, "bar")
- struct.set(2, "baz")
- struct.set(3, 1)
- struct.set(4, 2)
- struct.set(5, 3)
- struct.set(6, 1.234)
- struct.set(7, Decimal("1.234"))
- struct.set(8, uuid_value)
- struct.set(9, True)
- struct.set(10, False)
- struct.set(11, b"\x19\x04\x9e?")
+ struct[0] = "foo"
+ struct[1] = "bar"
+ struct[2] = "baz"
+ struct[3] = 1
+ struct[4] = 2
+ struct[5] = 3
+ struct[6] = 1.234
+ struct[7] = Decimal("1.234")
+ struct[8] = uuid_value
+ struct[9] = True
+ struct[10] = False
+ struct[11] = b"\x19\x04\x9e?"
assert Accessor(position=0).get(struct) == "foo"
assert Accessor(position=1).get(struct) == "bar"
@@ -902,15 +922,15 @@ def test_less_than_or_equal() -> None:
def test_bound_reference_eval(table_schema_simple: Schema) -> None:
"""Test creating a BoundReference and evaluating it on a StructProtocol"""
- struct = Record(None, None, None, None)
+ struct = Record(struct=table_schema_simple.as_struct())
- struct.set(pos=1, value="foovalue")
- struct.set(pos=2, value=123)
- struct.set(pos=3, value=True)
+ struct[0] = "foovalue"
+ struct[1] = 123
+ struct[2] = True
- position1_accessor = Accessor(position=1)
- position2_accessor = Accessor(position=2)
- position3_accessor = Accessor(position=3)
+ position1_accessor = Accessor(position=0)
+ position2_accessor = Accessor(position=1)
+ position3_accessor = Accessor(position=2)
field1 = table_schema_simple.find_field(1)
field2 = table_schema_simple.find_field(2)
diff --git a/python/tests/expressions/test_visitors.py b/python/tests/expressions/test_visitors.py
index e79c353f81..13e4d01eea 100644
--- a/python/tests/expressions/test_visitors.py
+++ b/python/tests/expressions/test_visitors.py
@@ -786,12 +786,8 @@ def _to_byte_buffer(field_type: IcebergType, val: Any) -> bytes:
def _to_manifest_file(*partitions: PartitionFieldSummary) -> ManifestFile:
- return ManifestFile(
- manifest_path="",
- manifest_length=0,
- partition_spec_id=0,
- partitions=partitions,
- )
+ """Helper to create a ManifestFile"""
+ return ManifestFile(manifest_path="", manifest_length=0, partition_spec_id=0, partitions=partitions)
INT_MIN_VALUE = 30
diff --git a/python/tests/table/test_snapshots.py b/python/tests/table/test_snapshots.py
index 16ba83153e..b119ae9945 100644
--- a/python/tests/table/test_snapshots.py
+++ b/python/tests/table/test_snapshots.py
@@ -17,8 +17,6 @@
# pylint:disable=redefined-outer-name,eval-used
import pytest
-from pyiceberg.io.pyarrow import PyArrowFileIO
-from pyiceberg.manifest import ManifestContent, ManifestFile, PartitionFieldSummary
from pyiceberg.table.snapshots import Operation, Snapshot, Summary
@@ -121,40 +119,3 @@ def test_snapshot_with_properties_repr(snapshot_with_properties: Snapshot) -> No
== """Snapshot(snapshot_id=25, parent_snapshot_id=19, sequence_number=200, timestamp_ms=1602638573590, manifest_list='s3:/a/b/c.avro', summary=Summary(Operation.APPEND, **{'foo': 'bar'}), schema_id=3)"""
)
assert snapshot_with_properties == eval(repr(snapshot_with_properties))
-
-
-def test_fetch_manifest_list(generated_manifest_file_file: str) -> None:
- snapshot = Snapshot(
- snapshot_id=25,
- parent_snapshot_id=19,
- sequence_number=200,
- timestamp_ms=1602638573590,
- manifest_list=generated_manifest_file_file,
- summary=Summary(Operation.APPEND),
- schema_id=3,
- )
- io = PyArrowFileIO()
- actual = snapshot.manifests(io)
- assert actual == [
- ManifestFile(
- manifest_path=actual[0].manifest_path, # Is a temp path that changes every time
- manifest_length=7989,
- partition_spec_id=0,
- content=ManifestContent.DATA,
- sequence_number=0,
- min_sequence_number=0,
- added_snapshot_id=9182715666859759686,
- added_data_files_count=3,
- existing_data_files_count=0,
- deleted_data_files_count=0,
- added_rows_count=237993,
- existing_rows_counts=None,
- deleted_rows_count=0,
- partitions=[
- PartitionFieldSummary(
- contains_null=True, contains_nan=False, lower_bound=b"\x01\x00\x00\x00", upper_bound=b"\x02\x00\x00\x00"
- )
- ],
- key_metadata=None,
- )
- ]
diff --git a/python/tests/test_schema.py b/python/tests/test_schema.py
index a6fba2cff0..4a59631acd 100644
--- a/python/tests/test_schema.py
+++ b/python/tests/test_schema.py
@@ -389,10 +389,10 @@ def test_build_position_accessors_with_struct(table_schema_nested: Schema) -> No
def __init__(self, pos: Dict[int, Any] = EMPTY_DICT):
self._pos: Dict[int, Any] = pos
- def set(self, pos: int, value: Any) -> None:
+ def __setitem__(self, pos: int, value: Any) -> None:
pass
- def get(self, pos: int) -> Any:
+ def __getitem__(self, pos: int) -> Any:
return self._pos[pos]
accessors = build_position_accessors(table_schema_nested)
diff --git a/python/tests/test_transforms.py b/python/tests/test_transforms.py
index 72776b8574..8ba80e396b 100644
--- a/python/tests/test_transforms.py
+++ b/python/tests/test_transforms.py
@@ -62,6 +62,7 @@ from pyiceberg.transforms import (
VoidTransform,
YearTransform,
)
+from pyiceberg.typedef import IcebergBaseModel
from pyiceberg.types import (
BinaryType,
BooleanType,
@@ -87,7 +88,6 @@ from pyiceberg.utils.datetime import (
timestamp_to_micros,
timestamptz_to_micros,
)
-from pyiceberg.utils.iceberg_base_model import IcebergBaseModel
@pytest.mark.parametrize(
diff --git a/python/tests/test_typedef.py b/python/tests/test_typedef.py
index 628e674443..43388addca 100644
--- a/python/tests/test_typedef.py
+++ b/python/tests/test_typedef.py
@@ -16,7 +16,14 @@
# under the License.
import pytest
+from pyiceberg.schema import Schema
from pyiceberg.typedef import FrozenDict, KeyDefaultDict, Record
+from pyiceberg.types import (
+ IntegerType,
+ NestedField,
+ StringType,
+ StructType,
+)
def test_setitem_frozendict() -> None:
@@ -39,6 +46,44 @@ def test_keydefaultdict() -> None:
assert defaultdict[22] == 1
-def test_record_repr() -> None:
- r = Record(1, "vo", True)
- assert repr(r) == "Record[1, 'vo', True]"
+def test_record_repr(table_schema_simple: Schema) -> None:
+ r = Record("vo", 1, True, struct=table_schema_simple.as_struct())
+ assert repr(r) == "Record[foo='vo', bar=1, baz=True]"
+
+
+def test_named_record() -> None:
+ r = Record(struct=StructType(NestedField(0, "id", IntegerType()), NestedField(1, "name", StringType())))
+
+ with pytest.raises(AttributeError):
+ assert r.id is None # type: ignore
+
+ with pytest.raises(AttributeError):
+ assert r.name is None # type: ignore
+
+ r[0] = 123
+ r[1] = "abc"
+
+ assert r[0] == 123
+ assert r[1] == "abc"
+
+ assert r.id == 123 # type: ignore
+ assert r.name == "abc" # type: ignore
+
+
+def test_record_positional_args() -> None:
+ r = Record(1, "a", True)
+ assert repr(r) == "Record[field1=1, field2='a', field3=True]"
+
+
+def test_record_named_args() -> None:
+ r = Record(foo=1, bar="a", baz=True)
+
+ assert r.foo == 1 # type: ignore
+ assert r.bar == "a" # type: ignore
+ assert r.baz is True # type: ignore
+
+ assert r[0] == 1
+ assert r[1] == "a"
+ assert r[2] is True
+
+ assert repr(r) == "Record[foo=1, bar='a', baz=True]"
diff --git a/python/tests/test_types.py b/python/tests/test_types.py
index cda1309ede..e2c0272b45 100644
--- a/python/tests/test_types.py
+++ b/python/tests/test_types.py
@@ -20,6 +20,7 @@ from typing import Type
import pytest
from pydantic import ValidationError
+from pyiceberg.typedef import IcebergBaseModel
from pyiceberg.types import (
BinaryType,
BooleanType,
@@ -42,7 +43,6 @@ from pyiceberg.types import (
TimeType,
UUIDType,
)
-from pyiceberg.utils.iceberg_base_model import IcebergBaseModel
non_parameterized_types = [
(1, BooleanType),
diff --git a/python/tests/utils/test_manifest.py b/python/tests/utils/test_manifest.py
index e78d0db237..548d94cb8f 100644
--- a/python/tests/utils/test_manifest.py
+++ b/python/tests/utils/test_manifest.py
@@ -19,12 +19,8 @@ from pyiceberg.io import load_file_io
from pyiceberg.io.pyarrow import PyArrowFileIO
from pyiceberg.manifest import (
DataFile,
- DataFileContent,
FileFormat,
- ManifestContent,
- ManifestEntry,
ManifestEntryStatus,
- ManifestFile,
PartitionFieldSummary,
read_manifest_entry,
read_manifest_list,
@@ -35,513 +31,152 @@ from pyiceberg.table.snapshots import Operation, Summary
def test_read_manifest_entry(generated_manifest_entry_file: str) -> None:
input_file = PyArrowFileIO().new_input(location=generated_manifest_entry_file)
+ manifest_entries = list(read_manifest_entry(input_file))
+ manifest_entry = manifest_entries[0]
+
+ assert manifest_entry.status == ManifestEntryStatus.ADDED
+ assert manifest_entry.snapshot_id == 8744736658442914487
+ assert manifest_entry.sequence_number is None
+ assert isinstance(manifest_entry.data_file, DataFile)
+
+ data_file = manifest_entry.data_file
+
+ assert data_file.content is None
assert (
- list(read_manifest_entry(input_file))
- == [
- ManifestEntry(
- status=ManifestEntryStatus.ADDED,
- snapshot_id=8744736658442914487,
- sequence_number=None,
- data_file=DataFile(
- content=DataFileContent.DATA,
- file_path="/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet",
- file_format=FileFormat.PARQUET,
- partition={"VendorID": 1, "tpep_pickup_datetime": 1925},
- record_count=19513,
- file_size_in_bytes=388872,
- block_size_in_bytes=67108864,
- column_sizes={
- 1: 53,
- 2: 98153,
- 3: 98693,
- 4: 53,
- 5: 53,
- 6: 53,
- 7: 17425,
- 8: 18528,
- 9: 53,
- 10: 44788,
- 11: 35571,
- 12: 53,
- 13: 1243,
- 14: 2355,
- 15: 12750,
- 16: 4029,
- 17: 110,
- 18: 47194,
- 19: 2948,
- },
- value_counts={
- 1: 19513,
- 2: 19513,
- 3: 19513,
- 4: 19513,
- 5: 19513,
- 6: 19513,
- 7: 19513,
- 8: 19513,
- 9: 19513,
- 10: 19513,
- 11: 19513,
- 12: 19513,
- 13: 19513,
- 14: 19513,
- 15: 19513,
- 16: 19513,
- 17: 19513,
- 18: 19513,
- 19: 19513,
- },
- null_value_counts={
- 1: 19513,
- 2: 0,
- 3: 0,
- 4: 19513,
- 5: 19513,
- 6: 19513,
- 7: 0,
- 8: 0,
- 9: 19513,
- 10: 0,
- 11: 0,
- 12: 19513,
- 13: 0,
- 14: 0,
- 15: 0,
- 16: 0,
- 17: 0,
- 18: 0,
- 19: 0,
- },
- nan_value_counts={16: 0, 17: 0, 18: 0, 19: 0, 10: 0, 11: 0, 12: 0, 13: 0, 14: 0, 15: 0},
- distinct_counts=None,
- lower_bounds={
- 2: b"2020-04-01 00:00",
- 3: b"2020-04-01 00:12",
- 7: b"\x03\x00\x00\x00",
- 8: b"\x01\x00\x00\x00",
- 10: b"\xf6(\\\x8f\xc2\x05S\xc0",
- 11: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- 13: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- 14: b"\x00\x00\x00\x00\x00\x00\xe0\xbf",
- 15: b")\\\x8f\xc2\xf5(\x08\xc0",
- 16: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- 17: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- 18: b"\xf6(\\\x8f\xc2\xc5S\xc0",
- 19: b"\x00\x00\x00\x00\x00\x00\x04\xc0",
- },
- upper_bounds={
- 2: b"2020-04-30 23:5:",
- 3: b"2020-05-01 00:41",
- 7: b"\t\x01\x00\x00",
- 8: b"\t\x01\x00\x00",
- 10: b"\xcd\xcc\xcc\xcc\xcc,_@",
- 11: b"\x1f\x85\xebQ\\\xe2\xfe@",
- 13: b"\x00\x00\x00\x00\x00\x00\x12@",
- 14: b"\x00\x00\x00\x00\x00\x00\xe0?",
- 15: b"q=\n\xd7\xa3\xf01@",
- 16: b"\x00\x00\x00\x00\x00`B@",
- 17: b"333333\xd3?",
- 18: b"\x00\x00\x00\x00\x00\x18b@",
- 19: b"\x00\x00\x00\x00\x00\x00\x04@",
- },
- key_metadata=None,
- split_offsets=[4],
- equality_ids=None,
- sort_order_id=0,
- ),
- ),
- ManifestEntry(
- status=ManifestEntryStatus.ADDED,
- snapshot_id=8744736658442914487,
- sequence_number=None,
- data_file=DataFile(
- content=DataFileContent.DATA,
- file_path="/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=1/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00002.parquet",
- file_format=FileFormat.PARQUET,
- partition={"VendorID": 1, "tpep_pickup_datetime": 1925},
- record_count=95050,
- file_size_in_bytes=1265950,
- block_size_in_bytes=67108864,
- column_sizes={
- 1: 318,
- 2: 329806,
- 3: 331632,
- 4: 15343,
- 5: 2351,
- 6: 3389,
- 7: 71269,
- 8: 76429,
- 9: 16383,
- 10: 86992,
- 11: 89608,
- 12: 265,
- 13: 19377,
- 14: 1692,
- 15: 76162,
- 16: 4354,
- 17: 759,
- 18: 120650,
- 19: 11804,
- },
- value_counts={
- 1: 95050,
- 2: 95050,
- 3: 95050,
- 4: 95050,
- 5: 95050,
- 6: 95050,
- 7: 95050,
- 8: 95050,
- 9: 95050,
- 10: 95050,
- 11: 95050,
- 12: 95050,
- 13: 95050,
- 14: 95050,
- 15: 95050,
- 16: 95050,
- 17: 95050,
- 18: 95050,
- 19: 95050,
- },
- null_value_counts={
- 1: 0,
- 2: 0,
- 3: 0,
- 4: 0,
- 5: 0,
- 6: 0,
- 7: 0,
- 8: 0,
- 9: 0,
- 10: 0,
- 11: 0,
- 12: 95050,
- 13: 0,
- 14: 0,
- 15: 0,
- 16: 0,
- 17: 0,
- 18: 0,
- 19: 0,
- },
- nan_value_counts={16: 0, 17: 0, 18: 0, 19: 0, 10: 0, 11: 0, 12: 0, 13: 0, 14: 0, 15: 0},
- distinct_counts=None,
- lower_bounds={
- 1: b"\x01\x00\x00\x00",
- 2: b"2020-04-01 00:00",
- 3: b"2020-04-01 00:03",
- 4: b"\x00\x00\x00\x00",
- 5: b"\x01\x00\x00\x00",
- 6: b"N",
- 7: b"\x01\x00\x00\x00",
- 8: b"\x01\x00\x00\x00",
- 9: b"\x01\x00\x00\x00",
- 10: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- 11: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- 13: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- 14: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- 15: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- 16: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- 17: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- 18: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- 19: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- },
- upper_bounds={
- 1: b"\x01\x00\x00\x00",
- 2: b"2020-04-30 23:5:",
- 3: b"2020-05-01 00:1:",
- 4: b"\x06\x00\x00\x00",
- 5: b"c\x00\x00\x00",
- 6: b"Y",
- 7: b"\t\x01\x00\x00",
- 8: b"\t\x01\x00\x00",
- 9: b"\x04\x00\x00\x00",
- 10: b"\\\x8f\xc2\xf5(8\x8c@",
- 11: b"\xcd\xcc\xcc\xcc\xcc,f@",
- 13: b"\x00\x00\x00\x00\x00\x00\x1c@",
- 14: b"\x9a\x99\x99\x99\x99\x99\xf1?",
- 15: b"\x00\x00\x00\x00\x00\x00Y@",
- 16: b"\x00\x00\x00\x00\x00\xb0X@",
- 17: b"333333\xd3?",
- 18: b"\xc3\xf5(\\\x8f:\x8c@",
- 19: b"\x00\x00\x00\x00\x00\x00\x04@",
- },
- key_metadata=None,
- split_offsets=[4],
- equality_ids=None,
- sort_order_id=0,
- ),
- ),
- ]
- != [
- ManifestEntry(
- status=ManifestEntryStatus.ADDED,
- snapshot_id=8744736658442914487,
- sequence_number=None,
- data_file=DataFile(
- content=DataFileContent.DATA,
- file_path="/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet",
- file_format=FileFormat.PARQUET,
- partition={"VendorID": 1, "tpep_pickup_datetime": None},
- record_count=19513,
- file_size_in_bytes=388872,
- block_size_in_bytes=67108864,
- column_sizes={
- 1: 53,
- 2: 98153,
- 3: 98693,
- 4: 53,
- 5: 53,
- 6: 53,
- 7: 17425,
- 8: 18528,
- 9: 53,
- 10: 44788,
- 11: 35571,
- 12: 53,
- 13: 1243,
- 14: 2355,
- 15: 12750,
- 16: 4029,
- 17: 110,
- 18: 47194,
- 19: 2948,
- },
- value_counts={
- 1: 19513,
- 2: 19513,
- 3: 19513,
- 4: 19513,
- 5: 19513,
- 6: 19513,
- 7: 19513,
- 8: 19513,
- 9: 19513,
- 10: 19513,
- 11: 19513,
- 12: 19513,
- 13: 19513,
- 14: 19513,
- 15: 19513,
- 16: 19513,
- 17: 19513,
- 18: 19513,
- 19: 19513,
- },
- null_value_counts={
- 1: 19513,
- 2: 0,
- 3: 0,
- 4: 19513,
- 5: 19513,
- 6: 19513,
- 7: 0,
- 8: 0,
- 9: 19513,
- 10: 0,
- 11: 0,
- 12: 19513,
- 13: 0,
- 14: 0,
- 15: 0,
- 16: 0,
- 17: 0,
- 18: 0,
- 19: 0,
- },
- nan_value_counts={16: 0, 17: 0, 18: 0, 19: 0, 10: 0, 11: 0, 12: 0, 13: 0, 14: 0, 15: 0},
- distinct_counts=None,
- lower_bounds={
- 2: b"2020-04-01 00:00",
- 3: b"2020-04-01 00:12",
- 7: b"\x03\x00\x00\x00",
- 8: b"\x01\x00\x00\x00",
- 10: b"\xf6(\\\x8f\xc2\x05S\xc0",
- 11: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- 13: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- 14: b"\x00\x00\x00\x00\x00\x00\xe0\xbf",
- 15: b")\\\x8f\xc2\xf5(\x08\xc0",
- 16: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- 17: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- 18: b"\xf6(\\\x8f\xc2\xc5S\xc0",
- 19: b"\x00\x00\x00\x00\x00\x00\x04\xc0",
- },
- upper_bounds={
- 2: b"2020-04-30 23:5:",
- 3: b"2020-05-01 00:41",
- 7: b"\t\x01\x00\x00",
- 8: b"\t\x01\x00\x00",
- 10: b"\xcd\xcc\xcc\xcc\xcc,_@",
- 11: b"\x1f\x85\xebQ\\\xe2\xfe@",
- 13: b"\x00\x00\x00\x00\x00\x00\x12@",
- 14: b"\x00\x00\x00\x00\x00\x00\xe0?",
- 15: b"q=\n\xd7\xa3\xf01@",
- 16: b"\x00\x00\x00\x00\x00`B@",
- 17: b"333333\xd3?",
- 18: b"\x00\x00\x00\x00\x00\x18b@",
- 19: b"\x00\x00\x00\x00\x00\x00\x04@",
- },
- key_metadata=None,
- split_offsets=[4],
- equality_ids=None,
- sort_order_id=0,
- ),
- ),
- ManifestEntry(
- status=ManifestEntryStatus.ADDED,
- snapshot_id=8744736658442914487,
- sequence_number=None,
- data_file=DataFile(
- content=DataFileContent.DATA,
- file_path="/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=1/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00002.parquet",
- file_format=FileFormat.PARQUET,
- partition={"VendorID": 1, "tpep_pickup_datetime": 1925},
- record_count=95050,
- file_size_in_bytes=1265950,
- block_size_in_bytes=67108864,
- column_sizes={
- 1: 318,
- 2: 329806,
- 3: 331632,
- 4: 15343,
- 5: 2351,
- 6: 3389,
- 7: 71269,
- 8: 76429,
- 9: 16383,
- 10: 86992,
- 11: 89608,
- 12: 265,
- 13: 19377,
- 14: 1692,
- 15: 76162,
- 16: 4354,
- 17: 759,
- 18: 120650,
- 19: 11804,
- },
- value_counts={
- 1: 95050,
- 2: 95050,
- 3: 95050,
- 4: 95050,
- 5: 95050,
- 6: 95050,
- 7: 95050,
- 8: 95050,
- 9: 95050,
- 10: 95050,
- 11: 95050,
- 12: 95050,
- 13: 95050,
- 14: 95050,
- 15: 95050,
- 16: 95050,
- 17: 95050,
- 18: 95050,
- 19: 95050,
- },
- null_value_counts={
- 1: 0,
- 2: 0,
- 3: 0,
- 4: 0,
- 5: 0,
- 6: 0,
- 7: 0,
- 8: 0,
- 9: 0,
- 10: 0,
- 11: 0,
- 12: 95050,
- 13: 0,
- 14: 0,
- 15: 0,
- 16: 0,
- 17: 0,
- 18: 0,
- 19: 0,
- },
- nan_value_counts={16: 0, 17: 0, 18: 0, 19: 0, 10: 0, 11: 0, 12: 0, 13: 0, 14: 0, 15: 0},
- distinct_counts=None,
- lower_bounds={
- 1: b"\x01\x00\x00\x00",
- 2: b"2020-04-01 00:00",
- 3: b"2020-04-01 00:03",
- 4: b"\x00\x00\x00\x00",
- 5: b"\x01\x00\x00\x00",
- 6: b"N",
- 7: b"\x01\x00\x00\x00",
- 8: b"\x01\x00\x00\x00",
- 9: b"\x01\x00\x00\x00",
- 10: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- 11: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- 13: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- 14: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- 15: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- 16: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- 17: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- 18: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- 19: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- },
- upper_bounds={
- 1: b"\x01\x00\x00\x00",
- 2: b"2020-04-30 23:5:",
- 3: b"2020-05-01 00:1:",
- 4: b"\x06\x00\x00\x00",
- 5: b"c\x00\x00\x00",
- 6: b"Y",
- 7: b"\t\x01\x00\x00",
- 8: b"\t\x01\x00\x00",
- 9: b"\x04\x00\x00\x00",
- 10: b"\\\x8f\xc2\xf5(8\x8c@",
- 11: b"\xcd\xcc\xcc\xcc\xcc,f@",
- 13: b"\x00\x00\x00\x00\x00\x00\x1c@",
- 14: b"\x9a\x99\x99\x99\x99\x99\xf1?",
- 15: b"\x00\x00\x00\x00\x00\x00Y@",
- 16: b"\x00\x00\x00\x00\x00\xb0X@",
- 17: b"333333\xd3?",
- 18: b"\xc3\xf5(\\\x8f:\x8c@",
- 19: b"\x00\x00\x00\x00\x00\x00\x04@",
- },
- key_metadata=None,
- split_offsets=[4],
- equality_ids=None,
- sort_order_id=0,
- ),
- ),
- ]
+ data_file.file_path
+ == "/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet"
)
+ assert data_file.file_format == FileFormat.PARQUET
+ assert repr(data_file.partition) == "Record[VendorID=1, tpep_pickup_datetime=1925]"
+ assert data_file.record_count == 19513
+ assert data_file.file_size_in_bytes == 388872
+ assert data_file.column_sizes == {
+ 1: 53,
+ 2: 98153,
+ 3: 98693,
+ 4: 53,
+ 5: 53,
+ 6: 53,
+ 7: 17425,
+ 8: 18528,
+ 9: 53,
+ 10: 44788,
+ 11: 35571,
+ 12: 53,
+ 13: 1243,
+ 14: 2355,
+ 15: 12750,
+ 16: 4029,
+ 17: 110,
+ 18: 47194,
+ 19: 2948,
+ }
+ assert data_file.value_counts == {
+ 1: 19513,
+ 2: 19513,
+ 3: 19513,
+ 4: 19513,
+ 5: 19513,
+ 6: 19513,
+ 7: 19513,
+ 8: 19513,
+ 9: 19513,
+ 10: 19513,
+ 11: 19513,
+ 12: 19513,
+ 13: 19513,
+ 14: 19513,
+ 15: 19513,
+ 16: 19513,
+ 17: 19513,
+ 18: 19513,
+ 19: 19513,
+ }
+ assert data_file.null_value_counts == {
+ 1: 19513,
+ 2: 0,
+ 3: 0,
+ 4: 19513,
+ 5: 19513,
+ 6: 19513,
+ 7: 0,
+ 8: 0,
+ 9: 19513,
+ 10: 0,
+ 11: 0,
+ 12: 19513,
+ 13: 0,
+ 14: 0,
+ 15: 0,
+ 16: 0,
+ 17: 0,
+ 18: 0,
+ 19: 0,
+ }
+ assert data_file.nan_value_counts == {16: 0, 17: 0, 18: 0, 19: 0, 10: 0, 11: 0, 12: 0, 13: 0, 14: 0, 15: 0}
+ assert data_file.lower_bounds == {
+ 2: b"2020-04-01 00:00",
+ 3: b"2020-04-01 00:12",
+ 7: b"\x03\x00\x00\x00",
+ 8: b"\x01\x00\x00\x00",
+ 10: b"\xf6(\\\x8f\xc2\x05S\xc0",
+ 11: b"\x00\x00\x00\x00\x00\x00\x00\x00",
+ 13: b"\x00\x00\x00\x00\x00\x00\x00\x00",
+ 14: b"\x00\x00\x00\x00\x00\x00\xe0\xbf",
+ 15: b")\\\x8f\xc2\xf5(\x08\xc0",
+ 16: b"\x00\x00\x00\x00\x00\x00\x00\x00",
+ 17: b"\x00\x00\x00\x00\x00\x00\x00\x00",
+ 18: b"\xf6(\\\x8f\xc2\xc5S\xc0",
+ 19: b"\x00\x00\x00\x00\x00\x00\x04\xc0",
+ }
+ assert data_file.upper_bounds == {
+ 2: b"2020-04-30 23:5:",
+ 3: b"2020-05-01 00:41",
+ 7: b"\t\x01\x00\x00",
+ 8: b"\t\x01\x00\x00",
+ 10: b"\xcd\xcc\xcc\xcc\xcc,_@",
+ 11: b"\x1f\x85\xebQ\\\xe2\xfe@",
+ 13: b"\x00\x00\x00\x00\x00\x00\x12@",
+ 14: b"\x00\x00\x00\x00\x00\x00\xe0?",
+ 15: b"q=\n\xd7\xa3\xf01@",
+ 16: b"\x00\x00\x00\x00\x00`B@",
+ 17: b"333333\xd3?",
+ 18: b"\x00\x00\x00\x00\x00\x18b@",
+ 19: b"\x00\x00\x00\x00\x00\x00\x04@",
+ }
+ assert data_file.key_metadata is None
+ assert data_file.split_offsets == [4]
+ assert data_file.equality_ids is None
+ assert data_file.sort_order_id == 0
def test_read_manifest_list(generated_manifest_file_file: str) -> None:
input_file = PyArrowFileIO().new_input(generated_manifest_file_file)
- actual = list(read_manifest_list(input_file))
- expected = [
- ManifestFile(
- manifest_path=actual[0].manifest_path,
- manifest_length=7989,
- partition_spec_id=0,
- added_snapshot_id=9182715666859759686,
- added_data_files_count=3,
- existing_data_files_count=0,
- deleted_data_files_count=0,
- partitions=[
- PartitionFieldSummary(
- contains_null=True, contains_nan=False, lower_bound=b"\x01\x00\x00\x00", upper_bound=b"\x02\x00\x00\x00"
- )
- ],
- added_rows_count=237993,
- existing_rows_counts=None,
- deleted_rows_count=0,
- )
- ]
- assert actual == expected
+ manifest_list = list(read_manifest_list(input_file))[0]
+
+ assert manifest_list.manifest_length == 7989
+ assert manifest_list.partition_spec_id == 0
+ assert manifest_list.added_snapshot_id == 9182715666859759686
+ assert manifest_list.added_files_count == 3
+ assert manifest_list.existing_files_count == 0
+ assert manifest_list.deleted_files_count == 0
+
+ assert isinstance(manifest_list.partitions, list)
+
+ partitions_summary = manifest_list.partitions[0]
+ assert isinstance(partitions_summary, PartitionFieldSummary)
+ assert partitions_summary.contains_null is True
+ assert partitions_summary.contains_nan is False
+ assert partitions_summary.lower_bound == b"\x01\x00\x00\x00"
+ assert partitions_summary.upper_bound == b"\x02\x00\x00\x00"
-def test_read_manifest(generated_manifest_file_file: str, generated_manifest_entry_file: str) -> None:
+ assert manifest_list.added_rows_count == 237993
+ assert manifest_list.existing_rows_count == 0
+ assert manifest_list.deleted_rows_count == 0
+
+
+def test_read_manifest(generated_manifest_file_file: str) -> None:
io = load_file_io({})
snapshot = Snapshot(
@@ -552,269 +187,29 @@ def test_read_manifest(generated_manifest_file_file: str, generated_manifest_ent
summary=Summary(Operation.APPEND),
schema_id=3,
)
- manifest_list = snapshot.manifests(io)
+ manifest_list = snapshot.manifests(io)[0]
+
+ assert manifest_list.manifest_length == 7989
+ assert manifest_list.partition_spec_id == 0
+ assert manifest_list.content is None
+ assert manifest_list.sequence_number is None
+ assert manifest_list.min_sequence_number is None
+ assert manifest_list.added_snapshot_id == 9182715666859759686
+ assert manifest_list.added_files_count == 3
+ assert manifest_list.existing_files_count == 0
+ assert manifest_list.deleted_files_count == 0
+ assert manifest_list.added_rows_count == 237993
+ assert manifest_list.existing_rows_count == 0
+ assert manifest_list.deleted_rows_count == 0
+ assert manifest_list.key_metadata is None
+
+ assert isinstance(manifest_list.partitions, list)
+
+ partition = manifest_list.partitions[0]
- assert manifest_list == [
- ManifestFile(
- manifest_path=generated_manifest_entry_file,
- manifest_length=7989,
- partition_spec_id=0,
- content=ManifestContent.DATA,
- sequence_number=0,
- min_sequence_number=0,
- added_snapshot_id=9182715666859759686,
- added_data_files_count=3,
- existing_data_files_count=0,
- deleted_data_files_count=0,
- added_rows_count=237993,
- existing_rows_counts=None,
- deleted_rows_count=0,
- partitions=[
- PartitionFieldSummary(
- contains_null=True, contains_nan=False, lower_bound=b"\x01\x00\x00\x00", upper_bound=b"\x02\x00\x00\x00"
- )
- ],
- key_metadata=None,
- )
- ]
- actual = manifest_list[0].fetch_manifest_entry(io)
- expected = [
- ManifestEntry(
- status=ManifestEntryStatus.ADDED,
- snapshot_id=8744736658442914487,
- sequence_number=None,
- data_file=DataFile(
- content=DataFileContent.DATA,
- file_path="/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet",
- file_format=FileFormat.PARQUET,
- partition={"VendorID": 1, "tpep_pickup_datetime": 1925},
- record_count=19513,
- file_size_in_bytes=388872,
- block_size_in_bytes=67108864,
- column_sizes={
- 1: 53,
- 2: 98153,
- 3: 98693,
- 4: 53,
- 5: 53,
- 6: 53,
- 7: 17425,
- 8: 18528,
- 9: 53,
- 10: 44788,
- 11: 35571,
- 12: 53,
- 13: 1243,
- 14: 2355,
- 15: 12750,
- 16: 4029,
- 17: 110,
- 18: 47194,
- 19: 2948,
- },
- value_counts={
- 1: 19513,
- 2: 19513,
- 3: 19513,
- 4: 19513,
- 5: 19513,
- 6: 19513,
- 7: 19513,
- 8: 19513,
- 9: 19513,
- 10: 19513,
- 11: 19513,
- 12: 19513,
- 13: 19513,
- 14: 19513,
- 15: 19513,
- 16: 19513,
- 17: 19513,
- 18: 19513,
- 19: 19513,
- },
- null_value_counts={
- 1: 19513,
- 2: 0,
- 3: 0,
- 4: 19513,
- 5: 19513,
- 6: 19513,
- 7: 0,
- 8: 0,
- 9: 19513,
- 10: 0,
- 11: 0,
- 12: 19513,
- 13: 0,
- 14: 0,
- 15: 0,
- 16: 0,
- 17: 0,
- 18: 0,
- 19: 0,
- },
- nan_value_counts={16: 0, 17: 0, 18: 0, 19: 0, 10: 0, 11: 0, 12: 0, 13: 0, 14: 0, 15: 0},
- distinct_counts=None,
- lower_bounds={
- 2: b"2020-04-01 00:00",
- 3: b"2020-04-01 00:12",
- 7: b"\x03\x00\x00\x00",
- 8: b"\x01\x00\x00\x00",
- 10: b"\xf6(\\\x8f\xc2\x05S\xc0",
- 11: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- 13: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- 14: b"\x00\x00\x00\x00\x00\x00\xe0\xbf",
- 15: b")\\\x8f\xc2\xf5(\x08\xc0",
- 16: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- 17: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- 18: b"\xf6(\\\x8f\xc2\xc5S\xc0",
- 19: b"\x00\x00\x00\x00\x00\x00\x04\xc0",
- },
- upper_bounds={
- 2: b"2020-04-30 23:5:",
- 3: b"2020-05-01 00:41",
- 7: b"\t\x01\x00\x00",
- 8: b"\t\x01\x00\x00",
- 10: b"\xcd\xcc\xcc\xcc\xcc,_@",
- 11: b"\x1f\x85\xebQ\\\xe2\xfe@",
- 13: b"\x00\x00\x00\x00\x00\x00\x12@",
- 14: b"\x00\x00\x00\x00\x00\x00\xe0?",
- 15: b"q=\n\xd7\xa3\xf01@",
- 16: b"\x00\x00\x00\x00\x00`B@",
- 17: b"333333\xd3?",
- 18: b"\x00\x00\x00\x00\x00\x18b@",
- 19: b"\x00\x00\x00\x00\x00\x00\x04@",
- },
- key_metadata=None,
- split_offsets=[4],
- equality_ids=None,
- sort_order_id=0,
- ),
- ),
- ManifestEntry(
- status=ManifestEntryStatus.ADDED,
- snapshot_id=8744736658442914487,
- sequence_number=None,
- data_file=DataFile(
- content=DataFileContent.DATA,
- file_path="/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=1/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00002.parquet",
- file_format=FileFormat.PARQUET,
- partition={"VendorID": 1, "tpep_pickup_datetime": 1925},
- record_count=95050,
- file_size_in_bytes=1265950,
- block_size_in_bytes=67108864,
- column_sizes={
- 1: 318,
- 2: 329806,
- 3: 331632,
- 4: 15343,
- 5: 2351,
- 6: 3389,
- 7: 71269,
- 8: 76429,
- 9: 16383,
- 10: 86992,
- 11: 89608,
- 12: 265,
- 13: 19377,
- 14: 1692,
- 15: 76162,
- 16: 4354,
- 17: 759,
- 18: 120650,
- 19: 11804,
- },
- value_counts={
- 1: 95050,
- 2: 95050,
- 3: 95050,
- 4: 95050,
- 5: 95050,
- 6: 95050,
- 7: 95050,
- 8: 95050,
- 9: 95050,
- 10: 95050,
- 11: 95050,
- 12: 95050,
- 13: 95050,
- 14: 95050,
- 15: 95050,
- 16: 95050,
- 17: 95050,
- 18: 95050,
- 19: 95050,
- },
- null_value_counts={
- 1: 0,
- 2: 0,
- 3: 0,
- 4: 0,
- 5: 0,
- 6: 0,
- 7: 0,
- 8: 0,
- 9: 0,
- 10: 0,
- 11: 0,
- 12: 95050,
- 13: 0,
- 14: 0,
- 15: 0,
- 16: 0,
- 17: 0,
- 18: 0,
- 19: 0,
- },
- nan_value_counts={16: 0, 17: 0, 18: 0, 19: 0, 10: 0, 11: 0, 12: 0, 13: 0, 14: 0, 15: 0},
- distinct_counts=None,
- lower_bounds={
- 1: b"\x01\x00\x00\x00",
- 2: b"2020-04-01 00:00",
- 3: b"2020-04-01 00:03",
- 4: b"\x00\x00\x00\x00",
- 5: b"\x01\x00\x00\x00",
- 6: b"N",
- 7: b"\x01\x00\x00\x00",
- 8: b"\x01\x00\x00\x00",
- 9: b"\x01\x00\x00\x00",
- 10: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- 11: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- 13: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- 14: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- 15: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- 16: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- 17: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- 18: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- 19: b"\x00\x00\x00\x00\x00\x00\x00\x00",
- },
- upper_bounds={
- 1: b"\x01\x00\x00\x00",
- 2: b"2020-04-30 23:5:",
- 3: b"2020-05-01 00:1:",
- 4: b"\x06\x00\x00\x00",
- 5: b"c\x00\x00\x00",
- 6: b"Y",
- 7: b"\t\x01\x00\x00",
- 8: b"\t\x01\x00\x00",
- 9: b"\x04\x00\x00\x00",
- 10: b"\\\x8f\xc2\xf5(8\x8c@",
- 11: b"\xcd\xcc\xcc\xcc\xcc,f@",
- 13: b"\x00\x00\x00\x00\x00\x00\x1c@",
- 14: b"\x9a\x99\x99\x99\x99\x99\xf1?",
- 15: b"\x00\x00\x00\x00\x00\x00Y@",
- 16: b"\x00\x00\x00\x00\x00\xb0X@",
- 17: b"333333\xd3?",
- 18: b"\xc3\xf5(\\\x8f:\x8c@",
- 19: b"\x00\x00\x00\x00\x00\x00\x04@",
- },
- key_metadata=None,
- split_offsets=[4],
- equality_ids=None,
- sort_order_id=0,
- ),
- ),
- ]
+ assert isinstance(partition, PartitionFieldSummary)
- assert actual == expected
+ assert partition.contains_null is True
+ assert partition.contains_nan is False
+ assert partition.lower_bound == b"\x01\x00\x00\x00"
+ assert partition.upper_bound == b"\x02\x00\x00\x00"