You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/08/24 12:13:55 UTC

[GitHub] [iceberg] Fokko opened a new pull request, #5627: Python: Reassign schema/partition-spec/sort-order ids

Fokko opened a new pull request, #5627:
URL: https://github.com/apache/iceberg/pull/5627

   When creating a new schema.
   
   Also created a type alias called `TableMetadata` that replaces the `Union[TableMetadataV1, TableMetadataV2]` annotation.
   
   Resolves #5468


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5627:
URL: https://github.com/apache/iceberg/pull/5627#discussion_r954148175


##########
python/pyiceberg/schema.py:
##########
@@ -638,3 +640,61 @@ def map(self, map_type: MapType, key_result: int, value_result: int) -> int:
 
     def primitive(self, primitive: PrimitiveType) -> int:
         return 0
+
+
+def assign_fresh_schema_ids(schema: Schema) -> Schema:
+    """Traverses the schema, and sets new IDs"""
+    schema_struct = visit(schema.as_struct(), _SetFreshIDs())
+
+    fresh_identifier_field_ids = []
+    new_schema = Schema(*schema_struct.fields)
+    for field_id in schema.identifier_field_ids:
+        original_field = schema.find_field(field_id)
+        if original_field is None:
+            raise ValueError(f"Could not find field: {field_id}")
+        fresh_field = new_schema.find_field(original_field.name)
+        if fresh_field is None:
+            raise ValueError(f"Could not lookup field in new schema: {original_field}")
+        fresh_identifier_field_ids.append(fresh_field.field_id)
+
+    return new_schema.copy(update={"identifier_field_ids": fresh_identifier_field_ids})
+
+
+class _SetFreshIDs(SchemaVisitor[IcebergType]):
+    """Traverses the schema to get the highest field-id"""
+
+    counter: int
+
+    def __init__(self) -> None:
+        self.counter = 0
+
+    def _get_and_increment(self) -> int:
+        pos = self.counter
+        self.counter += 1
+        return pos
+
+    def schema(self, schema: Schema, struct_result: StructType) -> Schema:
+        return Schema(*struct_result.fields, schema_id=INITIAL_SCHEMA_ID, identifier_field_ids=schema.identifier_field_ids)

Review Comment:
   `INITIAL_SCHEMA_ID` is the default.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5627:
URL: https://github.com/apache/iceberg/pull/5627#discussion_r956787377


##########
python/pyiceberg/schema.py:
##########
@@ -276,6 +279,32 @@ def primitive(self, primitive: PrimitiveType) -> T:
         """Visit a PrimitiveType"""
 
 
+class PreOrderSchemaVisitor(Generic[T], ABC):

Review Comment:
   Is this pre-order? In Java we called it `CustomOrder` because you can choose when to visit children by accessing the callable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5627:
URL: https://github.com/apache/iceberg/pull/5627#discussion_r954175017


