You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/10/05 20:01:06 UTC

[GitHub] [iceberg] Fokko commented on a diff in pull request #5845: Python: Manifest evaluator

Fokko commented on code in PR #5845:
URL: https://github.com/apache/iceberg/pull/5845#discussion_r985498605


##########
python/pyiceberg/expressions/base.py:
##########
@@ -835,3 +843,204 @@ def _(expr: BoundLessThan, visitor: BoundBooleanExpressionVisitor[T]) -> T:
 @visit_bound_predicate.register(BoundLessThanOrEqual)
 def _(expr: BoundLessThanOrEqual, visitor: BoundBooleanExpressionVisitor[T]) -> T:
     return visitor.visit_less_than_or_equal(term=expr.term, literal=expr.literal)
+
+
+ROWS_MIGHT_MATCH = True
+ROWS_CANNOT_MATCH = False
+IN_PREDICATE_LIMIT = 200
+
+
+def all_values_are_null(partition_field: PartitionFieldSummary, field_type: IcebergType) -> bool:
+    # containsNull encodes whether at least one partition value is null,
+    # lowerBound is null if all partition values are null
+    all_null = partition_field.contains_null is True and partition_field.lower_bound is None
+
+    if all_null and (field_type is DoubleType or field_type is FloatType):
+        # floating point types may include NaN values, which we check separately.
+        # In case bounds don't include NaN value, containsNaN needs to be checked against.
+        return partition_field.contains_nan is False
+
+    return all_null
+
+
+def _from_byte_buffer(field_type: IcebergType, val: bytes):
+    if not isinstance(field_type, PrimitiveType):
+        raise ValueError(f"Expected a PrimitiveType, got: {type(field_type)}")
+    return from_bytes(field_type, val)
+
+
+class ManifestEvaluator(BoundBooleanExpressionVisitor[bool]):
+    partition_filter: BooleanExpression
+    partition_fields: list[PartitionFieldSummary]
+
+    def __init__(self, partition_filter: BooleanExpression):
+        self.partition_filter = partition_filter
+
+    def eval(self, manifest: ManifestFile) -> bool:
+        if partitions := manifest.partitions:
+            # this is not what we want

Review Comment:
   It's a good thing that I added that comment. I don't really like setting a class variable over there. I've split it out into a separate class.



##########
python/pyiceberg/expressions/base.py:
##########
@@ -835,3 +843,204 @@ def _(expr: BoundLessThan, visitor: BoundBooleanExpressionVisitor[T]) -> T:
 @visit_bound_predicate.register(BoundLessThanOrEqual)
 def _(expr: BoundLessThanOrEqual, visitor: BoundBooleanExpressionVisitor[T]) -> T:
     return visitor.visit_less_than_or_equal(term=expr.term, literal=expr.literal)
