You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/06/10 20:38:08 UTC

[GitHub] [iceberg] Fokko commented on a diff in pull request #4717: Python: Add PartitionSpec

Fokko commented on code in PR #4717:
URL: https://github.com/apache/iceberg/pull/4717#discussion_r894871233


##########
python/src/iceberg/table/partitioning.py:
##########
@@ -29,38 +36,81 @@ class PartitionField:
         name(str): The name of this partition field
     """
 
-    def __init__(self, source_id: int, field_id: int, transform: Transform, name: str):
-        self._source_id = source_id
-        self._field_id = field_id
-        self._transform = transform
-        self._name = name
+    source_id: int
+    field_id: int
+    transform: Transform
+    name: str
+
+    def __str__(self):
+        return f"{self.field_id}: {self.name}: {self.transform}({self.source_id})"
+
 
-    @property
-    def source_id(self) -> int:
-        return self._source_id
+@dataclass(eq=False, frozen=True)
+class PartitionSpec:
+    """
+    PartitionSpec captures the transformation from table data to partition values
 
-    @property
-    def field_id(self) -> int:
-        return self._field_id
+    Attributes:
+        schema(Schema): the schema of data table
+        spec_id(int): any change to PartitionSpec will produce a new specId
+        fields(List[PartitionField): list of partition fields to produce partition values
+        last_assigned_field_id(int): auto-increment partition field id starting from PARTITION_DATA_ID_START
+    """
 
-    @property
-    def name(self) -> str:
-        return self._name
+    schema: Schema
+    spec_id: int
+    fields: Tuple[PartitionField, ...]
+    last_assigned_field_id: int
+    source_id_to_fields_map: Dict[int, List[PartitionField]] = field(init=False, repr=False)
 
-    @property
-    def transform(self) -> Transform:
-        return self._transform
+    def __post_init__(self):
+        source_id_to_fields_map = dict()
+        for partition_field in self.fields:
+            source_column = self.schema.find_column_name(partition_field.source_id)
+            if not source_column:
+                raise ValueError(f"Cannot find source column: {partition_field.source_id}")
+            existing = source_id_to_fields_map.get(partition_field.source_id, [])
+            existing.append(partition_field)
+            source_id_to_fields_map[partition_field.source_id] = existing
+        object.__setattr__(self, "source_id_to_fields_map", source_id_to_fields_map)
 
     def __eq__(self, other):
-        return (
-            self.field_id == other.field_id
-            and self.source_id == other.source_id
-            and self.name == other.name
-            and self.transform == other.transform
-        )
+        """
+        Produce a boolean to return True if two objects are considered equal
+
+        Note:
+            Equality of PartitionSpec is determined by spec_id and partition fields only
+        """
+        if not isinstance(other, PartitionSpec):
+            return False
+        return self.spec_id == other.spec_id and self.fields == other.fields
 
     def __str__(self):
-        return f"{self.field_id}: {self.name}: {self.transform}({self.source_id})"
+        """
+        Produce a human-readable string representation of PartitionSpec
 
-    def __repr__(self):
-        return f"PartitionField(field_id={self.field_id}, name={self.name}, transform={repr(self.transform)}, source_id={self.source_id})"
+        Note:
+            Only include list of partition fields in the PartitionSpec's string representation
+        """
+        result_str = "["
+        if self.fields:
+            result_str += "\n  " + "\n  ".join([str(field) for field in self.fields]) + "\n"
+        result_str += "]"
+        return result_str
+
+    def is_unpartitioned(self) -> bool:
+        return len(self.fields) < 1
+
+    def fields_by_source_id(self, field_id: int) -> List[PartitionField]:
+        return self.source_id_to_fields_map[field_id]
+
+    def compatible_with(self, other: "PartitionSpec") -> bool:

Review Comment:
   We should also assert the number of fields. The following test is passing:
   ```python
   def test_partition_compatible_with(table_schema_simple: Schema):
       bucket_transform = bucket(IntegerType(), 4)
       field1 = PartitionField(3, 100, bucket_transform, "id")
       field2 = PartitionField(3, 102, bucket_transform, "id")
       lhs = PartitionSpec(table_schema_simple, 0, (field1,), 1001)
       rhs = PartitionSpec(table_schema_simple, 0, (field1, field2), 1001)
       assert lhs.compatible_with(rhs)
   ```
   <img width="1624" alt="image" src="https://user-images.githubusercontent.com/1134248/173146156-8e4f9459-92f7-4cc2-ada9-a1a11582b402.png">
   
   We also do this on the Java side: https://github.com/apache/iceberg/blob/master/api/src/main/java/org/apache/iceberg/PartitionSpec.java#L211
   
   The Python zip will only zip the number of fields that are in the shortest list.



##########
python/src/iceberg/table/partitioning.py:
##########
@@ -29,38 +36,81 @@ class PartitionField:
         name(str): The name of this partition field
     """
 
-    def __init__(self, source_id: int, field_id: int, transform: Transform, name: str):
-        self._source_id = source_id
-        self._field_id = field_id
-        self._transform = transform
-        self._name = name
+    source_id: int
+    field_id: int
+    transform: Transform
+    name: str
+
+    def __str__(self):
+        return f"{self.field_id}: {self.name}: {self.transform}({self.source_id})"
+
 
-    @property
-    def source_id(self) -> int:
-        return self._source_id
+@dataclass(eq=False, frozen=True)
+class PartitionSpec:
+    """
+    PartitionSpec captures the transformation from table data to partition values
 
-    @property
-    def field_id(self) -> int:
-        return self._field_id
+    Attributes:
+        schema(Schema): the schema of data table
+        spec_id(int): any change to PartitionSpec will produce a new specId
+        fields(List[PartitionField): list of partition fields to produce partition values
+        last_assigned_field_id(int): auto-increment partition field id starting from PARTITION_DATA_ID_START
+    """
 
-    @property
-    def name(self) -> str:
-        return self._name
+    schema: Schema
+    spec_id: int
+    fields: Tuple[PartitionField, ...]
+    last_assigned_field_id: int
+    source_id_to_fields_map: Dict[int, List[PartitionField]] = field(init=False, repr=False)
 
-    @property
-    def transform(self) -> Transform:
-        return self._transform
+    def __post_init__(self):
+        source_id_to_fields_map = dict()
+        for partition_field in self.fields:
+            source_column = self.schema.find_column_name(partition_field.source_id)
+            if not source_column:
+                raise ValueError(f"Cannot find source column: {partition_field.source_id}")
+            existing = source_id_to_fields_map.get(partition_field.source_id, [])
+            existing.append(partition_field)
+            source_id_to_fields_map[partition_field.source_id] = existing
+        object.__setattr__(self, "source_id_to_fields_map", source_id_to_fields_map)
 
     def __eq__(self, other):
-        return (
-            self.field_id == other.field_id
-            and self.source_id == other.source_id
-            and self.name == other.name
-            and self.transform == other.transform
-        )
+        """
+        Produce a boolean to return True if two objects are considered equal
+
+        Note:
+            Equality of PartitionSpec is determined by spec_id and partition fields only
+        """
+        if not isinstance(other, PartitionSpec):
+            return False
+        return self.spec_id == other.spec_id and self.fields == other.fields
 
     def __str__(self):
-        return f"{self.field_id}: {self.name}: {self.transform}({self.source_id})"
+        """
+        Produce a human-readable string representation of PartitionSpec
 
-    def __repr__(self):
-        return f"PartitionField(field_id={self.field_id}, name={self.name}, transform={repr(self.transform)}, source_id={self.source_id})"
+        Note:
+            Only include list of partition fields in the PartitionSpec's string representation
+        """
+        result_str = "["
+        if self.fields:
+            result_str += "\n  " + "\n  ".join([str(field) for field in self.fields]) + "\n"
+        result_str += "]"
+        return result_str
+
+    def is_unpartitioned(self) -> bool:
+        return len(self.fields) < 1

Review Comment:
   supernit: We could shorten this to:
   ```suggestion
           return not self.fields
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org