##########
python/pyiceberg/table/metadata.py:
##########
@@ -327,24 +333,45 @@ def check_sort_orders(cls, values: Dict[str, Any]):
     based on the spec. Implementations must throw an exception if a table’s
     version is higher than the supported version."""
 
-    table_uuid: UUID = Field(alias="table-uuid", default_factory=uuid.uuid4)
-    """A UUID that identifies the table, generated when the table is created.
-    Implementations must throw an exception if a table’s UUID does not match
-    the expected UUID after refreshing metadata."""
-
     last_sequence_number: int = Field(alias="last-sequence-number", default=INITIAL_SEQUENCE_NUMBER)
     """The table’s highest assigned sequence number, a monotonically
     increasing long that tracks the order of snapshots in a table."""
 
 
-class TableMetadata:
+TableMetadata = Union[TableMetadataV1, TableMetadataV2]
+
+
+def new_table_metadata(
+    schema: Schema, partition_spec: PartitionSpec, sort_order: SortOrder, location: str, properties: Properties = EMPTY_DICT
+) -> TableMetadata:
+    fresh_schema = assign_fresh_schema_ids(schema)
+    fresh_partition_spec = assign_fresh_partition_spec_ids(partition_spec, fresh_schema)
+    fresh_sort_order = assign_fresh_sort_order_ids(sort_order, schema, fresh_schema)
+
+    return TableMetadataV2(
+        location=location,
+        schemas=[fresh_schema],
+        current_schema_id=fresh_schema.schema_id,
+        partition_specs=[fresh_partition_spec],
+        default_spec_id=fresh_partition_spec.spec_id,
+        sort_orders=[fresh_sort_order],
+        default_sort_order_id=fresh_sort_order.order_id,
+        properties=properties,
+        last_column_id=fresh_schema.highest_field_id,

Review Comment:
   Can you move this up by schema?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Fokko commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #5627:
URL: https://github.com/apache/iceberg/pull/5627#discussion_r958815658


##########
python/pyiceberg/schema.py:
##########
@@ -638,3 +724,61 @@ def map(self, map_type: MapType, key_result: int, value_result: int) -> int:
 
     def primitive(self, primitive: PrimitiveType) -> int:
         return 0
+
+
+def assign_fresh_schema_ids(schema: Schema) -> Schema:
+    """Traverses the schema, and sets new IDs"""
+    schema_struct = pre_order_visit(schema.as_struct(), _SetFreshIDs())
+
+    fresh_identifier_field_ids = []
+    new_schema = Schema(*schema_struct.fields)
+    for field_id in schema.identifier_field_ids:
+        original_field_name = schema.find_column_name(field_id)
+        if original_field_name is None:
+            raise ValueError(f"Could not find field: {field_id}")
+        fresh_field = new_schema.find_field(original_field_name)
+        if fresh_field is None:
+            raise ValueError(f"Could not lookup field in new schema: {original_field_name}")
+        fresh_identifier_field_ids.append(fresh_field.field_id)
+
+    return new_schema.copy(update={"identifier_field_ids": fresh_identifier_field_ids})
+
+
+class _SetFreshIDs(PreOrderSchemaVisitor[IcebergType]):
+    """Traverses the schema and assigns monotonically increasing ids"""
+
+    counter: itertools.count
+
+    def __init__(self, start: int = 1) -> None:
+        self.counter = itertools.count(start)
+
+    def _get_and_increment(self) -> int:
+        return next(self.counter)
+
+    def schema(self, schema: Schema, struct_result: Callable[[], StructType]) -> Schema:
+        return Schema(*struct_result().fields, identifier_field_ids=schema.identifier_field_ids)

Review Comment:
   Yes, we do that in the function itself:
   ```python
   def assign_fresh_schema_ids(schema: Schema) -> Schema:
       """Traverses the schema, and sets new IDs"""
       schema_struct = pre_order_visit(schema.as_struct(), _SetFreshIDs())
   
       fresh_identifier_field_ids = []
       new_schema = Schema(*schema_struct.fields)
       for field_id in schema.identifier_field_ids:
           original_field_name = schema.find_column_name(field_id)
           if original_field_name is None:
               raise ValueError(f"Could not find field: {field_id}")
           fresh_field = new_schema.find_field(original_field_name)
           if fresh_field is None:
               raise ValueError(f"Could not lookup field in new schema: {original_field_name}")
           fresh_identifier_field_ids.append(fresh_field.field_id)
   
       return new_schema.copy(update={"identifier_field_ids": fresh_identifier_field_ids})
   ```
   This is because we first want to know all the IDs



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Fokko commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #5627:
URL: https://github.com/apache/iceberg/pull/5627#discussion_r954229123


##########
python/pyiceberg/schema.py:
##########
@@ -638,3 +640,61 @@ def map(self, map_type: MapType, key_result: int, value_result: int) -> int:
 
     def primitive(self, primitive: PrimitiveType) -> int:
         return 0
+
+
+def assign_fresh_schema_ids(schema: Schema) -> Schema:
+    """Traverses the schema, and sets new IDs"""
+    schema_struct = visit(schema.as_struct(), _SetFreshIDs())
+
+    fresh_identifier_field_ids = []
+    new_schema = Schema(*schema_struct.fields)
+    for field_id in schema.identifier_field_ids:
+        original_field = schema.find_field(field_id)
+        if original_field is None:
+            raise ValueError(f"Could not find field: {field_id}")
+        fresh_field = new_schema.find_field(original_field.name)
+        if fresh_field is None:
+            raise ValueError(f"Could not lookup field in new schema: {original_field}")
+        fresh_identifier_field_ids.append(fresh_field.field_id)
+
+    return new_schema.copy(update={"identifier_field_ids": fresh_identifier_field_ids})
+
+
+class _SetFreshIDs(SchemaVisitor[IcebergType]):
+    """Traverses the schema to get the highest field-id"""
+
+    counter: int
+
+    def __init__(self) -> None:
+        self.counter = 0
+
+    def _get_and_increment(self) -> int:
+        pos = self.counter
+        self.counter += 1
+        return pos
+
+    def schema(self, schema: Schema, struct_result: StructType) -> Schema:
+        return Schema(*struct_result.fields, schema_id=INITIAL_SCHEMA_ID, identifier_field_ids=schema.identifier_field_ids)

Review Comment:
   Ah, right. Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Fokko commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #5627:
URL: https://github.com/apache/iceberg/pull/5627#discussion_r954246099


##########
python/pyiceberg/schema.py:
##########
@@ -638,3 +640,61 @@ def map(self, map_type: MapType, key_result: int, value_result: int) -> int:
 
     def primitive(self, primitive: PrimitiveType) -> int:
         return 0
+
+
+def assign_fresh_schema_ids(schema: Schema) -> Schema:
+    """Traverses the schema, and sets new IDs"""
+    schema_struct = visit(schema.as_struct(), _SetFreshIDs())
+
+    fresh_identifier_field_ids = []
+    new_schema = Schema(*schema_struct.fields)
+    for field_id in schema.identifier_field_ids:
+        original_field = schema.find_field(field_id)
+        if original_field is None:
+            raise ValueError(f"Could not find field: {field_id}")
+        fresh_field = new_schema.find_field(original_field.name)
+        if fresh_field is None:
+            raise ValueError(f"Could not lookup field in new schema: {original_field}")
+        fresh_identifier_field_ids.append(fresh_field.field_id)
+
+    return new_schema.copy(update={"identifier_field_ids": fresh_identifier_field_ids})
+
+
+class _SetFreshIDs(SchemaVisitor[IcebergType]):
+    """Traverses the schema to get the highest field-id"""
+
+    counter: int
+
+    def __init__(self) -> None:

Review Comment:
   I've changed it, but the counter is nested in the visitor, so I'm not sure how useful it is. Also, I felt fancy and changed it with `itertools.count`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5627:
URL: https://github.com/apache/iceberg/pull/5627#discussion_r954158187


##########
python/pyiceberg/schema.py:
##########
@@ -638,3 +640,61 @@ def map(self, map_type: MapType, key_result: int, value_result: int) -> int:
 
     def primitive(self, primitive: PrimitiveType) -> int:
         return 0
+
+
+def assign_fresh_schema_ids(schema: Schema) -> Schema:
+    """Traverses the schema, and sets new IDs"""
+    schema_struct = visit(schema.as_struct(), _SetFreshIDs())
+
+    fresh_identifier_field_ids = []
+    new_schema = Schema(*schema_struct.fields)
+    for field_id in schema.identifier_field_ids:
+        original_field = schema.find_field(field_id)
+        if original_field is None:
+            raise ValueError(f"Could not find field: {field_id}")
+        fresh_field = new_schema.find_field(original_field.name)
+        if fresh_field is None:
+            raise ValueError(f"Could not lookup field in new schema: {original_field}")
+        fresh_identifier_field_ids.append(fresh_field.field_id)
+
+    return new_schema.copy(update={"identifier_field_ids": fresh_identifier_field_ids})
+
+
+class _SetFreshIDs(SchemaVisitor[IcebergType]):

Review Comment:
   This is one area where we use pre-order rather than post-order in Java. It's not a problem as long as everything gets reassigned, but it is nice to get IDs assigned sequentially. That matches user expectations.
   
   Definitely not a blocker. Up to you whether you want to add a `CustomOrderSchemaVisitor` for this purpose or not.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Fokko commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #5627:
URL: https://github.com/apache/iceberg/pull/5627#discussion_r954865832


##########
python/pyiceberg/table/metadata.py:
##########
@@ -327,24 +334,43 @@ def check_sort_orders(cls, values: Dict[str, Any]):
     based on the spec. Implementations must throw an exception if a table’s
     version is higher than the supported version."""
 
-    table_uuid: UUID = Field(alias="table-uuid", default_factory=uuid.uuid4)
-    """A UUID that identifies the table, generated when the table is created.
-    Implementations must throw an exception if a table’s UUID does not match
-    the expected UUID after refreshing metadata."""
-
     last_sequence_number: int = Field(alias="last-sequence-number", default=INITIAL_SEQUENCE_NUMBER)
     """The table’s highest assigned sequence number, a monotonically
     increasing long that tracks the order of snapshots in a table."""
 
 