+
+
+ROWS_MIGHT_MATCH = True
+ROWS_CANNOT_MATCH = False
+IN_PREDICATE_LIMIT = 200
+
+
+def all_values_are_null(partition_field: PartitionFieldSummary, field_type: IcebergType) -> bool:
+    # containsNull encodes whether at least one partition value is null,
+    # lowerBound is null if all partition values are null
+    all_null = partition_field.contains_null is True and partition_field.lower_bound is None
+
+    if all_null and (field_type is DoubleType or field_type is FloatType):
+        # floating point types may include NaN values, which we check separately.
+        # In case bounds don't include NaN value, containsNaN needs to be checked against.
+        return partition_field.contains_nan is False
+
+    return all_null
+
+
+def _from_byte_buffer(field_type: IcebergType, val: bytes):
+    if not isinstance(field_type, PrimitiveType):
+        raise ValueError(f"Expected a PrimitiveType, got: {type(field_type)}")
+    return from_bytes(field_type, val)
+
+
+class ManifestEvaluator(BoundBooleanExpressionVisitor[bool]):
+    partition_filter: BooleanExpression
+    partition_fields: list[PartitionFieldSummary]
+
+    def __init__(self, partition_filter: BooleanExpression):
+        self.partition_filter = partition_filter
+
+    def eval(self, manifest: ManifestFile) -> bool:
+        if partitions := manifest.partitions:
+            # this is not what we want
+            self.partition_fields = partitions
+            return visit(self.partition_filter, self)
+
+        # No partition information
+        return ROWS_MIGHT_MATCH
+
+    def visit_in(self, term: BoundTerm, literals: set[Literal[Any]]) -> bool:
+        pos = term.ref().accessor.position
+        field = self.partition_fields[pos]
+
+        if field.lower_bound is None:
+            return ROWS_CANNOT_MATCH
+
+        if len(literals) > IN_PREDICATE_LIMIT:
+            return ROWS_MIGHT_MATCH
+
+        lower = _from_byte_buffer(term.ref().field.field_type, field.lower_bound)
+
+        if all([lower < val.value for val in literals]):
+            return ROWS_CANNOT_MATCH
+
+        if field.upper_bound is not None:
+            upper = _from_byte_buffer(term.ref().field.field_type, field.upper_bound)
+            if all([upper > val.value for val in literals]):
+                return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_not_in(self, term: BoundTerm, literals: set[Literal[Any]]) -> bool:
+        # because the bounds are not necessarily a min or max value, this cannot be answered using
+        # them. notIn(col, {X, ...}) with (X, Y) doesn't guarantee that X is a value in col.
+        return ROWS_MIGHT_MATCH
+
+    def visit_is_nan(self, term: BoundTerm) -> bool:
+        pos = term.ref().accessor.position
+        field = self.partition_fields[pos]
+
+        if self.partition_fields[pos].contains_nan is False:
+            return ROWS_CANNOT_MATCH
+
+        if all_values_are_null(field, term.ref().field.field_type):
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_not_nan(self, term: BoundTerm) -> bool:
+        pos = term.ref().accessor.position
+        field = self.partition_fields[pos]
+
+        if field.contains_nan is True and field.contains_null is False and field.lower_bound is None:
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_is_null(self, term: BoundTerm) -> bool:
+        pos = term.ref().accessor.position
+
+        if self.partition_fields[pos].contains_null is False:
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_not_null(self, term: BoundTerm) -> bool:
+        pos = term.ref().accessor.position
+
+        if all_values_are_null(self.partition_fields[pos], term.ref().field.field_type):
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_equal(self, term: BoundTerm, literal: Literal[Any]) -> bool:
+        pos = term.ref().accessor.position
+        field = self.partition_fields[pos]
+
+        if field.lower_bound is None:
+            # values are all null and literal cannot contain null
+            return ROWS_CANNOT_MATCH
+
+        lower = _from_byte_buffer(term.ref().field.field_type, field.lower_bound)
+
+        if lower > literal.value:
+            return ROWS_CANNOT_MATCH
+
+        upper = _from_byte_buffer(term.ref().field.field_type, field.lower_bound)
+
+        if literal.value > upper:
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_not_equal(self, term: BoundTerm, literal: Literal[Any]) -> bool:
+        # because the bounds are not necessarily a min or max value, this cannot be answered using
+        # them. notEq(col, X) with (X, Y) doesn't guarantee that X is a value in col.
+        return ROWS_MIGHT_MATCH
+
+    def visit_greater_than_or_equal(self, term: BoundTerm, literal: Literal[Any]) -> bool:
+        pos = term.ref().accessor.position
+        field = self.partition_fields[pos]
+
+        if field.upper_bound is None:
+            return ROWS_CANNOT_MATCH
+
+        upper = _from_byte_buffer(term.ref().field.field_type, field.upper_bound)
+
+        if literal.value > upper:
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_greater_than(self, term: BoundTerm, literal: Literal[Any]) -> bool:
+        pos = term.ref().accessor.position
+        field = self.partition_fields[pos]
+
+        if field.upper_bound is None:
+            return ROWS_CANNOT_MATCH
+
+        upper = _from_byte_buffer(term.ref().field.field_type, field.upper_bound)
+
+        if literal.value >= upper:
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_less_than(self, term: BoundTerm, literal: Literal[Any]) -> bool:
+        pos = term.ref().accessor.position
+        field = self.partition_fields[pos]
+
+        if field.lower_bound is None:
+            return ROWS_CANNOT_MATCH
+
+        lower = _from_byte_buffer(term.ref().field.field_type, field.lower_bound)
+
+        if literal.value <= lower:
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_less_than_or_equal(self, term: BoundTerm, literal: Literal[Any]) -> bool:
+        pos = term.ref().accessor.position
+        field = self.partition_fields[pos]
+
+        if field.lower_bound is None:
+            return ROWS_CANNOT_MATCH
+
+        lower = _from_byte_buffer(term.ref().field.field_type, field.lower_bound)
+
+        if literal.value < lower:
+            return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_true(self) -> bool:
+        return True

