Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/05/03 14:03:17 UTC

[GitHub] [iceberg] Fokko commented on a diff in pull request #3677: Python: Adding TableMetadata object from dict, bytestream, and InputFile implementation

Fokko commented on code in PR #3677:
URL: https://github.com/apache/iceberg/pull/3677#discussion_r863809856


##########
python/src/iceberg/table/metadata.py:
##########
@@ -0,0 +1,366 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import codecs
+import json
+
+import attr
+from jsonschema import validate as validate_json
+from jsonschema.exceptions import ValidationError
+
+from iceberg.io.base import InputFile, OutputFile
+
+TABLE_METADATA_V1_SCHEMA = {
+    "$schema": "http://json-schema.org/draft-07/schema",
+    "type": "object",
+    "required": [
+        "format-version",
+        "location",
+        "last-updated-ms",
+        "last-column-id",
+        "schema",
+        "partition-spec",
+    ],
+    "properties": {
+        "format-version": {"type": "number"},
+        "table-uuid": {"type": "string"},
+        "location": {"type": "string"},
+        "last-updated-ms": {"type": "number"},
+        "last-column-id": {"type": "number"},
+        "schema": {
+            "type": "object",
+        },
+        "schemas": {
+            "type": "array",
+        },
+        "current-schema-id": {"type": "number"},
+        "partition-spec": {
+            "type": "array",
+        },
+        "partition-specs": {
+            "type": "array",
+        },
+        "default-spec-id": {"type": "number"},
+        "last-partition-id": {"type": "number"},
+        "properties": {
+            "type": "object",
+        },
+        "current-snapshot-id": {"type": "number"},
+        "snapshots": {
+            "type": "array",
+        },
+        "snapshot-log": {
+            "type": "array",
+        },
+        "metadata-log": {
+            "type": "array",
+        },
+        "sort-orders": {
+            "type": "array",
+        },
+        "default-sort-order-id": {"type": "number"},
+    },
+    "additionalProperties": False,
+}
+
+TABLE_METADATA_V2_SCHEMA = {
+    "$schema": "http://json-schema.org/draft-07/schema",
+    "type": "object",
+    "required": [
+        "format-version",
+        "table-uuid",
+        "location",
+        "last-sequence-number",
+        "last-updated-ms",
+        "last-column-id",
+        "schemas",
+        "current-schema-id",
+        "partition-specs",
+        "default-spec-id",
+        "last-partition-id",
+        "default-sort-order-id",
+        "sort-orders",
+    ],
+    "properties": {
+        "format-version": {"type": "number"},
+        "table-uuid": {"type": "string"},
+        "location": {"type": "string"},
+        "last-sequence-number": {"type": "number"},
+        "last-updated-ms": {"type": "number"},
+        "last-column-id": {"type": "number"},
+        "schemas": {
+            "type": "array",
+        },
+        "current-schema-id": {"type": "number"},
+        "partition-specs": {
+            "type": "array",
+        },
+        "default-spec-id": {"type": "number"},
+        "last-partition-id": {"type": "number"},
+        "properties": {
+            "type": "object",
+        },
+        "current-snapshot-id": {"type": "number"},
+        "snapshots": {
+            "type": "array",
+        },
+        "snapshot-log": {
+            "type": "array",
+        },
+        "metadata-log": {
+            "type": "array",
+        },
+        "sort-orders": {
+            "type": "array",
+        },
+        "default-sort-order-id": {"type": "number"},
+    },
+    "additionalProperties": False,
+}
+
+
+@attr.s(frozen=True, auto_attribs=True)
+class TableMetadata:
+    """Metadata for an Iceberg table as specified in the Apache Iceberg
+    spec (https://iceberg.apache.org/spec/#iceberg-table-spec)"""
+
+    format_version: int
+    """An integer version number for the format. Currently, this can be 1 or 2
+    based on the spec. Implementations must throw an exception if a table’s
+    version is higher than the supported version."""
+
+    table_uuid: str
+    """A UUID that identifies the table, generated when the table is created. 
+    Implementations must throw an exception if a table’s UUID does not match 
+    the expected UUID after refreshing metadata."""
+
+    location: str
+    """The table’s base location. This is used by writers to determine where 
+    to store data files, manifest files, and table metadata files."""
+
+    last_sequence_number: int
+    """The table’s highest assigned sequence number, a monotonically
+    increasing long that tracks the order of snapshots in a table."""
+
+    last_updated_ms: int
+    """Timestamp in milliseconds from the unix epoch when the table
+    was last updated. Each table metadata file should update this
+    field just before writing."""
+
+    last_column_id: int
+    """An integer; the highest assigned column ID for the table. 
+    This is used to ensure columns are always assigned an unused ID
+    when evolving schemas."""
+
+    schema: dict
+    """The table’s current schema. (Deprecated: use schemas and 
+    current-schema-id instead)"""
+
+    schemas: list
+    """A list of schemas, stored as objects with schema-id."""
+
+    current_schema_id: int
+    """ID of the table’s current schema."""
+
+    partition_spec: dict
+    """The table’s current partition spec, stored as only fields. 
+    Note that this is used by writers to partition data, but is 
+    not used when reading because reads use the specs stored in 
+    manifest files. (Deprecated: use partition-specs and default-spec-id 
+    instead)"""
+
+    partition_specs: list
+    """A list of partition specs, stored as full partition spec objects."""
+
+    default_spec_id: int
+    """ID of the “current” spec that writers should use by default."""
+
+    last_partition_id: int
+    """An integer; the highest assigned partition field ID across all 
+    partition specs for the table. This is used to ensure partition fields 
+    are always assigned an unused ID when evolving specs."""
+
+    properties: dict
+    """	A string to string map of table properties. This is used to 
+    control settings that affect reading and writing and is not intended 
+    to be used for arbitrary metadata. For example, commit.retry.num-retries 
+    is used to control the number of commit retries."""
+
+    current_snapshot_id: int
+    """ID of the current table snapshot."""
+
+    snapshots: list
+    """A list of valid snapshots. Valid snapshots are snapshots for which 
+    all data files exist in the file system. A data file must not be 
+    deleted from the file system until the last snapshot in which it was 
+    listed is garbage collected."""
+
+    snapshot_log: list
+    """A list (optional) of timestamp and snapshot ID pairs that encodes 
+    changes to the current snapshot for the table. Each time the 
+    current-snapshot-id is changed, a new entry should be added with the 
+    last-updated-ms and the new current-snapshot-id. When snapshots are 
+    expired from the list of valid snapshots, all entries before a snapshot 
+    that has expired should be removed."""
+
+    metadata_log: list
+    """A list (optional) of timestamp and metadata file location pairs that 
+    encodes changes to the previous metadata files for the table. Each time 
+    a new metadata file is created, a new entry of the previous metadata 
+    file location should be added to the list. Tables can be configured to 
+    remove oldest metadata log entries and keep a fixed-size log of the most 
+    recent entries after a commit."""
+
+    sort_orders: list
+    """A list of sort orders, stored as full sort order objects."""
+
+    default_sort_order_id: int
+    """Default sort order id of the table. Note that this could be used by 
+    writers, but is not used when reading because reads use the specs stored
+     in manifest files."""
+
+    def validate(self) -> None:
+        """Checks that the table metadata object is valid. The validation schema
+        used depends on the Iceberg table metadata version."""
+        if self.format_version == 1:
+            self.validate_v1(self.to_dict())
+        elif self.format_version == 2:
+            self.validate_v2(self.to_dict())
+        else:
+            raise ValueError(f"Unknown table metadata version {self.format_version}")
+
+    @staticmethod
+    def validate_v1(metadata: dict) -> None:
+        """Perform a JSONSchema validation using the v1 Iceberg table metadata schema"""
+        try:
+            validate_json(instance=metadata, schema=TABLE_METADATA_V1_SCHEMA)
+        except ValidationError as e:
+            # TODO Log something here
+            raise (e)
+
+    @staticmethod
+    def validate_v2(metadata: dict) -> None:
+        """Perform a JSONSchema validation using the v2 Iceberg table metadata schema"""
+        try:
+            validate_json(instance=metadata, schema=TABLE_METADATA_V2_SCHEMA)
+        except ValidationError as e:
+            # TODO Log something here
+            raise (e)
+
+    @classmethod
+    def from_byte_stream(cls, byte_stream, encoding: str = "utf-8") -> "TableMetadata":
+        """Instantiate a TableMetadata object from a byte stream
+
+        Args:
+            byte_stream: A file-like byte stream object
+            encoding (default "utf-8"): The byte encoder to use for the reader
+        """
+        reader = codecs.getreader(encoding)
+        metadata = json.load(reader(byte_stream))
+        return cls.from_dict(metadata)
+
+    @classmethod
+    def from_input_file(cls, input_file: InputFile, encoding: str = "utf-8") -> "TableMetadata":
+        """Create a TableMetadata instance from an input file
+
+        Args:
+            input_file (InputFile): A custom implementation of the iceberg.io.file.InputFile abstract
+            base class
+            encoding (str): Encoding to use when loading bytestream
+
+        Returns:
+            TableMetadata: A table metadata instance
+
+        """
+        return cls.from_byte_stream(byte_stream=input_file.open(), encoding=encoding)
+
+    def to_output_file(self, output_file: OutputFile, overwrite: bool = False) -> None:
+        """Write a TableMetadata instance to an output file
+
+        Args:
+            output_file (OutputFile): A custom implementation of the iceberg.io.file.OutputFile abstract
+            base class
+            overwrite (bool): Where to overwrite the file if it already exists. Defaults to `False`.
+        """
+        f = output_file.create(overwrite=overwrite)
+        f.write(json.dumps(self.to_dict()).encode("utf-8"))
+
+    @classmethod
+    def from_dict(cls, d: dict) -> "TableMetadata":
+        """Instantiates a TableMetadata object using a dictionary
+
+        Args:
+            d: A dictionary object that conforms to table metadata specification
+        Returns:
+            TableMetadata: A table metadata instance
+        """
+        return cls(  # type: ignore
+            format_version=d.get("format-version"),  # type: ignore
+            table_uuid=d.get("table-uuid"),  # type: ignore
+            location=d.get("location"),  # type: ignore
+            last_sequence_number=d.get("last-sequence-number"),  # type: ignore
+            last_updated_ms=d.get("last-updated-ms"),  # type: ignore
+            last_column_id=d.get("last-column-id"),  # type: ignore
+            schema=d.get("schema") or {},  # type: ignore
+            schemas=d.get("schemas") or [],  # type: ignore
+            current_schema_id=d.get("current-schema-id"),  # type: ignore
+            partition_spec=d.get("partition-spec") or [],  # type: ignore
+            partition_specs=d.get("partition-specs") or [],  # type: ignore
+            default_spec_id=d.get("default-spec-id"),  # type: ignore
+            last_partition_id=d.get("last-partition-id"),  # type: ignore
+            properties=d.get("properties") or {},  # type: ignore
+            current_snapshot_id=d.get("current-snapshot-id"),  # type: ignore
+            snapshots=d.get("snapshots") or [],  # type: ignore
+            snapshot_log=d.get("snapshot-log") or [],  # type: ignore
+            metadata_log=d.get("metadata-log") or [],  # type: ignore
+            sort_orders=d.get("sort-orders") or [],  # type: ignore
+            default_sort_order_id=d.get("default-sort-order-id"),  # type: ignore
+        )  # type: ignore
+
+    def to_dict(self) -> dict:
+        """Generate a dictionary representation of a TableMetadata instance
+
+        Returns:
+            dict: A dictionary representation of a TableMetadata instance
+        """
+        d = {
+            "format-version": self.format_version,
+            "table-uuid": self.table_uuid,
+            "location": self.location,
+            "last-updated-ms": self.last_updated_ms,
+            "last-column-id": self.last_column_id,
+            "schemas": self.schemas,
+            "current-schema-id": self.current_schema_id,
+            "partition-specs": self.partition_specs,
+            "default-spec-id": self.default_spec_id,
+            "last-partition-id": self.last_partition_id,
+            "properties": self.properties,
+            "current-snapshot-id": self.current_snapshot_id,
+            "snapshots": self.snapshots,
+            "snapshot-log": self.snapshot_log,
+            "metadata-log": self.metadata_log,
+            "sort-orders": self.sort_orders,
+            "default-sort-order-id": self.default_sort_order_id,
+        }
+
+        if self.format_version == 1:
+            d["schema"] = self.schema
+            d["partition-spec"] = self.partition_spec
+        if self.format_version == 2:
+            d["last-sequence-number"] = self.last_sequence_number
+

Review Comment:
   I would add here:
   ```python
   else:
       logger.warn(f"Unknown format version: {self.format_version}")
   ```

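   For reference, a hedged sketch of how that logging branch could look, written as a hypothetical standalone helper that mirrors the version-specific tail of `to_dict` (it assumes a module-level `logger = logging.getLogger(__name__)`, which metadata.py does not define yet, and uses `logger.warning` since `logger.warn` is deprecated):
   ```python
   import logging

   logger = logging.getLogger(__name__)  # assumed; metadata.py currently defines no logger


   def _add_version_fields(d: dict, metadata: "TableMetadata") -> dict:
       """Hypothetical helper mirroring the version-specific tail of TableMetadata.to_dict()."""
       if metadata.format_version == 1:
           d["schema"] = metadata.schema
           d["partition-spec"] = metadata.partition_spec
       elif metadata.format_version == 2:
           d["last-sequence-number"] = metadata.last_sequence_number
       else:
           logger.warning("Unknown format version: %s", metadata.format_version)
       return d
   ```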


##########
python/tests/conftest.py:
##########
@@ -0,0 +1,138 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This contains global pytest configurations.
+
+Fixtures contained in this file will be automatically used if provided as an argument
+to any pytest function.
+
+Example:
+    >>> def test_file_io(LocalFileIOFixture):
+            file_io = LocalFileIOFixture()  # The LocalFileIOFixture argument is automatically converted to LocalFileIO
+
+In the case where the fixture must be used in a pytest.mark.parametrize decorator the string  representation can be used
+and the built-in pytest fixture request should be used as an additional argument in the function. The fixture can then be
+retrieved using `request.getfixturevalue(fixture_name)`.
+
+Example:
+    >>> @pytest.mark.parametrize("CustomFileIO", ["LocalFileIOFixture"])
+        def test_file_io(CustomFileIO, request):
+            file_io = request.getfixturevalue(CustomFileIO)  # This will retrieve LocalFileIO
+"""
+import os
+from typing import Union
+from urllib.parse import ParseResult, urlparse
+
+import pytest
+
+from iceberg.io.base import FileIO, InputFile, OutputFile
+
+
+class LocalInputFile(InputFile):
+    """An InputFile implementation for local files (for test use only)"""
+
+    def __init__(self, location: str):
+
+        parsed_location = urlparse(location)  # Create a ParseResult from the uri
+        if parsed_location.scheme != "file":  # Validate that a uri is provided with a scheme of `file`
+            raise ValueError("LocalInputFile location must have a scheme of `file`")
+        elif parsed_location.netloc:
+            raise ValueError(f"Network location is not allowed for LocalInputFile: {parsed_location.netloc}")
+
+        super().__init__(location=location)
+        self._parsed_location = parsed_location
+
+    @property
+    def parsed_location(self) -> ParseResult:
+        return self._parsed_location
+
+    def __len__(self):
+        return os.path.getsize(self.parsed_location.path)
+
+    def exists(self):
+        return os.path.exists(self.parsed_location.path)
+
+    def open(self):
+        return open(self.parsed_location.path, "rb")
+
+
+class LocalOutputFile(OutputFile):
+    """An OutputFile implementation for local files (for test use only)"""
+
+    def __init__(self, location: str):
+
+        parsed_location = urlparse(location)  # Create a ParseResult from the uri
+        if parsed_location.scheme != "file":  # Validate that a uri is provided with a scheme of `file`
+            raise ValueError("LocalOutputFile location must have a scheme of `file`")
+        elif parsed_location.netloc:
+            raise ValueError(f"Network location is not allowed for LocalOutputFile: {parsed_location.netloc}")
+
+        super().__init__(location=location)
+        self._parsed_location = parsed_location
+
+    @property
+    def parsed_location(self) -> ParseResult:
+        return self._parsed_location
+
+    def __len__(self):
+        return os.path.getsize(self.parsed_location.path)
+
+    def exists(self):
+        return os.path.exists(self.parsed_location.path)
+
+    def to_input_file(self):
+        return LocalInputFile(location=self.location)
+
+    def create(self, overwrite: bool = False) -> None:
+        return open(self.parsed_location.path, "wb" if overwrite else "xb")
+
+
+class LocalFileIO(FileIO):
+    """A FileIO implementation for local files (for test use only)"""
+
+    def new_input(self, location: str):
+        return LocalInputFile(location=location)
+
+    def new_output(self, location: str):
+        return LocalOutputFile(location=location)
+
+    def delete(self, location: Union[str, LocalInputFile, LocalOutputFile]):
+        parsed_location = location.parsed_location if isinstance(location, (InputFile, OutputFile)) else urlparse(location)
+        os.remove(parsed_location.path)
+
+
+@pytest.fixture
+def LocalInputFileFixture():
+    """This fixture is available in any pytest function
+
+    If the function uses an argument named `LocalInputFileFixture`, this fixture will be
+    automatically used. In the case where the fixture must be used in a pytest.mark.parametrize
+    definition, the string "LocalInputFileFixture" can be used and the built-in pytest fixture
+    request should be used as an argument in the function. This fixture can then be retrieved
+    using `request.getfixturevalue("LocalInputFileFixture")`.
+    """
+    return LocalInputFile
+
+
+@pytest.fixture
+def LocalOutputFileFixture():
+    return LocalOutputFile
+
+
+@pytest.fixture
+def LocalFileIOFixture():
+    """"""

Review Comment:
   nit: maybe remove this one? Looks messy :)
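   Either dropping the empty `""""""` line or giving the fixture a one-line docstring (as LocalInputFileFixture has) would address this. A sketch of the latter, with illustrative wording; the `return LocalFileIO` is assumed, since the hunk is truncated just after the docstring:
   ```python
   @pytest.fixture
   def LocalFileIOFixture():
       """Returns the LocalFileIO class for tests that need a local FileIO implementation."""
       return LocalFileIO
   ```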



##########
python/src/iceberg/table/metadata.py:
##########
@@ -0,0 +1,366 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import codecs
+import json
+
+import attr
+from jsonschema import validate as validate_json
+from jsonschema.exceptions import ValidationError
+
+from iceberg.io.base import InputFile, OutputFile
+
+TABLE_METADATA_V1_SCHEMA = {
+    "$schema": "http://json-schema.org/draft-07/schema",
+    "type": "object",
+    "required": [
+        "format-version",
+        "location",
+        "last-updated-ms",
+        "last-column-id",
+        "schema",
+        "partition-spec",
+    ],
+    "properties": {
+        "format-version": {"type": "number"},
+        "table-uuid": {"type": "string"},
+        "location": {"type": "string"},
+        "last-updated-ms": {"type": "number"},
+        "last-column-id": {"type": "number"},
+        "schema": {
+            "type": "object",
+        },
+        "schemas": {
+            "type": "array",
+        },
+        "current-schema-id": {"type": "number"},
+        "partition-spec": {
+            "type": "array",
+        },
+        "partition-specs": {
+            "type": "array",
+        },
+        "default-spec-id": {"type": "number"},
+        "last-partition-id": {"type": "number"},
+        "properties": {
+            "type": "object",
+        },
+        "current-snapshot-id": {"type": "number"},
+        "snapshots": {
+            "type": "array",
+        },
+        "snapshot-log": {
+            "type": "array",
+        },
+        "metadata-log": {
+            "type": "array",
+        },
+        "sort-orders": {
+            "type": "array",
+        },
+        "default-sort-order-id": {"type": "number"},
+    },
+    "additionalProperties": False,
+}
+
+TABLE_METADATA_V2_SCHEMA = {
+    "$schema": "http://json-schema.org/draft-07/schema",
+    "type": "object",
+    "required": [
+        "format-version",
+        "table-uuid",
+        "location",
+        "last-sequence-number",
+        "last-updated-ms",
+        "last-column-id",
+        "schemas",
+        "current-schema-id",
+        "partition-specs",
+        "default-spec-id",
+        "last-partition-id",
+        "default-sort-order-id",
+        "sort-orders",
+    ],
+    "properties": {
+        "format-version": {"type": "number"},
+        "table-uuid": {"type": "string"},
+        "location": {"type": "string"},
+        "last-sequence-number": {"type": "number"},
+        "last-updated-ms": {"type": "number"},
+        "last-column-id": {"type": "number"},
+        "schemas": {
+            "type": "array",
+        },
+        "current-schema-id": {"type": "number"},
+        "partition-specs": {
+            "type": "array",
+        },
+        "default-spec-id": {"type": "number"},
+        "last-partition-id": {"type": "number"},
+        "properties": {
+            "type": "object",
+        },
+        "current-snapshot-id": {"type": "number"},
+        "snapshots": {
+            "type": "array",
+        },
+        "snapshot-log": {
+            "type": "array",
+        },
+        "metadata-log": {
+            "type": "array",
+        },
+        "sort-orders": {
+            "type": "array",
+        },
+        "default-sort-order-id": {"type": "number"},
+    },
+    "additionalProperties": False,
+}
+
+
+@attr.s(frozen=True, auto_attribs=True)
+class TableMetadata:
+    """Metadata for an Iceberg table as specified in the Apache Iceberg
+    spec (https://iceberg.apache.org/spec/#iceberg-table-spec)"""
+
+    format_version: int
+    """An integer version number for the format. Currently, this can be 1 or 2
+    based on the spec. Implementations must throw an exception if a table’s
+    version is higher than the supported version."""
+
+    table_uuid: str
+    """A UUID that identifies the table, generated when the table is created. 
+    Implementations must throw an exception if a table’s UUID does not match 
+    the expected UUID after refreshing metadata."""
+
+    location: str
+    """The table’s base location. This is used by writers to determine where 
+    to store data files, manifest files, and table metadata files."""
+
+    last_sequence_number: int
+    """The table’s highest assigned sequence number, a monotonically
+    increasing long that tracks the order of snapshots in a table."""
+
+    last_updated_ms: int
+    """Timestamp in milliseconds from the unix epoch when the table
+    was last updated. Each table metadata file should update this
+    field just before writing."""
+
+    last_column_id: int
+    """An integer; the highest assigned column ID for the table. 
+    This is used to ensure columns are always assigned an unused ID
+    when evolving schemas."""
+
+    schema: dict
+    """The table’s current schema. (Deprecated: use schemas and 
+    current-schema-id instead)"""
+
+    schemas: list
+    """A list of schemas, stored as objects with schema-id."""
+
+    current_schema_id: int
+    """ID of the table’s current schema."""
+
+    partition_spec: dict
+    """The table’s current partition spec, stored as only fields. 
+    Note that this is used by writers to partition data, but is 
+    not used when reading because reads use the specs stored in 
+    manifest files. (Deprecated: use partition-specs and default-spec-id 
+    instead)"""
+
+    partition_specs: list
+    """A list of partition specs, stored as full partition spec objects."""
+
+    default_spec_id: int
+    """ID of the “current” spec that writers should use by default."""
+
+    last_partition_id: int
+    """An integer; the highest assigned partition field ID across all 
+    partition specs for the table. This is used to ensure partition fields 
+    are always assigned an unused ID when evolving specs."""
+
+    properties: dict
+    """	A string to string map of table properties. This is used to 
+    control settings that affect reading and writing and is not intended 
+    to be used for arbitrary metadata. For example, commit.retry.num-retries 
+    is used to control the number of commit retries."""
+
+    current_snapshot_id: int
+    """ID of the current table snapshot."""
+
+    snapshots: list
+    """A list of valid snapshots. Valid snapshots are snapshots for which 
+    all data files exist in the file system. A data file must not be 
+    deleted from the file system until the last snapshot in which it was 
+    listed is garbage collected."""
+
+    snapshot_log: list
+    """A list (optional) of timestamp and snapshot ID pairs that encodes 
+    changes to the current snapshot for the table. Each time the 
+    current-snapshot-id is changed, a new entry should be added with the 
+    last-updated-ms and the new current-snapshot-id. When snapshots are 
+    expired from the list of valid snapshots, all entries before a snapshot 
+    that has expired should be removed."""
+
+    metadata_log: list
+    """A list (optional) of timestamp and metadata file location pairs that 
+    encodes changes to the previous metadata files for the table. Each time 
+    a new metadata file is created, a new entry of the previous metadata 
+    file location should be added to the list. Tables can be configured to 
+    remove oldest metadata log entries and keep a fixed-size log of the most 
+    recent entries after a commit."""
+
+    sort_orders: list
+    """A list of sort orders, stored as full sort order objects."""
+
+    default_sort_order_id: int
+    """Default sort order id of the table. Note that this could be used by 
+    writers, but is not used when reading because reads use the specs stored
+     in manifest files."""
+
+    def validate(self) -> None:
+        """Checks that the table metadata object is valid. The validation schema
+        used depends on the Iceberg table metadata version."""
+        if self.format_version == 1:
+            self.validate_v1(self.to_dict())
+        elif self.format_version == 2:
+            self.validate_v2(self.to_dict())
+        else:
+            raise ValueError(f"Unknown table metadata version {self.format_version}")
+
+    @staticmethod
+    def validate_v1(metadata: dict) -> None:
+        """Perform a JSONSchema validation using the v1 Iceberg table metadata schema"""
+        try:
+            validate_json(instance=metadata, schema=TABLE_METADATA_V1_SCHEMA)
+        except ValidationError as e:
+            # TODO Log something here
+            raise (e)
+
+    @staticmethod
+    def validate_v2(metadata: dict) -> None:
+        """Perform a JSONSchema validation using the v2 Iceberg table metadata schema"""
+        try:
+            validate_json(instance=metadata, schema=TABLE_METADATA_V2_SCHEMA)
+        except ValidationError as e:
+            # TODO Log something here
+            raise (e)
+
+    @classmethod
+    def from_byte_stream(cls, byte_stream, encoding: str = "utf-8") -> "TableMetadata":
+        """Instantiate a TableMetadata object from a byte stream
+
+        Args:
+            byte_stream: A file-like byte stream object
+            encoding (default "utf-8"): The byte encoder to use for the reader
+        """
+        reader = codecs.getreader(encoding)
+        metadata = json.load(reader(byte_stream))
+        return cls.from_dict(metadata)
+
+    @classmethod
+    def from_input_file(cls, input_file: InputFile, encoding: str = "utf-8") -> "TableMetadata":
+        """Create a TableMetadata instance from an input file
+
+        Args:
+            input_file (InputFile): A custom implementation of the iceberg.io.file.InputFile abstract
+            base class
+            encoding (str): Encoding to use when loading bytestream
+
+        Returns:
+            TableMetadata: A table metadata instance
+
+        """
+        return cls.from_byte_stream(byte_stream=input_file.open(), encoding=encoding)
+
+    def to_output_file(self, output_file: OutputFile, overwrite: bool = False) -> None:
+        """Write a TableMetadata instance to an output file
+
+        Args:
+            output_file (OutputFile): A custom implementation of the iceberg.io.file.OutputFile abstract
+            base class
+            overwrite (bool): Where to overwrite the file if it already exists. Defaults to `False`.
+        """
+        f = output_file.create(overwrite=overwrite)
+        f.write(json.dumps(self.to_dict()).encode("utf-8"))
+
+    @classmethod
+    def from_dict(cls, d: dict) -> "TableMetadata":
+        """Instantiates a TableMetadata object using a dictionary
+
+        Args:
+            d: A dictionary object that conforms to table metadata specification
+        Returns:
+            TableMetadata: A table metadata instance
+        """
+        return cls(  # type: ignore
+            format_version=d.get("format-version"),  # type: ignore
+            table_uuid=d.get("table-uuid"),  # type: ignore
+            location=d.get("location"),  # type: ignore
+            last_sequence_number=d.get("last-sequence-number"),  # type: ignore
+            last_updated_ms=d.get("last-updated-ms"),  # type: ignore
+            last_column_id=d.get("last-column-id"),  # type: ignore
+            schema=d.get("schema") or {},  # type: ignore
+            schemas=d.get("schemas") or [],  # type: ignore
+            current_schema_id=d.get("current-schema-id"),  # type: ignore
+            partition_spec=d.get("partition-spec") or [],  # type: ignore
+            partition_specs=d.get("partition-specs") or [],  # type: ignore
+            default_spec_id=d.get("default-spec-id"),  # type: ignore
+            last_partition_id=d.get("last-partition-id"),  # type: ignore
+            properties=d.get("properties") or {},  # type: ignore
+            current_snapshot_id=d.get("current-snapshot-id"),  # type: ignore
+            snapshots=d.get("snapshots") or [],  # type: ignore
+            snapshot_log=d.get("snapshot-log") or [],  # type: ignore
+            metadata_log=d.get("metadata-log") or [],  # type: ignore
+            sort_orders=d.get("sort-orders") or [],  # type: ignore
+            default_sort_order_id=d.get("default-sort-order-id"),  # type: ignore
+        )  # type: ignore
+
+    def to_dict(self) -> dict:
+        """Generate a dictionary representation of a TableMetadata instance
+
+        Returns:
+            dict: A dictionary representation of a TableMetadata instance
+        """
+        d = {
+            "format-version": self.format_version,
+            "table-uuid": self.table_uuid,
+            "location": self.location,
+            "last-updated-ms": self.last_updated_ms,
+            "last-column-id": self.last_column_id,
+            "schemas": self.schemas,
+            "current-schema-id": self.current_schema_id,
+            "partition-specs": self.partition_specs,
+            "default-spec-id": self.default_spec_id,
+            "last-partition-id": self.last_partition_id,
+            "properties": self.properties,
+            "current-snapshot-id": self.current_snapshot_id,
+            "snapshots": self.snapshots,
+            "snapshot-log": self.snapshot_log,
+            "metadata-log": self.metadata_log,
+            "sort-orders": self.sort_orders,
+            "default-sort-order-id": self.default_sort_order_id,
+        }
+
+        if self.format_version == 1:
+            d["schema"] = self.schema
+            d["partition-spec"] = self.partition_spec
+        if self.format_version == 2:

Review Comment:
   ```suggestion
           elif self.format_version == 2:
   ```
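   Since `format_version` is a single value, both branches can never apply to the same instance, so `elif` makes that mutual exclusion explicit (and pairs naturally with the `else` logging branch suggested in the earlier comment). A small pytest-style sketch of the resulting behavior, assuming `to_dict` ends with `return d` (the hunk is truncated before the return) and using illustrative field values:
   ```python
   from iceberg.table.metadata import TableMetadata


   def test_to_dict_version_specific_fields():
       """Only v1 output should carry `schema`/`partition-spec`; only v2 should carry `last-sequence-number`."""
       v1 = TableMetadata.from_dict({"format-version": 1, "schema": {"type": "struct"}, "partition-spec": []})
       v2 = TableMetadata.from_dict({"format-version": 2, "last-sequence-number": 34})

       assert "partition-spec" in v1.to_dict()
       assert "last-sequence-number" not in v1.to_dict()
       assert "last-sequence-number" in v2.to_dict()
       assert "schema" not in v2.to_dict()
   ```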



##########
python/src/iceberg/table/metadata.py:
##########
@@ -0,0 +1,366 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import codecs
+import json
+
+import attr
+from jsonschema import validate as validate_json
+from jsonschema.exceptions import ValidationError
+
+from iceberg.io.base import InputFile, OutputFile
+
+TABLE_METADATA_V1_SCHEMA = {
+    "$schema": "http://json-schema.org/draft-07/schema",
+    "type": "object",
+    "required": [
+        "format-version",
+        "location",
+        "last-updated-ms",
+        "last-column-id",
+        "schema",
+        "partition-spec",
+    ],
+    "properties": {
+        "format-version": {"type": "number"},
+        "table-uuid": {"type": "string"},
+        "location": {"type": "string"},
+        "last-updated-ms": {"type": "number"},
+        "last-column-id": {"type": "number"},
+        "schema": {
+            "type": "object",
+        },
+        "schemas": {
+            "type": "array",
+        },
+        "current-schema-id": {"type": "number"},
+        "partition-spec": {
+            "type": "array",
+        },
+        "partition-specs": {
+            "type": "array",
+        },
+        "default-spec-id": {"type": "number"},
+        "last-partition-id": {"type": "number"},
+        "properties": {
+            "type": "object",
+        },
+        "current-snapshot-id": {"type": "number"},
+        "snapshots": {
+            "type": "array",
+        },
+        "snapshot-log": {
+            "type": "array",
+        },
+        "metadata-log": {
+            "type": "array",
+        },
+        "sort-orders": {
+            "type": "array",
+        },
+        "default-sort-order-id": {"type": "number"},
+    },
+    "additionalProperties": False,
+}
+
+TABLE_METADATA_V2_SCHEMA = {
+    "$schema": "http://json-schema.org/draft-07/schema",
+    "type": "object",
+    "required": [
+        "format-version",
+        "table-uuid",
+        "location",
+        "last-sequence-number",
+        "last-updated-ms",
+        "last-column-id",
+        "schemas",
+        "current-schema-id",
+        "partition-specs",
+        "default-spec-id",
+        "last-partition-id",
+        "default-sort-order-id",
+        "sort-orders",
+    ],
+    "properties": {
+        "format-version": {"type": "number"},
+        "table-uuid": {"type": "string"},
+        "location": {"type": "string"},
+        "last-sequence-number": {"type": "number"},
+        "last-updated-ms": {"type": "number"},
+        "last-column-id": {"type": "number"},
+        "schemas": {
+            "type": "array",
+        },
+        "current-schema-id": {"type": "number"},
+        "partition-specs": {
+            "type": "array",
+        },
+        "default-spec-id": {"type": "number"},
+        "last-partition-id": {"type": "number"},
+        "properties": {
+            "type": "object",
+        },
+        "current-snapshot-id": {"type": "number"},
+        "snapshots": {
+            "type": "array",
+        },
+        "snapshot-log": {
+            "type": "array",
+        },
+        "metadata-log": {
+            "type": "array",
+        },
+        "sort-orders": {
+            "type": "array",
+        },
+        "default-sort-order-id": {"type": "number"},
+    },
+    "additionalProperties": False,
+}
+
+
+@attr.s(frozen=True, auto_attribs=True)
+class TableMetadata:
+    """Metadata for an Iceberg table as specified in the Apache Iceberg
+    spec (https://iceberg.apache.org/spec/#iceberg-table-spec)"""
+
+    format_version: int
+    """An integer version number for the format. Currently, this can be 1 or 2
+    based on the spec. Implementations must throw an exception if a table’s
+    version is higher than the supported version."""
+
+    table_uuid: str
+    """A UUID that identifies the table, generated when the table is created. 
+    Implementations must throw an exception if a table’s UUID does not match 
+    the expected UUID after refreshing metadata."""
+
+    location: str
+    """The table’s base location. This is used by writers to determine where 
+    to store data files, manifest files, and table metadata files."""
+
+    last_sequence_number: int
+    """The table’s highest assigned sequence number, a monotonically
+    increasing long that tracks the order of snapshots in a table."""
+
+    last_updated_ms: int
+    """Timestamp in milliseconds from the unix epoch when the table
+    was last updated. Each table metadata file should update this
+    field just before writing."""
+
+    last_column_id: int
+    """An integer; the highest assigned column ID for the table. 
+    This is used to ensure columns are always assigned an unused ID
+    when evolving schemas."""
+
+    schema: dict
+    """The table’s current schema. (Deprecated: use schemas and 
+    current-schema-id instead)"""
+
+    schemas: list
+    """A list of schemas, stored as objects with schema-id."""
+
+    current_schema_id: int
+    """ID of the table’s current schema."""
+
+    partition_spec: dict
+    """The table’s current partition spec, stored as only fields. 
+    Note that this is used by writers to partition data, but is 
+    not used when reading because reads use the specs stored in 
+    manifest files. (Deprecated: use partition-specs and default-spec-id 
+    instead)"""
+
+    partition_specs: list
+    """A list of partition specs, stored as full partition spec objects."""
+
+    default_spec_id: int
+    """ID of the “current” spec that writers should use by default."""
+
+    last_partition_id: int
+    """An integer; the highest assigned partition field ID across all 
+    partition specs for the table. This is used to ensure partition fields 
+    are always assigned an unused ID when evolving specs."""
+
+    properties: dict
+    """	A string to string map of table properties. This is used to 
+    control settings that affect reading and writing and is not intended 
+    to be used for arbitrary metadata. For example, commit.retry.num-retries 
+    is used to control the number of commit retries."""
+
+    current_snapshot_id: int
+    """ID of the current table snapshot."""
+
+    snapshots: list
+    """A list of valid snapshots. Valid snapshots are snapshots for which 
+    all data files exist in the file system. A data file must not be 
+    deleted from the file system until the last snapshot in which it was 
+    listed is garbage collected."""
+
+    snapshot_log: list
+    """A list (optional) of timestamp and snapshot ID pairs that encodes 
+    changes to the current snapshot for the table. Each time the 
+    current-snapshot-id is changed, a new entry should be added with the 
+    last-updated-ms and the new current-snapshot-id. When snapshots are 
+    expired from the list of valid snapshots, all entries before a snapshot 
+    that has expired should be removed."""
+
+    metadata_log: list
+    """A list (optional) of timestamp and metadata file location pairs that 
+    encodes changes to the previous metadata files for the table. Each time 
+    a new metadata file is created, a new entry of the previous metadata 
+    file location should be added to the list. Tables can be configured to 
+    remove oldest metadata log entries and keep a fixed-size log of the most 
+    recent entries after a commit."""
+
+    sort_orders: list
+    """A list of sort orders, stored as full sort order objects."""
+
+    default_sort_order_id: int
+    """Default sort order id of the table. Note that this could be used by 
+    writers, but is not used when reading because reads use the specs stored
+     in manifest files."""
+
+    def validate(self) -> None:
+        """Checks that the table metadata object is valid. The validation schema
+        used depends on the Iceberg table metadata version."""
+        if self.format_version == 1:
+            self.validate_v1(self.to_dict())
+        elif self.format_version == 2:
+            self.validate_v2(self.to_dict())
+        else:
+            raise ValueError(f"Unknown table metadata version {self.format_version}")
+
+    @staticmethod
+    def validate_v1(metadata: dict) -> None:
+        """Perform a JSONSchema validation using the v1 Iceberg table metadata schema"""
+        try:
+            validate_json(instance=metadata, schema=TABLE_METADATA_V1_SCHEMA)
+        except ValidationError as e:
+            # TODO Log something here
+            raise (e)
+
+    @staticmethod
+    def validate_v2(metadata: dict) -> None:
+        """Perform a JSONSchema validation using the v2 Iceberg table metadata schema"""
+        try:
+            validate_json(instance=metadata, schema=TABLE_METADATA_V2_SCHEMA)
+        except ValidationError as e:
+            # TODO Log something here
+            raise (e)
+
+    @classmethod
+    def from_byte_stream(cls, byte_stream, encoding: str = "utf-8") -> "TableMetadata":
+        """Instantiate a TableMetadata object from a byte stream
+
+        Args:
+            byte_stream: A file-like byte stream object
+            encoding (default "utf-8"): The byte encoder to use for the reader
+        """
+        reader = codecs.getreader(encoding)
+        metadata = json.load(reader(byte_stream))
+        return cls.from_dict(metadata)
+
+    @classmethod
+    def from_input_file(cls, input_file: InputFile, encoding: str = "utf-8") -> "TableMetadata":
+        """Create a TableMetadata instance from an input file
+
+        Args:
+            input_file (InputFile): A custom implementation of the iceberg.io.file.InputFile abstract
+            base class
+            encoding (str): Encoding to use when loading bytestream
+
+        Returns:
+            TableMetadata: A table metadata instance
+
+        """
+        return cls.from_byte_stream(byte_stream=input_file.open(), encoding=encoding)
+
+    def to_output_file(self, output_file: OutputFile, overwrite: bool = False) -> None:
+        """Write a TableMetadata instance to an output file
+
+        Args:
+            output_file (OutputFile): A custom implementation of the iceberg.io.file.OutputFile abstract
+            base class
+            overwrite (bool): Where to overwrite the file if it already exists. Defaults to `False`.
+        """
+        f = output_file.create(overwrite=overwrite)
+        f.write(json.dumps(self.to_dict()).encode("utf-8"))
+

Review Comment:
   `f.close()` is missing.
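   One way to address that without an explicit `close()` call is a context manager; a sketch, assuming the handle returned by `create()` supports the `with` protocol (the `LocalOutputFile` in conftest.py returns a built-in file object, which does):
   ```python
   def to_output_file(self, output_file: OutputFile, overwrite: bool = False) -> None:
       """Write a TableMetadata instance to an output file, closing the handle when done."""
       with output_file.create(overwrite=overwrite) as f:
           f.write(json.dumps(self.to_dict()).encode("utf-8"))
   ```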



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