-class TableMetadata:
+TableMetadata = Union[TableMetadataV1, TableMetadataV2]
+
+
+def new_table_metadata(
+    schema: Schema, partition_spec: PartitionSpec, sort_order: SortOrder, location: str, properties: Properties = EMPTY_DICT
+) -> TableMetadata:
+    fresh_schema = assign_fresh_schema_ids(schema)
+    fresh_partition_spec = assign_fresh_partition_spec_ids(partition_spec, fresh_schema)
+    fresh_sort_order = assign_fresh_sort_order_ids(sort_order, schema, fresh_schema)
+
+    return TableMetadataV2(

Review Comment:
   I'm being bold here, and just create a v2 metadata



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Fokko commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #5627:
URL: https://github.com/apache/iceberg/pull/5627#discussion_r954231802


##########
python/pyiceberg/schema.py:
##########
@@ -638,3 +640,61 @@ def map(self, map_type: MapType, key_result: int, value_result: int) -> int:
 
     def primitive(self, primitive: PrimitiveType) -> int:
         return 0
+
+
+def assign_fresh_schema_ids(schema: Schema) -> Schema:
+    """Traverses the schema, and sets new IDs"""
+    schema_struct = visit(schema.as_struct(), _SetFreshIDs())
+
+    fresh_identifier_field_ids = []
+    new_schema = Schema(*schema_struct.fields)
+    for field_id in schema.identifier_field_ids:
+        original_field = schema.find_field(field_id)
+        if original_field is None:
+            raise ValueError(f"Could not find field: {field_id}")
+        fresh_field = new_schema.find_field(original_field.name)

Review Comment:
   Great catch, thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5627:
URL: https://github.com/apache/iceberg/pull/5627#discussion_r954155497


##########
python/pyiceberg/schema.py:
##########
@@ -638,3 +640,61 @@ def map(self, map_type: MapType, key_result: int, value_result: int) -> int:
 
     def primitive(self, primitive: PrimitiveType) -> int:
         return 0
+
+
+def assign_fresh_schema_ids(schema: Schema) -> Schema:
+    """Traverses the schema, and sets new IDs"""
+    schema_struct = visit(schema.as_struct(), _SetFreshIDs())
+
+    fresh_identifier_field_ids = []
+    new_schema = Schema(*schema_struct.fields)
+    for field_id in schema.identifier_field_ids:
+        original_field = schema.find_field(field_id)
+        if original_field is None:
+            raise ValueError(f"Could not find field: {field_id}")
+        fresh_field = new_schema.find_field(original_field.name)
+        if fresh_field is None:
+            raise ValueError(f"Could not lookup field in new schema: {original_field}")
+        fresh_identifier_field_ids.append(fresh_field.field_id)
+
+    return new_schema.copy(update={"identifier_field_ids": fresh_identifier_field_ids})
+
+
+class _SetFreshIDs(SchemaVisitor[IcebergType]):
+    """Traverses the schema to get the highest field-id"""
+
+    counter: int
+
+    def __init__(self) -> None:
+        self.counter = 0

Review Comment:
   While this is technically okay, we don't assign ID 0 in other cases. It has been useful as a way to know that IDs in a schema aren't coming from reassignment and helps us catch errors. I'd recommend starting at 1 or changing assignment to `_increment_and_get`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Fokko commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #5627:
URL: https://github.com/apache/iceberg/pull/5627#discussion_r958814479


##########
python/pyiceberg/schema.py:
##########
@@ -276,6 +279,32 @@ def primitive(self, primitive: PrimitiveType) -> T:
         """Visit a PrimitiveType"""
 
 
+class PreOrderSchemaVisitor(Generic[T], ABC):

Review Comment:
   It is pre-order traversal since we start at the root and then move to the leaves. In order is a bit less intuitive since it is not a binary tree. You could also do a reverse in-order, but not sure if we need that. We can also call it CustomOrder if you have a strong preference, but I think pre-order is the most logical way of using this visitor.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Fokko commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #5627:
URL: https://github.com/apache/iceberg/pull/5627#discussion_r958853064


##########
python/pyiceberg/schema.py:
##########
@@ -638,3 +724,61 @@ def map(self, map_type: MapType, key_result: int, value_result: int) -> int:
 
     def primitive(self, primitive: PrimitiveType) -> int:
         return 0
+
+
+def assign_fresh_schema_ids(schema: Schema) -> Schema:
+    """Traverses the schema, and sets new IDs"""
+    schema_struct = pre_order_visit(schema.as_struct(), _SetFreshIDs())
+
+    fresh_identifier_field_ids = []
+    new_schema = Schema(*schema_struct.fields)
+    for field_id in schema.identifier_field_ids:
+        original_field_name = schema.find_column_name(field_id)
+        if original_field_name is None:
+            raise ValueError(f"Could not find field: {field_id}")
+        fresh_field = new_schema.find_field(original_field_name)
+        if fresh_field is None:
+            raise ValueError(f"Could not lookup field in new schema: {original_field_name}")
+        fresh_identifier_field_ids.append(fresh_field.field_id)
+
+    return new_schema.copy(update={"identifier_field_ids": fresh_identifier_field_ids})
+
+
+class _SetFreshIDs(PreOrderSchemaVisitor[IcebergType]):
+    """Traverses the schema and assigns monotonically increasing ids"""
+
+    counter: itertools.count
+
+    def __init__(self, start: int = 1) -> None:
+        self.counter = itertools.count(start)
+
+    def _get_and_increment(self) -> int:
+        return next(self.counter)
+
+    def schema(self, schema: Schema, struct_result: Callable[[], StructType]) -> Schema:
+        return Schema(*struct_result().fields, identifier_field_ids=schema.identifier_field_ids)

Review Comment:
   Ah, I've refactored this because we need to build a map anyway 👍🏻 



##########
python/pyiceberg/schema.py:
##########
@@ -638,3 +724,61 @@ def map(self, map_type: MapType, key_result: int, value_result: int) -> int:
 
     def primitive(self, primitive: PrimitiveType) -> int:
         return 0
+
+
+def assign_fresh_schema_ids(schema: Schema) -> Schema:
+    """Traverses the schema, and sets new IDs"""
+    schema_struct = pre_order_visit(schema.as_struct(), _SetFreshIDs())
+
+    fresh_identifier_field_ids = []
+    new_schema = Schema(*schema_struct.fields)
+    for field_id in schema.identifier_field_ids:
+        original_field_name = schema.find_column_name(field_id)
+        if original_field_name is None:
+            raise ValueError(f"Could not find field: {field_id}")
+        fresh_field = new_schema.find_field(original_field_name)
+        if fresh_field is None:
+            raise ValueError(f"Could not lookup field in new schema: {original_field_name}")
+        fresh_identifier_field_ids.append(fresh_field.field_id)
+
+    return new_schema.copy(update={"identifier_field_ids": fresh_identifier_field_ids})
+
+
+class _SetFreshIDs(PreOrderSchemaVisitor[IcebergType]):
+    """Traverses the schema and assigns monotonically increasing ids"""
+
+    counter: itertools.count
+
+    def __init__(self, start: int = 1) -> None:
+        self.counter = itertools.count(start)
+
+    def _get_and_increment(self) -> int:
+        return next(self.counter)
+
+    def schema(self, schema: Schema, struct_result: Callable[[], StructType]) -> Schema:
+        return Schema(*struct_result().fields, identifier_field_ids=schema.identifier_field_ids)
+
+    def struct(self, struct: StructType, field_results: List[Callable[[], IcebergType]]) -> StructType:
+        return StructType(*[field() for field in field_results])
+
+    def field(self, field: NestedField, field_result: Callable[[], IcebergType]) -> IcebergType:
+        return NestedField(
+            field_id=self._get_and_increment(), name=field.name, field_type=field_result(), required=field.required, doc=field.doc

Review Comment:
   Missed that one, thanks! Just updated the code and tests



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5627:
URL: https://github.com/apache/iceberg/pull/5627#discussion_r954170279


##########
python/pyiceberg/table/metadata.py:
##########
@@ -103,12 +109,12 @@ def construct_refs(cls, data: Dict[str, Any]):
     """The table’s base location. This is used by writers to determine where
     to store data files, manifest files, and table metadata files."""
 
-    table_uuid: Optional[UUID] = Field(alias="table-uuid", default_factory=uuid4)
+    table_uuid: uuid.UUID = Field(alias="table-uuid", default_factory=uuid.uuid4)
     """A UUID that identifies the table, generated when the table is created.
     Implementations must throw an exception if a table’s UUID does not match
     the expected UUID after refreshing metadata."""
 
-    last_updated_ms: int = Field(alias="last-updated-ms")
+    last_updated_ms: int = Field(alias="last-updated-ms", default_factory=lambda: int(time.time() * 1000))

Review Comment:
   Is there no way to get time in milliseconds in Python?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5627:
URL: https://github.com/apache/iceberg/pull/5627#discussion_r954151892


##########
python/pyiceberg/schema.py:
##########
@@ -638,3 +640,61 @@ def map(self, map_type: MapType, key_result: int, value_result: int) -> int:
 
     def primitive(self, primitive: PrimitiveType) -> int:
         return 0
+
+
+def assign_fresh_schema_ids(schema: Schema) -> Schema:
+    """Traverses the schema, and sets new IDs"""
+    schema_struct = visit(schema.as_struct(), _SetFreshIDs())
+
+    fresh_identifier_field_ids = []
+    new_schema = Schema(*schema_struct.fields)
+    for field_id in schema.identifier_field_ids:
+        original_field = schema.find_field(field_id)
+        if original_field is None:
+            raise ValueError(f"Could not find field: {field_id}")
+        fresh_field = new_schema.find_field(original_field.name)

Review Comment:
   I think that identifier field IDs can be nested, so this needs to use `schema.find_column_name(field_id)` to get the name instead. `original_field.name` will return the name within the struct that contains the field.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5627:
URL: https://github.com/apache/iceberg/pull/5627#discussion_r956788307


##########
python/pyiceberg/table/metadata.py:
##########
@@ -327,24 +334,43 @@ def check_sort_orders(cls, values: Dict[str, Any]):
     based on the spec. Implementations must throw an exception if a table’s
     version is higher than the supported version."""
 
-    table_uuid: UUID = Field(alias="table-uuid", default_factory=uuid.uuid4)
-    """A UUID that identifies the table, generated when the table is created.
-    Implementations must throw an exception if a table’s UUID does not match
-    the expected UUID after refreshing metadata."""
-
     last_sequence_number: int = Field(alias="last-sequence-number", default=INITIAL_SEQUENCE_NUMBER)
     """The table’s highest assigned sequence number, a monotonically
     increasing long that tracks the order of snapshots in a table."""
 
 
-class TableMetadata:
+TableMetadata = Union[TableMetadataV1, TableMetadataV2]
+
+
+def new_table_metadata(
+    schema: Schema, partition_spec: PartitionSpec, sort_order: SortOrder, location: str, properties: Properties = EMPTY_DICT
+) -> TableMetadata:
+    fresh_schema = assign_fresh_schema_ids(schema)
+    fresh_partition_spec = assign_fresh_partition_spec_ids(partition_spec, fresh_schema)
+    fresh_sort_order = assign_fresh_sort_order_ids(sort_order, schema, fresh_schema)

Review Comment:
   Do the "fresh" methods always reset `schema_id`, `spec_id`, and `order_id`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5627:
URL: https://github.com/apache/iceberg/pull/5627#discussion_r956788235


##########
python/pyiceberg/table/metadata.py:
##########
@@ -327,24 +334,43 @@ def check_sort_orders(cls, values: Dict[str, Any]):
     based on the spec. Implementations must throw an exception if a table’s
     version is higher than the supported version."""
 
-    table_uuid: UUID = Field(alias="table-uuid", default_factory=uuid.uuid4)
-    """A UUID that identifies the table, generated when the table is created.
-    Implementations must throw an exception if a table’s UUID does not match
-    the expected UUID after refreshing metadata."""
-
     last_sequence_number: int = Field(alias="last-sequence-number", default=INITIAL_SEQUENCE_NUMBER)
     """The table’s highest assigned sequence number, a monotonically
     increasing long that tracks the order of snapshots in a table."""
 
 
-class TableMetadata:
+TableMetadata = Union[TableMetadataV1, TableMetadataV2]
+
+
+def new_table_metadata(
+    schema: Schema, partition_spec: PartitionSpec, sort_order: SortOrder, location: str, properties: Properties = EMPTY_DICT
+) -> TableMetadata:
+    fresh_schema = assign_fresh_schema_ids(schema)
+    fresh_partition_spec = assign_fresh_partition_spec_ids(partition_spec, fresh_schema)
+    fresh_sort_order = assign_fresh_sort_order_ids(sort_order, schema, fresh_schema)
+
+    return TableMetadataV2(

Review Comment:
   Yeah, I almost commented last time. I think this is reasonable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5627:
URL: https://github.com/apache/iceberg/pull/5627#discussion_r956788471


##########
python/pyiceberg/table/sorting.py:
##########
@@ -135,3 +140,31 @@ def __str__(self) -> str:
 
 UNSORTED_SORT_ORDER_ID = 0
 UNSORTED_SORT_ORDER = SortOrder(order_id=UNSORTED_SORT_ORDER_ID)
+INITIAL_SORT_ORDER_ID = 1
+
+
+def assign_fresh_sort_order_ids(sort_order: SortOrder, old_schema: Schema, fresh_schema: Schema) -> SortOrder:
+    if sort_order.is_unsorted:
+        return UNSORTED_SORT_ORDER
+
+    fresh_fields = []
+    for field in sort_order.fields:
+        original_field = old_schema.find_field(field.source_id)
+        if original_field is None:
+            raise ValueError(f"Could not find in original schema: {field}")
+        fresh_field = fresh_schema.find_field(original_field.name)

Review Comment:
   This still needs to be addressed as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5627:
URL: https://github.com/apache/iceberg/pull/5627#discussion_r954157192


##########
python/pyiceberg/schema.py:
##########
@@ -638,3 +640,61 @@ def map(self, map_type: MapType, key_result: int, value_result: int) -> int:
 
     def primitive(self, primitive: PrimitiveType) -> int:
         return 0
+
+
+def assign_fresh_schema_ids(schema: Schema) -> Schema:
+    """Traverses the schema, and sets new IDs"""
+    schema_struct = visit(schema.as_struct(), _SetFreshIDs())
+
+    fresh_identifier_field_ids = []
+    new_schema = Schema(*schema_struct.fields)
+    for field_id in schema.identifier_field_ids:
+        original_field = schema.find_field(field_id)
+        if original_field is None:
+            raise ValueError(f"Could not find field: {field_id}")
+        fresh_field = new_schema.find_field(original_field.name)
+        if fresh_field is None:
+            raise ValueError(f"Could not lookup field in new schema: {original_field}")
+        fresh_identifier_field_ids.append(fresh_field.field_id)
+
+    return new_schema.copy(update={"identifier_field_ids": fresh_identifier_field_ids})
+
+
+class _SetFreshIDs(SchemaVisitor[IcebergType]):
+    """Traverses the schema to get the highest field-id"""
+
+    counter: int
+
+    def __init__(self) -> None:

Review Comment:
   You may want to allow initializing the counter. We use it in other places with `last_assigned_id` to assign IDs to new types when adding. We can always do it later, though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5627:
URL: https://github.com/apache/iceberg/pull/5627#discussion_r954174104


##########
python/pyiceberg/table/metadata.py:
##########
@@ -327,24 +333,45 @@ def check_sort_orders(cls, values: Dict[str, Any]):
     based on the spec. Implementations must throw an exception if a table’s
     version is higher than the supported version."""
 
-    table_uuid: UUID = Field(alias="table-uuid", default_factory=uuid.uuid4)
-    """A UUID that identifies the table, generated when the table is created.
-    Implementations must throw an exception if a table’s UUID does not match
-    the expected UUID after refreshing metadata."""
-
     last_sequence_number: int = Field(alias="last-sequence-number", default=INITIAL_SEQUENCE_NUMBER)
     """The table’s highest assigned sequence number, a monotonically
     increasing long that tracks the order of snapshots in a table."""
 
 
-class TableMetadata:
+TableMetadata = Union[TableMetadataV1, TableMetadataV2]
+
+
+def new_table_metadata(
+    schema: Schema, partition_spec: PartitionSpec, sort_order: SortOrder, location: str, properties: Properties = EMPTY_DICT
+) -> TableMetadata:
+    fresh_schema = assign_fresh_schema_ids(schema)
+    fresh_partition_spec = assign_fresh_partition_spec_ids(partition_spec, fresh_schema)
+    fresh_sort_order = assign_fresh_sort_order_ids(sort_order, schema, fresh_schema)
+
+    return TableMetadataV2(
+        location=location,
+        schemas=[fresh_schema],
+        current_schema_id=fresh_schema.schema_id,
+        partition_specs=[fresh_partition_spec],
+        default_spec_id=fresh_partition_spec.spec_id,
+        sort_orders=[fresh_sort_order],
+        default_sort_order_id=fresh_sort_order.order_id,
+        properties=properties,
+        last_column_id=fresh_schema.highest_field_id,
+        last_partition_id=max(field.field_id for field in fresh_partition_spec.fields)

Review Comment:
   Can we make this a method on the partition spec rather than doing it inline here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5627:
URL: https://github.com/apache/iceberg/pull/5627#discussion_r954181656


##########
python/pyiceberg/table/sorting.py:
##########
@@ -135,3 +140,31 @@ def __str__(self) -> str:
 
 UNSORTED_SORT_ORDER_ID = 0
 UNSORTED_SORT_ORDER = SortOrder(order_id=UNSORTED_SORT_ORDER_ID)
+INITIAL_SORT_ORDER_ID = 1
+
+
+def assign_fresh_sort_order_ids(sort_order: SortOrder, old_schema: Schema, fresh_schema: Schema) -> SortOrder:
+    if sort_order.is_unsorted:
+        return UNSORTED_SORT_ORDER
+
+    fresh_fields = []
+    for field in sort_order.fields:
+        original_field = old_schema.find_field(field.source_id)
+        if original_field is None:
+            raise ValueError(f"Could not find in original schema: {field}")
+        fresh_field = fresh_schema.find_field(original_field.name)

Review Comment:
   This can't use `original_field.name` because that's the local name. Instead, I think you need to use `old_schema.find_column_name`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5627:
URL: https://github.com/apache/iceberg/pull/5627#discussion_r954170279


##########
python/pyiceberg/table/metadata.py:
##########
@@ -103,12 +109,12 @@ def construct_refs(cls, data: Dict[str, Any]):
     """The table’s base location. This is used by writers to determine where
     to store data files, manifest files, and table metadata files."""
 
-    table_uuid: Optional[UUID] = Field(alias="table-uuid", default_factory=uuid4)
+    table_uuid: uuid.UUID = Field(alias="table-uuid", default_factory=uuid.uuid4)
     """A UUID that identifies the table, generated when the table is created.
     Implementations must throw an exception if a table’s UUID does not match
     the expected UUID after refreshing metadata."""
 
-    last_updated_ms: int = Field(alias="last-updated-ms")
+    last_updated_ms: int = Field(alias="last-updated-ms", default_factory=lambda: int(time.time() * 1000))

Review Comment:
   Can you use `pyiceberg.util.datetime.datetime_to_micros(datetime.now())` to get millisecond precision?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5627:
URL: https://github.com/apache/iceberg/pull/5627#discussion_r956788070


##########
python/pyiceberg/schema.py:
##########
@@ -638,3 +724,61 @@ def map(self, map_type: MapType, key_result: int, value_result: int) -> int:
 
     def primitive(self, primitive: PrimitiveType) -> int:
         return 0
+
+
+def assign_fresh_schema_ids(schema: Schema) -> Schema:
+    """Traverses the schema, and sets new IDs"""
+    schema_struct = pre_order_visit(schema.as_struct(), _SetFreshIDs())
+
+    fresh_identifier_field_ids = []
+    new_schema = Schema(*schema_struct.fields)
+    for field_id in schema.identifier_field_ids:
+        original_field_name = schema.find_column_name(field_id)
+        if original_field_name is None:
+            raise ValueError(f"Could not find field: {field_id}")
+        fresh_field = new_schema.find_field(original_field_name)
+        if fresh_field is None:
+            raise ValueError(f"Could not lookup field in new schema: {original_field_name}")
+        fresh_identifier_field_ids.append(fresh_field.field_id)
+
+    return new_schema.copy(update={"identifier_field_ids": fresh_identifier_field_ids})
+
+
+class _SetFreshIDs(PreOrderSchemaVisitor[IcebergType]):
+    """Traverses the schema and assigns monotonically increasing ids"""
+
+    counter: itertools.count
+
+    def __init__(self, start: int = 1) -> None:
+        self.counter = itertools.count(start)
+
+    def _get_and_increment(self) -> int:
+        return next(self.counter)
+
+    def schema(self, schema: Schema, struct_result: Callable[[], StructType]) -> Schema:
+        return Schema(*struct_result().fields, identifier_field_ids=schema.identifier_field_ids)
+
+    def struct(self, struct: StructType, field_results: List[Callable[[], IcebergType]]) -> StructType:
+        return StructType(*[field() for field in field_results])
+
+    def field(self, field: NestedField, field_result: Callable[[], IcebergType]) -> IcebergType:
+        return NestedField(
+            field_id=self._get_and_increment(), name=field.name, field_type=field_result(), required=field.required, doc=field.doc

Review Comment:
   This is going to visit children before visiting the next field. If you're trying to match the behavior of assignment in Java, you'd need to [increment the counter for each field and then visit children](https://github.com/apache/iceberg/blob/master/api/src/main/java/org/apache/iceberg/types/AssignFreshIds.java#L80-L84).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Fokko commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #5627:
URL: https://github.com/apache/iceberg/pull/5627#discussion_r954322329


##########
python/pyiceberg/schema.py:
##########
@@ -638,3 +640,61 @@ def map(self, map_type: MapType, key_result: int, value_result: int) -> int:
 
     def primitive(self, primitive: PrimitiveType) -> int:
         return 0
+
+
+def assign_fresh_schema_ids(schema: Schema) -> Schema:
+    """Traverses the schema, and sets new IDs"""
+    schema_struct = visit(schema.as_struct(), _SetFreshIDs())
+
+    fresh_identifier_field_ids = []
+    new_schema = Schema(*schema_struct.fields)
+    for field_id in schema.identifier_field_ids:
+        original_field = schema.find_field(field_id)
+        if original_field is None:
+            raise ValueError(f"Could not find field: {field_id}")
+        fresh_field = new_schema.find_field(original_field.name)
+        if fresh_field is None:
+            raise ValueError(f"Could not lookup field in new schema: {original_field}")
+        fresh_identifier_field_ids.append(fresh_field.field_id)
+
+    return new_schema.copy(update={"identifier_field_ids": fresh_identifier_field_ids})
+
+
+class _SetFreshIDs(SchemaVisitor[IcebergType]):

Review Comment:
   Little effort and I think we need it anyway at some point. It is also nice to stick to the Java impl on this one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Fokko commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #5627:
URL: https://github.com/apache/iceberg/pull/5627#discussion_r954322511


##########
python/pyiceberg/table/partitioning.py:
##########
@@ -157,3 +159,20 @@ def compatible_with(self, other: "PartitionSpec") -> bool:
 
 
 UNPARTITIONED_PARTITION_SPEC = PartitionSpec(spec_id=0)
+
+
+def assign_fresh_partition_spec_ids(spec: PartitionSpec, schema: Schema) -> PartitionSpec:
+    partition_fields = []
+    for pos, field in enumerate(spec.fields):
+        schema_field = schema.find_field(field.name)

Review Comment:
   Great catch! 👍🏻 



##########
python/pyiceberg/table/sorting.py:
##########
@@ -135,3 +140,31 @@ def __str__(self) -> str:
 
 UNSORTED_SORT_ORDER_ID = 0
 UNSORTED_SORT_ORDER = SortOrder(order_id=UNSORTED_SORT_ORDER_ID)
+INITIAL_SORT_ORDER_ID = 1
+
+
+def assign_fresh_sort_order_ids(sort_order: SortOrder, old_schema: Schema, fresh_schema: Schema) -> SortOrder:
+    if sort_order.is_unsorted:
+        return UNSORTED_SORT_ORDER
+
+    fresh_fields = []
+    for field in sort_order.fields:
+        original_field = old_schema.find_field(field.source_id)
+        if original_field is None:
+            raise ValueError(f"Could not find in original schema: {field}")
+        fresh_field = fresh_schema.find_field(original_field.name)

Review Comment:
   Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Fokko merged pull request #5627: Python: Reassign schema/partition-spec/sort-order ids

Posted by GitBox <gi...@apache.org>.
Fokko merged PR #5627:
URL: https://github.com/apache/iceberg/pull/5627


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5627:
URL: https://github.com/apache/iceberg/pull/5627#discussion_r956788189


##########
python/pyiceberg/schema.py:
##########
@@ -638,3 +724,61 @@ def map(self, map_type: MapType, key_result: int, value_result: int) -> int:
 
     def primitive(self, primitive: PrimitiveType) -> int:
         return 0
+
+
+def assign_fresh_schema_ids(schema: Schema) -> Schema:
+    """Traverses the schema, and sets new IDs"""
+    schema_struct = pre_order_visit(schema.as_struct(), _SetFreshIDs())
+
+    fresh_identifier_field_ids = []
+    new_schema = Schema(*schema_struct.fields)
+    for field_id in schema.identifier_field_ids:
+        original_field_name = schema.find_column_name(field_id)
+        if original_field_name is None:
+            raise ValueError(f"Could not find field: {field_id}")
+        fresh_field = new_schema.find_field(original_field_name)
+        if fresh_field is None:
+            raise ValueError(f"Could not lookup field in new schema: {original_field_name}")
+        fresh_identifier_field_ids.append(fresh_field.field_id)
+
+    return new_schema.copy(update={"identifier_field_ids": fresh_identifier_field_ids})
+
+
+class _SetFreshIDs(PreOrderSchemaVisitor[IcebergType]):
+    """Traverses the schema and assigns monotonically increasing ids"""
+
+    counter: itertools.count
+
+    def __init__(self, start: int = 1) -> None:
+        self.counter = itertools.count(start)
+
+    def _get_and_increment(self) -> int:
+        return next(self.counter)
+
+    def schema(self, schema: Schema, struct_result: Callable[[], StructType]) -> Schema:
+        return Schema(*struct_result().fields, identifier_field_ids=schema.identifier_field_ids)
+
+    def struct(self, struct: StructType, field_results: List[Callable[[], IcebergType]]) -> StructType:
+        return StructType(*[field() for field in field_results])
+
+    def field(self, field: NestedField, field_result: Callable[[], IcebergType]) -> IcebergType:
+        return NestedField(
+            field_id=self._get_and_increment(), name=field.name, field_type=field_result(), required=field.required, doc=field.doc
+        )
+
+    def list(self, list_type: ListType, element_result: Callable[[], IcebergType]) -> ListType:
+        return ListType(
+            element_id=self._get_and_increment(), element=element_result(), element_required=list_type.element_required
+        )
+
+    def map(self, map_type: MapType, key_result: Callable[[], IcebergType], value_result: Callable[[], IcebergType]) -> MapType:
+        return MapType(
+            key_id=self._get_and_increment(),
+            key_type=key_result(),
+            value_id=self._get_and_increment(),
+            value_type=value_result(),

Review Comment:
   Similar to above, you may need to assign the key and value IDs, then get key and value results.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Fokko commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #5627:
URL: https://github.com/apache/iceberg/pull/5627#discussion_r953720413


##########
python/pyiceberg/schema.py:
##########
@@ -149,7 +151,7 @@ def find_field(self, name_or_id: Union[str, int], case_sensitive: bool = True) -
         else:
             field_id = self._lazy_name_to_id_lower.get(name_or_id.lower())
 
-        if not field_id:

Review Comment:
   This was actually a bug 🐛  This would trigger on field-id 0



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5627:
URL: https://github.com/apache/iceberg/pull/5627#discussion_r956787765


##########
python/pyiceberg/schema.py:
##########
@@ -638,3 +724,61 @@ def map(self, map_type: MapType, key_result: int, value_result: int) -> int:
 
     def primitive(self, primitive: PrimitiveType) -> int:
         return 0
+
+
+def assign_fresh_schema_ids(schema: Schema) -> Schema:
+    """Traverses the schema, and sets new IDs"""
+    schema_struct = pre_order_visit(schema.as_struct(), _SetFreshIDs())
+
+    fresh_identifier_field_ids = []
+    new_schema = Schema(*schema_struct.fields)
+    for field_id in schema.identifier_field_ids:
+        original_field_name = schema.find_column_name(field_id)
+        if original_field_name is None:
+            raise ValueError(f"Could not find field: {field_id}")
+        fresh_field = new_schema.find_field(original_field_name)
+        if fresh_field is None:
+            raise ValueError(f"Could not lookup field in new schema: {original_field_name}")
+        fresh_identifier_field_ids.append(fresh_field.field_id)
+
+    return new_schema.copy(update={"identifier_field_ids": fresh_identifier_field_ids})
+
+
+class _SetFreshIDs(PreOrderSchemaVisitor[IcebergType]):
+    """Traverses the schema and assigns monotonically increasing ids"""
+
+    counter: itertools.count
+
+    def __init__(self, start: int = 1) -> None:
+        self.counter = itertools.count(start)
+
+    def _get_and_increment(self) -> int:
+        return next(self.counter)
+
+    def schema(self, schema: Schema, struct_result: Callable[[], StructType]) -> Schema:
+        return Schema(*struct_result().fields, identifier_field_ids=schema.identifier_field_ids)

Review Comment:
   Shouldn't this re-map the identifier field IDs since it is returning a new schema?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5627:
URL: https://github.com/apache/iceberg/pull/5627#discussion_r956788380


##########
python/pyiceberg/table/partitioning.py:
##########
@@ -157,3 +159,20 @@ def compatible_with(self, other: "PartitionSpec") -> bool:
 
 
 UNPARTITIONED_PARTITION_SPEC = PartitionSpec(spec_id=0)
+
+
+def assign_fresh_partition_spec_ids(spec: PartitionSpec, schema: Schema) -> PartitionSpec:
+    partition_fields = []
+    for pos, field in enumerate(spec.fields):
+        schema_field = schema.find_field(field.name)

Review Comment:
   @Fokko, looks like this hasn't been fixed yet, so I'm reopening the thread.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Fokko commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #5627:
URL: https://github.com/apache/iceberg/pull/5627#discussion_r953720413


##########
python/pyiceberg/schema.py:
##########
@@ -149,7 +151,7 @@ def find_field(self, name_or_id: Union[str, int], case_sensitive: bool = True) -
         else:
             field_id = self._lazy_name_to_id_lower.get(name_or_id.lower())
 
-        if not field_id:

Review Comment:
   This was actually a bug 🐛 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Fokko commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #5627:
URL: https://github.com/apache/iceberg/pull/5627#discussion_r958866429


##########
python/pyiceberg/table/partitioning.py:
##########
@@ -157,3 +159,20 @@ def compatible_with(self, other: "PartitionSpec") -> bool:
 
 
 UNPARTITIONED_PARTITION_SPEC = PartitionSpec(spec_id=0)
+
+
+def assign_fresh_partition_spec_ids(spec: PartitionSpec, schema: Schema) -> PartitionSpec:
+    partition_fields = []
+    for pos, field in enumerate(spec.fields):
+        schema_field = schema.find_field(field.name)

Review Comment:
   Sorry, that slipped through somehow



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Fokko commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #5627:
URL: https://github.com/apache/iceberg/pull/5627#discussion_r958853761


##########
python/pyiceberg/table/metadata.py:
##########
@@ -327,24 +334,43 @@ def check_sort_orders(cls, values: Dict[str, Any]):
     based on the spec. Implementations must throw an exception if a table’s
     version is higher than the supported version."""
 
-    table_uuid: UUID = Field(alias="table-uuid", default_factory=uuid.uuid4)
-    """A UUID that identifies the table, generated when the table is created.
-    Implementations must throw an exception if a table’s UUID does not match
-    the expected UUID after refreshing metadata."""
-
     last_sequence_number: int = Field(alias="last-sequence-number", default=INITIAL_SEQUENCE_NUMBER)
     """The table’s highest assigned sequence number, a monotonically
     increasing long that tracks the order of snapshots in a table."""
 
 
-class TableMetadata:
+TableMetadata = Union[TableMetadataV1, TableMetadataV2]
+
+
+def new_table_metadata(
+    schema: Schema, partition_spec: PartitionSpec, sort_order: SortOrder, location: str, properties: Properties = EMPTY_DICT
+) -> TableMetadata:
+    fresh_schema = assign_fresh_schema_ids(schema)
+    fresh_partition_spec = assign_fresh_partition_spec_ids(partition_spec, fresh_schema)
+    fresh_sort_order = assign_fresh_sort_order_ids(sort_order, schema, fresh_schema)

Review Comment:
   Only when you create TableMetadata out of it (when creating a new table). And it resets if it isn't `1`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Fokko commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #5627:
URL: https://github.com/apache/iceberg/pull/5627#discussion_r954249202


##########
python/pyiceberg/table/metadata.py:
##########
@@ -327,24 +333,45 @@ def check_sort_orders(cls, values: Dict[str, Any]):
     based on the spec. Implementations must throw an exception if a table’s
     version is higher than the supported version."""
 
-    table_uuid: UUID = Field(alias="table-uuid", default_factory=uuid.uuid4)
-    """A UUID that identifies the table, generated when the table is created.
-    Implementations must throw an exception if a table’s UUID does not match
-    the expected UUID after refreshing metadata."""
-
     last_sequence_number: int = Field(alias="last-sequence-number", default=INITIAL_SEQUENCE_NUMBER)
     """The table’s highest assigned sequence number, a monotonically
     increasing long that tracks the order of snapshots in a table."""
 
 
-class TableMetadata:
+TableMetadata = Union[TableMetadataV1, TableMetadataV2]
+
+
+def new_table_metadata(
+    schema: Schema, partition_spec: PartitionSpec, sort_order: SortOrder, location: str, properties: Properties = EMPTY_DICT
+) -> TableMetadata:
+    fresh_schema = assign_fresh_schema_ids(schema)
+    fresh_partition_spec = assign_fresh_partition_spec_ids(partition_spec, fresh_schema)
+    fresh_sort_order = assign_fresh_sort_order_ids(sort_order, schema, fresh_schema)
+
+    return TableMetadataV2(
+        location=location,
+        schemas=[fresh_schema],
+        current_schema_id=fresh_schema.schema_id,
+        partition_specs=[fresh_partition_spec],
+        default_spec_id=fresh_partition_spec.spec_id,
+        sort_orders=[fresh_sort_order],
+        default_sort_order_id=fresh_sort_order.order_id,
+        properties=properties,
+        last_column_id=fresh_schema.highest_field_id,
+        last_partition_id=max(field.field_id for field in fresh_partition_spec.fields)

Review Comment:
   Great idea!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Fokko commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #5627:
URL: https://github.com/apache/iceberg/pull/5627#discussion_r954248007


##########
python/pyiceberg/table/metadata.py:
##########
@@ -103,12 +109,12 @@ def construct_refs(cls, data: Dict[str, Any]):
     """The table’s base location. This is used by writers to determine where
     to store data files, manifest files, and table metadata files."""
 
-    table_uuid: Optional[UUID] = Field(alias="table-uuid", default_factory=uuid4)
+    table_uuid: uuid.UUID = Field(alias="table-uuid", default_factory=uuid.uuid4)
     """A UUID that identifies the table, generated when the table is created.
     Implementations must throw an exception if a table’s UUID does not match
     the expected UUID after refreshing metadata."""
 
-    last_updated_ms: int = Field(alias="last-updated-ms")
+    last_updated_ms: int = Field(alias="last-updated-ms", default_factory=lambda: int(time.time() * 1000))

Review Comment:
   I keep forgetting about that helper class, thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5627:
URL: https://github.com/apache/iceberg/pull/5627#discussion_r954176689


##########
python/pyiceberg/table/partitioning.py:
##########
@@ -157,3 +159,20 @@ def compatible_with(self, other: "PartitionSpec") -> bool:
 
 
 UNPARTITIONED_PARTITION_SPEC = PartitionSpec(spec_id=0)
+
+
+def assign_fresh_partition_spec_ids(spec: PartitionSpec, schema: Schema) -> PartitionSpec:
+    partition_fields = []
+    for pos, field in enumerate(spec.fields):
+        schema_field = schema.find_field(field.name)

Review Comment:
   This is the partition field name, not a schema field name. The schema field must be looked up by `source_id`. This method needs both the original schema and the fresh schema. The original schema is used to get field names and then the fresh schema is used to look up the new source ID.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5627:
URL: https://github.com/apache/iceberg/pull/5627#discussion_r954176689


##########
python/pyiceberg/table/partitioning.py:
##########
@@ -157,3 +159,20 @@ def compatible_with(self, other: "PartitionSpec") -> bool:
 
 
 UNPARTITIONED_PARTITION_SPEC = PartitionSpec(spec_id=0)
+
+
+def assign_fresh_partition_spec_ids(spec: PartitionSpec, schema: Schema) -> PartitionSpec:
+    partition_fields = []
+    for pos, field in enumerate(spec.fields):
+        schema_field = schema.find_field(field.name)

Review Comment:
   This is the partition field name, not a schema field name. The schema field must be looked up by `source_id`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Fokko commented on a diff in pull request #5627: Python: Reassign schema/partition-spec/sort-order ids

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #5627:
URL: https://github.com/apache/iceberg/pull/5627#discussion_r954242048


##########
python/pyiceberg/schema.py:
##########
@@ -638,3 +640,61 @@ def map(self, map_type: MapType, key_result: int, value_result: int) -> int:
 
     def primitive(self, primitive: PrimitiveType) -> int:
         return 0
+
+
+def assign_fresh_schema_ids(schema: Schema) -> Schema:
+    """Traverses the schema, and sets new IDs"""
+    schema_struct = visit(schema.as_struct(), _SetFreshIDs())
+
+    fresh_identifier_field_ids = []
+    new_schema = Schema(*schema_struct.fields)
+    for field_id in schema.identifier_field_ids:
+        original_field = schema.find_field(field_id)
+        if original_field is None:
+            raise ValueError(f"Could not find field: {field_id}")
+        fresh_field = new_schema.find_field(original_field.name)
+        if fresh_field is None:
+            raise ValueError(f"Could not lookup field in new schema: {original_field}")
+        fresh_identifier_field_ids.append(fresh_field.field_id)
+
+    return new_schema.copy(update={"identifier_field_ids": fresh_identifier_field_ids})
+
+
+class _SetFreshIDs(SchemaVisitor[IcebergType]):
+    """Traverses the schema to get the highest field-id"""
+
+    counter: int
+
+    def __init__(self) -> None:
+        self.counter = 0

Review Comment:
   Reminds me of university. Matlab starts counting at 1 instead of 0, got it 👍🏻 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org