Review Comment:
   Nice, thanks!



##########
python/pyiceberg/expressions/base.py:
##########
@@ -835,3 +843,204 @@ def _(expr: BoundLessThan, visitor: BoundBooleanExpressionVisitor[T]) -> T:
 @visit_bound_predicate.register(BoundLessThanOrEqual)
 def _(expr: BoundLessThanOrEqual, visitor: BoundBooleanExpressionVisitor[T]) -> T:
     return visitor.visit_less_than_or_equal(term=expr.term, literal=expr.literal)
+
+
+ROWS_MIGHT_MATCH = True
+ROWS_CANNOT_MATCH = False
+IN_PREDICATE_LIMIT = 200
+
+
+def all_values_are_null(partition_field: PartitionFieldSummary, field_type: IcebergType) -> bool:
+    # containsNull encodes whether at least one partition value is null,
+    # lowerBound is null if all partition values are null
+    all_null = partition_field.contains_null is True and partition_field.lower_bound is None
+
+    if all_null and (field_type is DoubleType or field_type is FloatType):
+        # floating point types may include NaN values, which we check separately.
+        # In case bounds don't include NaN value, containsNaN needs to be checked against.
+        return partition_field.contains_nan is False
+
+    return all_null
+
+
+def _from_byte_buffer(field_type: IcebergType, val: bytes):
+    if not isinstance(field_type, PrimitiveType):
+        raise ValueError(f"Expected a PrimitiveType, got: {type(field_type)}")
+    return from_bytes(field_type, val)
+
+
+class ManifestEvaluator(BoundBooleanExpressionVisitor[bool]):
+    partition_filter: BooleanExpression
+    partition_fields: list[PartitionFieldSummary]
+
+    def __init__(self, partition_filter: BooleanExpression):
+        self.partition_filter = partition_filter
+
+    def eval(self, manifest: ManifestFile) -> bool:
+        if partitions := manifest.partitions:
+            # this is not what we want
+            self.partition_fields = partitions
+            return visit(self.partition_filter, self)
+
+        # No partition information
+        return ROWS_MIGHT_MATCH
+
+    def visit_in(self, term: BoundTerm, literals: set[Literal[Any]]) -> bool:
+        pos = term.ref().accessor.position
+        field = self.partition_fields[pos]
+
+        if field.lower_bound is None:
+            return ROWS_CANNOT_MATCH
+
+        if len(literals) > IN_PREDICATE_LIMIT:
+            return ROWS_MIGHT_MATCH
+
+        lower = _from_byte_buffer(term.ref().field.field_type, field.lower_bound)
+
+        if all([lower < val.value for val in literals]):
+            return ROWS_CANNOT_MATCH
+
+        if field.upper_bound is not None:
+            upper = _from_byte_buffer(term.ref().field.field_type, field.upper_bound)
+            if all([upper > val.value for val in literals]):
+                return ROWS_CANNOT_MATCH
+
+        return ROWS_MIGHT_MATCH
+
+    def visit_not_in(self, term: BoundTerm, literals: set[Literal[Any]]) -> bool:
+        # because the bounds are not necessarily a min or max value, this cannot be answered using
+        # them. notIn(col, {X, ...}) with (X, Y) doesn't guarantee that X is a value in col.
+        return ROWS_MIGHT_MATCH
+
+    def visit_is_nan(self, term: BoundTerm) -> bool:
+        pos = term.ref().accessor.position
+        field = self.partition_fields[pos]
+
+        if self.partition_fields[pos].contains_nan is False:

Review Comment:
   It should, thanks!



##########
python/pyiceberg/expressions/base.py:
##########
@@ -835,3 +843,204 @@ def _(expr: BoundLessThan, visitor: BoundBooleanExpressionVisitor[T]) -> T:
 @visit_bound_predicate.register(BoundLessThanOrEqual)
 def _(expr: BoundLessThanOrEqual, visitor: BoundBooleanExpressionVisitor[T]) -> T:
     return visitor.visit_less_than_or_equal(term=expr.term, literal=expr.literal)
