You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to issues@iceberg.apache.org by "Fokko (via GitHub)" <gi...@apache.org> on 2023/06/04 12:33:38 UTC

[GitHub] [iceberg] Fokko commented on a diff in pull request #6323: Python: Alter table plumbing and REST support

Fokko commented on code in PR #6323:
URL: https://github.com/apache/iceberg/pull/6323#discussion_r1216649180


##########
python/pyiceberg/table/__init__.py:
##########
@@ -69,21 +71,268 @@
     import ray
     from duckdb import DuckDBPyConnection
 
+    from pyiceberg.catalog import Catalog
 
 ALWAYS_TRUE = AlwaysTrue()
 
 
+class AlterTable:
+    _table: Table
+    _updates: Tuple[BaseTableUpdate, ...]
+
+    def __init__(self, table: Table, actions: Optional[Tuple[BaseTableUpdate, ...]] = None):
+        self._table = table
+        self._updates = actions or ()
+
+    def _append_updates(self, *new_updates: BaseTableUpdate) -> AlterTable:
+        """Appends updates to the set of staged updates
+
+        Args:
+            *new_updates: Any new updates
+
+        Raises:
+            ValueError: When the type of update is not unique.
+
+        Returns:
+            A new AlterTable object with the new updates appended
+        """
+        for new_update in new_updates:
+            type_new_update = type(new_update)
+            if any(type(update) == type_new_update for update in self._updates):
+                raise ValueError(f"Updates in a single commit need to be unique, duplicate: {type_new_update}")
+        return AlterTable(self._table, self._updates + new_updates)
+
+    def set_table_version(self, format_version: int) -> AlterTable:
+        """Sets the table to a certain version
+
+        Args:
+            format_version: The newly set version
+
+        Returns:
+            The alter table builder
+        """
+        if format_version not in {1, 2}:
+            raise ValueError(f"Format version not (yet) supported: {format_version}")
+        return self._append_updates(UpgradeFormatVersionUpdate(format_version=format_version))
+
+    def set_schema(self, new_schema: Schema) -> AlterTable:
+        """Set the schema, and updates the current-schema-id
+
+        Args:
+            new_schema: The new schema
+
+        Returns:
+            The alter table builder
+
+        Raises:
+            ValueError: When a schema with the same fields already exists
+        """
+        last_column_id = max(self._table.schema().highest_field_id, new_schema.highest_field_id)
+
+        exists = [schema for schema in self._table.schemas().values() if new_schema.fields == schema.fields]
+        if len(exists) > 0:
+            raise ValueError(f"Schema already exists, schema-id {exists[0].schema_id}")
+
+        return self._append_updates(AddSchemaUpdate(schema_=new_schema, last_column_id=last_column_id), SetCurrentSchemaUpdate())
+
+    def set_partition_spec(self, spec: PartitionSpec) -> AlterTable:
+        """Sets the partition spec, and updates the default-spec-id
+
+        Args:
+            spec: The new partition spec
+
+        Returns:
+            The alter table builder
+        """
+        return self._append_updates(AddPartitionSpecUpdate(spec=spec), SetDefaultSpecUpdate())
+
+    def set_sort_order(self, sort_order: SortOrder) -> AlterTable:
+        """Sets the sort order, and updates the default-sort-order-id
+
+        Args:
+            sort_order: The new sort order
+
+        Returns:
+            The alter table builder
+        """
+        return self._append_updates(AddSortOrderUpdate(sort_order=sort_order), SetDefaultSortOrderUpdate())
+
+    def set_properties(self, **updates: str) -> AlterTable:
+        """Set properties
+
+        When a property is already set, it will be overwritten
+
+        Args:
+            updates: The properties set on the table
+
+        Returns:
+            The alter table builder
+        """
+        return self._append_updates(SetPropertiesUpdate(updates=updates))
+
+    def unset_properties(self, *removals: str) -> AlterTable:
+        """Removes properties
+
+        Args:
+            removals: Properties to be removed
+
+        Returns:
+            The alter table builder
+        """
+        return self._append_updates(RemovePropertiesUpdate(removals=removals))
+
+    def update_location(self, location: str) -> AlterTable:
+        """Sets the new table location
+
+        Args:
+            location: The new location of the table
+
+        Returns:
+            The alter table builder
+        """
+        return self._append_updates(SetLocationUpdate(location=location))
+
+    def commit(self) -> Table:
+        """Commits the changes to the catalog
+
+        Returns:
+            The table with the updates applied
+        """
+        # Strip the catalog name
+        if len(self._updates) > 0:
+            table_response = self._table.catalog.alter_table(self._table.identifier[1:], self._updates)
+            return Table(
+                self._table.identifier,
+                metadata=table_response.metadata,
+                metadata_location=table_response.metadata_location,
+                io=self._table.io,
+                catalog=self._table.catalog,
+            )
+        else:
+            return self._table
+
+
+class TableUpdateAction(Enum):
+    upgrade_format_version = "upgrade-format-version"
+    add_schema = "add-schema"
+    set_current_schema = "set-current-schema"
+    add_spec = "add-spec"
+    set_default_spec = "set-default-spec"
+    add_sort_order = "add-sort-order"
+    set_default_sort_order = "set-default-sort-order"
+    add_snapshot = "add-snapshot"
+    set_snapshot_ref = "set-snapshot-ref"
+    remove_snapshots = "remove-snapshots"
+    remove_snapshot_ref = "remove-snapshot-ref"
+    set_location = "set-location"
+    set_properties = "set-properties"
+    remove_properties = "remove-properties"
+
+
+class BaseTableUpdate(IcebergBaseModel):
+    action: TableUpdateAction
+
+
+class UpgradeFormatVersionUpdate(BaseTableUpdate):
+    action = TableUpdateAction.upgrade_format_version
+    format_version: int = Field(alias="format-version")
+
+
+class AddSchemaUpdate(BaseTableUpdate):
+    action = TableUpdateAction.add_schema
+    schema_: Schema = Field(alias="schema")
+    last_column_id: int = Field(alias="last-column-id")
+
+
+class SetCurrentSchemaUpdate(BaseTableUpdate):
+    action = TableUpdateAction.set_current_schema
+    schema_id: int = Field(
+        alias="schema-id", description="Schema ID to set as current, or -1 to set last added schema", default=-1
+    )
+
+
+class AddPartitionSpecUpdate(BaseTableUpdate):
+    action = TableUpdateAction.add_spec
+    spec: PartitionSpec
+
+
+class SetDefaultSpecUpdate(BaseTableUpdate):
+    action = TableUpdateAction.set_default_spec
+    spec_id: int = Field(
+        alias="spec-id", description="Partition spec ID to set as the default, or -1 to set last added spec", default=-1
+    )
+
+
+class AddSortOrderUpdate(BaseTableUpdate):
+    action = TableUpdateAction.add_sort_order
+    sort_order: SortOrder = Field(alias="sort-order")
+
+
+class SetDefaultSortOrderUpdate(BaseTableUpdate):
+    action = TableUpdateAction.set_default_sort_order
+    sort_order_id: int = Field(
+        alias="sort-order-id", description="Sort order ID to set as the default, or -1 to set last added sort order", default=-1
+    )
+
+
+class AddSnapshotUpdate(BaseTableUpdate):
+    action = TableUpdateAction.add_snapshot
+    snapshot: Snapshot
+
+
+class SetSnapshotRefUpdate(BaseTableUpdate):
+    action = TableUpdateAction.set_snapshot_ref
+    ref_name: str = Field(alias="ref-name")
+
+
+class RemoveSnapshotsUpdate(BaseTableUpdate):
+    action = TableUpdateAction.remove_snapshots
+    snapshot_ids: List[int] = Field(alias="snapshot-ids")
+
+
+class RemoveSnapshotRefUpdate(BaseTableUpdate):
+    action = TableUpdateAction.remove_snapshot_ref
+    ref_name: str = Field(alias="ref-name")
+
+
+class SetLocationUpdate(BaseTableUpdate):
+    action = TableUpdateAction.set_location
+    location: str
+
+
+class SetPropertiesUpdate(BaseTableUpdate):
+    action = TableUpdateAction.set_properties
+    updates: Dict[str, str]
+
+
+class RemovePropertiesUpdate(BaseTableUpdate):
+    action = TableUpdateAction.remove_properties
+    removals: List[str]
+
+
+class CommitTableRequest(IcebergBaseModel):
+    requirements: List[Any] = Field(default_factory=list)

Review Comment:
   I'd prefer to generate the code from the spec, so we know that the spec makes sense. I've updated it here: https://github.com/apache/iceberg/pull/7710. I also created a PR, https://github.com/apache/iceberg/pull/7751, to get more insight into what kind of code the YAML generates. I believe the OpenAPI YAML is very handy when implementing clients in other languages (Rust, Go, etc.), but then we need to make sure that the generated code is what we expect (and for me, the OpenAPI YAML is sometimes not very straightforward).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org