+
+
+ROWS_MIGHT_MATCH = True
+ROWS_CANNOT_MATCH = False
+IN_PREDICATE_LIMIT = 200
+
+
+def all_values_are_null(partition_field: PartitionFieldSummary, field_type: IcebergType) -> bool:

Review Comment:
   Good call



##########
python/pyiceberg/expressions/base.py:
##########
@@ -835,3 +843,204 @@ def _(expr: BoundLessThan, visitor: BoundBooleanExpressionVisitor[T]) -> T:
 @visit_bound_predicate.register(BoundLessThanOrEqual)
 def _(expr: BoundLessThanOrEqual, visitor: BoundBooleanExpressionVisitor[T]) -> T:
     return visitor.visit_less_than_or_equal(term=expr.term, literal=expr.literal)
+
+
+ROWS_MIGHT_MATCH = True
+ROWS_CANNOT_MATCH = False
+IN_PREDICATE_LIMIT = 200
+
+
+def all_values_are_null(partition_field: PartitionFieldSummary, field_type: IcebergType) -> bool:
+    # containsNull encodes whether at least one partition value is null,
+    # lowerBound is null if all partition values are null
+    all_null = partition_field.contains_null is True and partition_field.lower_bound is None
+
+    if all_null and (field_type is DoubleType or field_type is FloatType):
+        # floating point types may include NaN values, which we check separately.
+        # In case bounds don't include NaN value, containsNaN needs to be checked against.
+        return partition_field.contains_nan is False
+
+    return all_null
+
+
+def _from_byte_buffer(field_type: IcebergType, val: bytes):
+    if not isinstance(field_type, PrimitiveType):
+        raise ValueError(f"Expected a PrimitiveType, got: {type(field_type)}")
+    return from_bytes(field_type, val)
+
+
+class ManifestEvaluator(BoundBooleanExpressionVisitor[bool]):
+    partition_filter: BooleanExpression
+    partition_fields: list[PartitionFieldSummary]
+
+    def __init__(self, partition_filter: BooleanExpression):
+        self.partition_filter = partition_filter
+
+    def eval(self, manifest: ManifestFile) -> bool:
+        if partitions := manifest.partitions:
+            # this is not what we want
+            self.partition_fields = partitions
+            return visit(self.partition_filter, self)
+
+        # No partition information
+        return ROWS_MIGHT_MATCH
+
+    def visit_in(self, term: BoundTerm, literals: set[Literal[Any]]) -> bool:
+        pos = term.ref().accessor.position
+        field = self.partition_fields[pos]
+
+        if field.lower_bound is None:
+            return ROWS_CANNOT_MATCH
+
+        if len(literals) > IN_PREDICATE_LIMIT:
+            return ROWS_MIGHT_MATCH
+
+        lower = _from_byte_buffer(term.ref().field.field_type, field.lower_bound)
+
+        if all([lower < val.value for val in literals]):

Review Comment:
   Ahh, great catch! It is flipped indeed, thanks! A test has been added.



##########
python/pyiceberg/expressions/base.py:
##########
@@ -835,3 +843,204 @@ def _(expr: BoundLessThan, visitor: BoundBooleanExpressionVisitor[T]) -> T:
 @visit_bound_predicate.register(BoundLessThanOrEqual)
 def _(expr: BoundLessThanOrEqual, visitor: BoundBooleanExpressionVisitor[T]) -> T:
     return visitor.visit_less_than_or_equal(term=expr.term, literal=expr.literal)
+
+
+ROWS_MIGHT_MATCH = True
+ROWS_CANNOT_MATCH = False
+IN_PREDICATE_LIMIT = 200
+
+
+def all_values_are_null(partition_field: PartitionFieldSummary, field_type: IcebergType) -> bool:
+    # containsNull encodes whether at least one partition value is null,
+    # lowerBound is null if all partition values are null
+    all_null = partition_field.contains_null is True and partition_field.lower_bound is None
+
+    if all_null and (field_type is DoubleType or field_type is FloatType):
+        # floating point types may include NaN values, which we check separately.
+        # In case bounds don't include NaN value, containsNaN needs to be checked against.
+        return partition_field.contains_nan is False
+
+    return all_null
+
+
+def _from_byte_buffer(field_type: IcebergType, val: bytes):
+    if not isinstance(field_type, PrimitiveType):
+        raise ValueError(f"Expected a PrimitiveType, got: {type(field_type)}")
+    return from_bytes(field_type, val)
+
+
+class ManifestEvaluator(BoundBooleanExpressionVisitor[bool]):
+    partition_filter: BooleanExpression
+    partition_fields: list[PartitionFieldSummary]
+
+    def __init__(self, partition_filter: BooleanExpression):
+        self.partition_filter = partition_filter
+
+    def eval(self, manifest: ManifestFile) -> bool:
+        if partitions := manifest.partitions:
+            # this is not what we want
+            self.partition_fields = partitions
+            return visit(self.partition_filter, self)
+
+        # No partition information
+        return ROWS_MIGHT_MATCH
+
+    def visit_in(self, term: BoundTerm, literals: set[Literal[Any]]) -> bool:
+        pos = term.ref().accessor.position
+        field = self.partition_fields[pos]
+
+        if field.lower_bound is None:
+            return ROWS_CANNOT_MATCH
+
+        if len(literals) > IN_PREDICATE_LIMIT:
+            return ROWS_MIGHT_MATCH
+
+        lower = _from_byte_buffer(term.ref().field.field_type, field.lower_bound)
+
+        if all([lower < val.value for val in literals]):

Review Comment:
   I implemented this using the `all` method since it will stop the loop early when a `False` has been encountered.



##########
python/pyiceberg/expressions/base.py:
##########
@@ -835,3 +843,204 @@ def _(expr: BoundLessThan, visitor: BoundBooleanExpressionVisitor[T]) -> T:
 @visit_bound_predicate.register(BoundLessThanOrEqual)
 def _(expr: BoundLessThanOrEqual, visitor: BoundBooleanExpressionVisitor[T]) -> T:
     return visitor.visit_less_than_or_equal(term=expr.term, literal=expr.literal)
+
+
+ROWS_MIGHT_MATCH = True
+ROWS_CANNOT_MATCH = False
+IN_PREDICATE_LIMIT = 200
+
+
+def all_values_are_null(partition_field: PartitionFieldSummary, field_type: IcebergType) -> bool:
+    # containsNull encodes whether at least one partition value is null,
+    # lowerBound is null if all partition values are null
+    all_null = partition_field.contains_null is True and partition_field.lower_bound is None
+
+    if all_null and (field_type is DoubleType or field_type is FloatType):
+        # floating point types may include NaN values, which we check separately.
+        # In case bounds don't include NaN value, containsNaN needs to be checked against.
+        return partition_field.contains_nan is False
+
+    return all_null
+
+
+def _from_byte_buffer(field_type: IcebergType, val: bytes):
+    if not isinstance(field_type, PrimitiveType):
+        raise ValueError(f"Expected a PrimitiveType, got: {type(field_type)}")
+    return from_bytes(field_type, val)
+
+
+class ManifestEvaluator(BoundBooleanExpressionVisitor[bool]):
+    partition_filter: BooleanExpression
+    partition_fields: list[PartitionFieldSummary]
+
+    def __init__(self, partition_filter: BooleanExpression):
+        self.partition_filter = partition_filter
+
+    def eval(self, manifest: ManifestFile) -> bool:
+        if partitions := manifest.partitions:
+            # this is not what we want
+            self.partition_fields = partitions
+            return visit(self.partition_filter, self)
+
+        # No partition information
+        return ROWS_MIGHT_MATCH
+
+    def visit_in(self, term: BoundTerm, literals: set[Literal[Any]]) -> bool:
+        pos = term.ref().accessor.position
+        field = self.partition_fields[pos]
+
+        if field.lower_bound is None:
+            return ROWS_CANNOT_MATCH
+
+        if len(literals) > IN_PREDICATE_LIMIT:
+            return ROWS_MIGHT_MATCH
+
+        lower = _from_byte_buffer(term.ref().field.field_type, field.lower_bound)
+
+        if all([lower < val.value for val in literals]):
+            return ROWS_CANNOT_MATCH
+
+        if field.upper_bound is not None:
+            upper = _from_byte_buffer(term.ref().field.field_type, field.upper_bound)
+            if all([upper > val.value for val in literals]):

Review Comment:
   At least it is consistently reversed, thanks! Updated and added a test 👍🏻 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org