Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/11/23 17:42:56 UTC

[GitHub] [iceberg] Fokko opened a new pull request, #6258: Python: Implement PyArrow row level filtering

Fokko opened a new pull request, #6258:
URL: https://github.com/apache/iceberg/pull/6258

   This converts an Iceberg expression to a PyArrow expression and filters the table before materializing it.
   
   This should also make things more efficient:
   
   https://arrow.apache.org/docs/python/generated/pyarrow.dataset.dataset.html#pyarrow.dataset.dataset
   
   - Optimized reading with predicate pushdown (filtering rows), projection (selecting columns), parallel reading or fine-grained managing of tasks.
   
   ```python
   In [3]: from pyiceberg.catalog import load_catalog
      ...:
      ...: catalog = load_catalog('local')
      ...:
      ...: table = catalog.load_table(('nyc', 'taxis'))
      ...:
      ...: from pyiceberg.expressions import EqualTo
      ...:
      ...: table.scan().select("VendorID", "tpep_pickup_datetime").filter_rows(EqualTo("VendorID", 1)).to_arrow()
      ...:
   Out[3]:
   pyarrow.Table
   VendorID: int64
   tpep_pickup_datetime: timestamp[us, tz=+00:00]
   ----
   VendorID: [[1,1,1,1,1,...,1,1,1,1,1],[1,1,1,1,1,...,1,1,1,1,1],...,[1,1,1,1,1,...,1,1,1,1,1],[1,1,1,1,1,...,1,1,1,1,1]]
   tpep_pickup_datetime: [[2021-04-01 00:00:18.000000,2021-04-01 00:42:37.000000,2021-04-01 00:57:56.000000,2021-04-01 00:01:58.000000,2021-04-01 00:27:53.000000,...,2021-04-02
   22:10:18.000000,2021-04-02 22:41:52.000000,2021-04-02 22:57:26.000000,2021-04-02 22:16:03.000000,2021-04-02 22:34:19.000000],[2021-04-02 22:06:46.000000,2021-04-02
   22:47:17.000000,2021-04-02 22:02:10.000000,2021-04-02 22:02:31.000000,2021-04-02 22:15:02.000000,...,2021-04-05 13:15:36.000000,2021-04-05 13:30:38.000000,2021-04-05
   13:19:12.000000,2021-04-05 13:34:42.000000,2021-04-05 13:41:51.000000],...,[2021-04-30 08:21:52.000000,2021-04-30 08:54:38.000000,2021-04-30 08:23:12.000000,2021-04-30
   08:14:43.000000,2021-04-30 08:30:01.000000,...,2021-04-14 09:44:14.000000,2021-04-14 09:45:33.000000,2021-04-14 09:17:11.000000,2021-04-14 09:21:46.000000,2021-04-14
   09:24:27.000000],[2021-04-14 09:57:34.000000,2021-04-14 09:57:11.000000,2021-04-14 09:17:21.000000,2021-04-14 09:27:45.000000,2021-04-14 09:16:00.000000,...,2021-04-30
   23:33:05.000000,2021-04-30 23:33:17.000000,2021-04-30 23:06:50.000000,2021-04-30 23:20:32.000000,2021-04-30 23:05:21.000000]]
   ```
   
   ```python
   In [4]: from pyiceberg.catalog import load_catalog
      ...:
      ...: catalog = load_catalog('local')
      ...:
      ...: table = catalog.load_table(('nyc', 'taxis'))
      ...:
      ...: table.scan().select("VendorID", "tpep_pickup_datetime").to_arrow()
      ...:
      ...:
   Out[4]:
   pyarrow.Table
   VendorID: int64
   tpep_pickup_datetime: timestamp[us, tz=+00:00]
   ----
   VendorID: [[1,1,1,1,2,...,1,1,2,2,2],[2,2,2,1,1,...,2,2,2,2,1],...,[1,1,1,2,2,...,1,6,1,2,2],[2,2,2,1,2,...,2,1,2,2,1]]
   tpep_pickup_datetime: [[2021-04-01 00:00:18.000000,2021-04-01 00:42:37.000000,2021-04-01 00:57:56.000000,2021-04-01 00:01:58.000000,2021-04-01 00:24:55.000000,...,2021-04-02
   22:16:03.000000,2021-04-02 22:34:19.000000,2021-04-02 22:12:43.000000,2021-04-02 22:41:39.000000,2021-04-02 22:09:28.000000],[2021-04-02 22:20:04.000000,2021-04-02
   22:34:37.000000,2021-04-02 22:54:15.000000,2021-04-02 22:06:46.000000,2021-04-02 22:47:17.000000,...,2021-04-05 13:25:45.000000,2021-04-05 13:35:34.000000,2021-04-05
   13:18:25.000000,2021-04-05 13:37:43.000000,2021-04-05 13:41:51.000000],...,[2021-04-30 08:21:52.000000,2021-04-30 08:54:38.000000,2021-04-30 08:23:12.000000,2021-04-30
   07:44:56.000000,2021-04-30 08:08:37.000000,...,2021-04-14 09:21:46.000000,2021-04-14 09:04:55.000000,2021-04-14 09:24:27.000000,2021-04-14 09:31:27.000000,2021-04-14
   09:36:01.000000],[2021-04-14 09:04:52.000000,2021-04-14 09:04:00.000000,2021-04-14 09:52:59.000000,2021-04-14 09:57:34.000000,2021-04-14 09:29:00.000000,...,2021-04-30
   23:39:00.000000,2021-04-30 23:20:32.000000,2021-04-30 23:33:00.000000,2021-04-30 23:31:38.000000,2021-04-30 23:05:21.000000]]
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Fokko commented on a diff in pull request #6258: Python: Implement PyArrow row level filtering

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #6258:
URL: https://github.com/apache/iceberg/pull/6258#discussion_r1033399213


##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -379,3 +400,95 @@ def _(_: StringType) -> pa.DataType:
 def _(_: BinaryType) -> pa.DataType:
     # Variable length by default
     return pa.binary()
+
+
+class _ConvertToArrowExpression(BooleanExpressionVisitor[pc.Expression]):
+    def visit_true(self) -> pc.Expression:
+        return pc.scalar(True)
+
+    def visit_false(self) -> pc.Expression:
+        return pc.scalar(False)
+
+    def visit_not(self, child_result: pc.Expression) -> pc.Expression:
+        return ~child_result
+
+    def visit_and(self, left_result: pc.Expression, right_result: pc.Expression) -> pc.Expression:
+        return left_result & right_result
+
+    def visit_or(self, left_result: pc.Expression, right_result: pc.Expression) -> pc.Expression:
+        return left_result | right_result
+
+    def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> pc.Expression:
+        raise ValueError("Please bind the expression first")
+
+    def visit_bound_predicate(self, predicate: BoundPredicate[Any]) -> pc.Expression:
+        return _iceberg_to_pyarrow_predicate(predicate)
+
+
+@singledispatch
+def _iceberg_to_pyarrow_predicate(expr: BoundPredicate[str]) -> pc.Expression:
+    raise ValueError(f"Unknown expression: {expr}")
+
+
+@_iceberg_to_pyarrow_predicate.register(BoundIsNull)
+def _(bound: BoundIsNull[str]) -> pc.Expression:
+    return pc.field(bound.term.ref().field.name).is_null(False)
+
+
+@_iceberg_to_pyarrow_predicate.register(BoundNotNull)
+def _(bound: BoundNotNull[str]) -> pc.Expression:
+    return pc.field(bound.term.ref().field.name).is_valid()
+
+
+@_iceberg_to_pyarrow_predicate.register(BoundIsNaN)
+def _(bound: BoundIsNaN[str]) -> pc.Expression:
+    return pc.field(bound.term.ref().field.name).is_null(True)

Review Comment:
   Nice!





[GitHub] [iceberg] rdblue commented on a diff in pull request #6258: Python: Implement PyArrow row level filtering

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #6258:
URL: https://github.com/apache/iceberg/pull/6258#discussion_r1032838588


##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -379,3 +400,95 @@ def _(_: StringType) -> pa.DataType:
 def _(_: BinaryType) -> pa.DataType:
     # Variable length by default
     return pa.binary()
+
+
+class _ConvertToArrowExpression(BooleanExpressionVisitor[pc.Expression]):
+    def visit_true(self) -> pc.Expression:
+        return pc.scalar(True)
+
+    def visit_false(self) -> pc.Expression:
+        return pc.scalar(False)
+
+    def visit_not(self, child_result: pc.Expression) -> pc.Expression:
+        return ~child_result
+
+    def visit_and(self, left_result: pc.Expression, right_result: pc.Expression) -> pc.Expression:
+        return left_result & right_result
+
+    def visit_or(self, left_result: pc.Expression, right_result: pc.Expression) -> pc.Expression:
+        return left_result | right_result
+
+    def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> pc.Expression:
+        raise ValueError("Please bind the expression first")
+
+    def visit_bound_predicate(self, predicate: BoundPredicate[Any]) -> pc.Expression:
+        return _iceberg_to_pyarrow_predicate(predicate)
+
+
+@singledispatch
+def _iceberg_to_pyarrow_predicate(expr: BoundPredicate[str]) -> pc.Expression:
+    raise ValueError(f"Unknown expression: {expr}")
+
+
+@_iceberg_to_pyarrow_predicate.register(BoundIsNull)
+def _(bound: BoundIsNull[str]) -> pc.Expression:
+    return pc.field(bound.term.ref().field.name).is_null(False)
+
+
+@_iceberg_to_pyarrow_predicate.register(BoundNotNull)
+def _(bound: BoundNotNull[str]) -> pc.Expression:
+    return pc.field(bound.term.ref().field.name).is_valid()
+
+
+@_iceberg_to_pyarrow_predicate.register(BoundIsNaN)
+def _(bound: BoundIsNaN[str]) -> pc.Expression:
+    return pc.field(bound.term.ref().field.name).is_null(True)

Review Comment:
   I found [more specific docs for is_null](https://arrow.apache.org/docs/python/generated/pyarrow.compute.is_null.html):
   
   > True may also be emitted for NaN values by setting the nan_is_null flag
   
   I think this needs to be more complex to handle NaN only:
   
   ```python
   @_iceberg_to_pyarrow_predicate.register(BoundIsNaN)
   def _(bound: BoundIsNaN[str]) -> pc.Expression:
       ref = pc.field(bound.term.ref().field.name)
       return ref.is_null(nan_is_null=True) & ref.is_valid()
   ```
   
   That way this will match NaN values, but not invalid (null) values.





[GitHub] [iceberg] Fokko commented on a diff in pull request #6258: Python: Implement PyArrow row level filtering

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #6258:
URL: https://github.com/apache/iceberg/pull/6258#discussion_r1033516566


##########
python/pyiceberg/expressions/visitors.py:
##########
@@ -185,7 +185,7 @@ def _(obj: Or, visitor: BooleanExpressionVisitor[T]) -> T:
     return visitor.visit_or(left_result=left_result, right_result=right_result)
 
 
-def bind(schema: Schema, expression: BooleanExpression, case_sensitive: bool) -> BooleanExpression:
+def bind(schema: Schema, expression: BooleanExpression, case_sensitive: bool = True) -> BooleanExpression:

Review Comment:
   I also made this one mandatory again, so we don't accidentally forget to set it.





[GitHub] [iceberg] rdblue commented on a diff in pull request #6258: Python: Implement PyArrow row level filtering

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #6258:
URL: https://github.com/apache/iceberg/pull/6258#discussion_r1032837682


##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -379,3 +400,95 @@ def _(_: StringType) -> pa.DataType:
 def _(_: BinaryType) -> pa.DataType:
     # Variable length by default
     return pa.binary()
+
+
+class _ConvertToArrowExpression(BooleanExpressionVisitor[pc.Expression]):
+    def visit_true(self) -> pc.Expression:
+        return pc.scalar(True)
+
+    def visit_false(self) -> pc.Expression:
+        return pc.scalar(False)
+
+    def visit_not(self, child_result: pc.Expression) -> pc.Expression:
+        return ~child_result
+
+    def visit_and(self, left_result: pc.Expression, right_result: pc.Expression) -> pc.Expression:
+        return left_result & right_result
+
+    def visit_or(self, left_result: pc.Expression, right_result: pc.Expression) -> pc.Expression:
+        return left_result | right_result
+
+    def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> pc.Expression:
+        raise ValueError("Please bind the expression first")
+
+    def visit_bound_predicate(self, predicate: BoundPredicate[Any]) -> pc.Expression:
+        return _iceberg_to_pyarrow_predicate(predicate)
+
+
+@singledispatch

Review Comment:
   Why use `@singledispatch` instead of `BoundBooleanExpressionVisitor` that does this for you?
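
To make the trade-off in this question concrete, here is a small sketch that contrasts the two dispatch styles. The predicate classes and the `PredicateVisitor` base are hypothetical stand-ins written for this example; they are not the pyiceberg classes, and the real `BoundBooleanExpressionVisitor` may be structured differently.

```python
from dataclasses import dataclass
from functools import singledispatch


# Hypothetical stand-ins for bound predicates; not the pyiceberg classes.
@dataclass
class IsNull:
    column: str


@dataclass
class EqualTo:
    column: str
    value: object


# Option 1: a free function that dispatches on the predicate type.
@singledispatch
def to_text(predicate) -> str:
    raise ValueError(f"Unknown expression: {predicate}")


@to_text.register
def _(predicate: IsNull) -> str:
    return f"{predicate.column} IS NULL"


@to_text.register
def _(predicate: EqualTo) -> str:
    return f"{predicate.column} = {predicate.value!r}"


# Option 2: a visitor base class that does the dispatch once,
# so each conversion only has to fill in the visit_* methods.
class PredicateVisitor:
    def visit(self, predicate) -> str:
        if isinstance(predicate, IsNull):
            return self.visit_is_null(predicate.column)
        if isinstance(predicate, EqualTo):
            return self.visit_equal(predicate.column, predicate.value)
        raise ValueError(f"Unknown expression: {predicate}")

    def visit_is_null(self, column: str) -> str:
        raise NotImplementedError

    def visit_equal(self, column: str, value: object) -> str:
        raise NotImplementedError


class ToText(PredicateVisitor):
    def visit_is_null(self, column: str) -> str:
        return f"{column} IS NULL"

    def visit_equal(self, column: str, value: object) -> str:
        return f"{column} = {value!r}"
```

Both styles produce the same result here. The visitor keeps the type dispatch in one shared place, which is the convenience the question points at.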





[GitHub] [iceberg] Fokko commented on a diff in pull request #6258: Python: Implement PyArrow row level filtering

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #6258:
URL: https://github.com/apache/iceberg/pull/6258#discussion_r1033442764


##########
python/tests/io/test_pyarrow.py:
##########
@@ -411,3 +434,140 @@ def test_list_type_to_pyarrow():
         element_required=True,
     )
     assert visit(iceberg_map, _ConvertToArrowSchema()) == pa.list_(pa.int32())
+
+
+@pytest.fixture
+def bound_reference(table_schema_simple: Schema) -> BoundReference[str]:
+    return BoundReference(table_schema_simple.find_field(1), table_schema_simple.accessor_for_field(1))
+
+
+@pytest.fixture
+def bound_double_reference(table_schema_simple: Schema) -> BoundReference[float]:
+    schema = Schema(
+        NestedField(field_id=1, name="foo", field_type=DoubleType(), required=False),
+        schema_id=1,
+        identifier_field_ids=[2],
+    )

Review Comment:
   I usually split things out into fixtures if we use them in multiple places.





[GitHub] [iceberg] Fokko commented on a diff in pull request #6258: Python: Implement PyArrow row level filtering

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #6258:
URL: https://github.com/apache/iceberg/pull/6258#discussion_r1034708093


##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -379,3 +385,62 @@ def _(_: StringType) -> pa.DataType:
 def _(_: BinaryType) -> pa.DataType:
     # Variable length by default
     return pa.binary()
+
+
+class _ConvertToArrowExpression(BoundBooleanExpressionVisitor[pc.Expression]):
+    def visit_in(self, term: BoundTerm[pc.Expression], literals: Set[Any]) -> pc.Expression:
+        return pc.field(term.ref().field.name).isin(literals)
+
+    def visit_not_in(self, term: BoundTerm[pc.Expression], literals: Set[Any]) -> pc.Expression:
+        return ~pc.field(term.ref().field.name).isin(literals)
+
+    def visit_is_nan(self, term: BoundTerm[pc.Expression]) -> pc.Expression:
+        ref = pc.field(term.ref().field.name)
+        return ref.is_null(nan_is_null=True) & ref.is_valid()
+
+    def visit_not_nan(self, term: BoundTerm[pc.Expression]) -> pc.Expression:
+        ref = pc.field(term.ref().field.name)
+        return ~(ref.is_null(nan_is_null=True) & ref.is_valid())
+
+    def visit_is_null(self, term: BoundTerm[pc.Expression]) -> pc.Expression:

Review Comment:
   It doesn't affect the code but looks messy. Fixed in: https://github.com/apache/iceberg/pull/6308





[GitHub] [iceberg] Fokko commented on a diff in pull request #6258: Python: Implement PyArrow row level filtering

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #6258:
URL: https://github.com/apache/iceberg/pull/6258#discussion_r1033936261


##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -379,3 +385,61 @@ def _(_: StringType) -> pa.DataType:
 def _(_: BinaryType) -> pa.DataType:
     # Variable length by default
     return pa.binary()
+
+
+class _ConvertToArrowExpression(BoundBooleanExpressionVisitor[pc.Expression]):
+    def visit_in(self, term: BoundTerm[pc.Expression], literals: Set[Any]) -> pc.Expression:
+        return pc.field(term.ref().field.name).isin(literals)
+
+    def visit_not_in(self, term: BoundTerm[pc.Expression], literals: Set[pc.Expression]) -> pc.Expression:
+        return ~pc.field(term.ref().field.name).isin(literals)
+
+    def visit_is_nan(self, term: BoundTerm[pc.Expression]) -> pc.Expression:
+        ref = pc.field(term.ref().field.name)
+        return ref.is_null(nan_is_null=True) & ref.is_valid()
+
+    def visit_not_nan(self, term: BoundTerm[pc.Expression]) -> pc.Expression:
+        return ~pc.field(term.ref().field.name).is_null(nan_is_null=True)

Review Comment:
   Of course 🤦🏻 





[GitHub] [iceberg] rdblue commented on a diff in pull request #6258: Python: Implement PyArrow row level filtering

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #6258:
URL: https://github.com/apache/iceberg/pull/6258#discussion_r1034007197


##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -379,3 +385,62 @@ def _(_: StringType) -> pa.DataType:
 def _(_: BinaryType) -> pa.DataType:
     # Variable length by default
     return pa.binary()
+
+
+class _ConvertToArrowExpression(BoundBooleanExpressionVisitor[pc.Expression]):
+    def visit_in(self, term: BoundTerm[pc.Expression], literals: Set[Any]) -> pc.Expression:
+        return pc.field(term.ref().field.name).isin(literals)
+
+    def visit_not_in(self, term: BoundTerm[pc.Expression], literals: Set[Any]) -> pc.Expression:
+        return ~pc.field(term.ref().field.name).isin(literals)
+
+    def visit_is_nan(self, term: BoundTerm[pc.Expression]) -> pc.Expression:
+        ref = pc.field(term.ref().field.name)
+        return ref.is_null(nan_is_null=True) & ref.is_valid()
+
+    def visit_not_nan(self, term: BoundTerm[pc.Expression]) -> pc.Expression:
+        ref = pc.field(term.ref().field.name)
+        return ~(ref.is_null(nan_is_null=True) & ref.is_valid())
+
+    def visit_is_null(self, term: BoundTerm[pc.Expression]) -> pc.Expression:

Review Comment:
   These bound terms also have the wrong type. I don't think that it matters for correctness, but the type of the term should be the Python type of values produced by that term, which won't be a pyarrow Expression.
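
The point about the type parameter can be sketched with a simplified generic class. `Term` and `value_in` below are hypothetical names for this example only, not pyiceberg's `BoundTerm`; the idea shown is that the parameter describes the values a term yields, independent of what the visitor returns.

```python
from dataclasses import dataclass
from typing import Generic, Set, TypeVar

T = TypeVar("T")  # Python type of the values a term produces


# Hypothetical, simplified stand-in for a bound term; not the pyiceberg class.
@dataclass(frozen=True)
class Term(Generic[T]):
    name: str

    def eval(self, row: dict) -> T:
        return row[self.name]


def value_in(term: Term[float], literals: Set[float], row: dict) -> bool:
    # Term[float] says "this term yields floats"; the return type of the
    # conversion (bool here, pc.Expression in the PR) is a separate matter.
    return term.eval(row) in literals
```

A type checker would then flag a `Term[float]` being compared against, say, a set of strings, which an annotation like `Term[pc.Expression]` cannot express.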





[GitHub] [iceberg] rdblue merged pull request #6258: Python: Implement PyArrow row level filtering

Posted by GitBox <gi...@apache.org>.
rdblue merged PR #6258:
URL: https://github.com/apache/iceberg/pull/6258




[GitHub] [iceberg] rdblue commented on a diff in pull request #6258: Python: Implement PyArrow row level filtering

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #6258:
URL: https://github.com/apache/iceberg/pull/6258#discussion_r1033855472


##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -379,3 +385,61 @@ def _(_: StringType) -> pa.DataType:
 def _(_: BinaryType) -> pa.DataType:
     # Variable length by default
     return pa.binary()
+
+
+class _ConvertToArrowExpression(BoundBooleanExpressionVisitor[pc.Expression]):
+    def visit_in(self, term: BoundTerm[pc.Expression], literals: Set[Any]) -> pc.Expression:
+        return pc.field(term.ref().field.name).isin(literals)
+
+    def visit_not_in(self, term: BoundTerm[pc.Expression], literals: Set[pc.Expression]) -> pc.Expression:
+        return ~pc.field(term.ref().field.name).isin(literals)
+
+    def visit_is_nan(self, term: BoundTerm[pc.Expression]) -> pc.Expression:
+        ref = pc.field(term.ref().field.name)
+        return ref.is_null(nan_is_null=True) & ref.is_valid()
+
+    def visit_not_nan(self, term: BoundTerm[pc.Expression]) -> pc.Expression:
+        return ~pc.field(term.ref().field.name).is_null(nan_is_null=True)

Review Comment:
   I think that this needs to be the negation of `visit_is_nan` to handle `null` values correctly.
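
The difference between the two candidates shows up only for null values. The following plain-Python model is a sketch, not PyArrow code: `None` plays the role of null, and the helper functions are stand-ins that mimic `is_null(nan_is_null=...)` and `is_valid()` for a single value.

```python
import math


# Plain-Python stand-ins for the two Arrow checks; None plays the role of null.
def is_null(value, *, nan_is_null=False):
    if value is None:
        return True
    return nan_is_null and isinstance(value, float) and math.isnan(value)


def is_valid(value):
    return value is not None


def not_nan_simple(value):
    # Negating only the null-or-NaN check also rejects nulls.
    return not is_null(value, nan_is_null=True)


def not_nan_negated(value):
    # Negating the whole is_nan predicate keeps nulls, since a null is not NaN.
    return not (is_null(value, nan_is_null=True) and is_valid(value))


values = [1.0, float("nan"), None]
simple = [not_nan_simple(v) for v in values]
negated = [not_nan_negated(v) for v in values]
```

In this model, `simple` treats the null like a NaN and filters it out, while `negated` keeps it, which matches the behaviour asked for in the comment.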





[GitHub] [iceberg] rdblue commented on a diff in pull request #6258: Python: Implement PyArrow row level filtering

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #6258:
URL: https://github.com/apache/iceberg/pull/6258#discussion_r1032837463


##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -379,3 +400,95 @@ def _(_: StringType) -> pa.DataType:
 def _(_: BinaryType) -> pa.DataType:
     # Variable length by default
     return pa.binary()
+
+
+class _ConvertToArrowExpression(BooleanExpressionVisitor[pc.Expression]):
+    def visit_true(self) -> pc.Expression:
+        return pc.scalar(True)
+
+    def visit_false(self) -> pc.Expression:
+        return pc.scalar(False)
+
+    def visit_not(self, child_result: pc.Expression) -> pc.Expression:
+        return ~child_result
+
+    def visit_and(self, left_result: pc.Expression, right_result: pc.Expression) -> pc.Expression:
+        return left_result & right_result
+
+    def visit_or(self, left_result: pc.Expression, right_result: pc.Expression) -> pc.Expression:
+        return left_result | right_result
+
+    def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> pc.Expression:
+        raise ValueError("Please bind the expression first")
+
+    def visit_bound_predicate(self, predicate: BoundPredicate[Any]) -> pc.Expression:
+        return _iceberg_to_pyarrow_predicate(predicate)
+
+
+@singledispatch
+def _iceberg_to_pyarrow_predicate(expr: BoundPredicate[str]) -> pc.Expression:
+    raise ValueError(f"Unknown expression: {expr}")
+
+
+@_iceberg_to_pyarrow_predicate.register(BoundIsNull)
+def _(bound: BoundIsNull[str]) -> pc.Expression:
+    return pc.field(bound.term.ref().field.name).is_null(False)

Review Comment:
   Nit: `nan_is_null=False` to avoid confusing nameless boolean.
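
A short sketch of why the named flag reads better at the call site. `count_nulls` is a hypothetical helper written for this example, not a PyArrow function; it is made keyword-only here to show the idea, whereas PyArrow itself may also accept the flag positionally.

```python
import math


# Hypothetical helper, not a PyArrow function: the bare `*` makes
# nan_is_null keyword-only, so callers have to name the flag.
def count_nulls(values, *, nan_is_null=False):
    count = 0
    for value in values:
        if value is None:
            count += 1
        elif nan_is_null and isinstance(value, float) and math.isnan(value):
            count += 1
    return count


data = [1.0, float("nan"), None]
nulls_only = count_nulls(data, nan_is_null=False)
nulls_and_nans = count_nulls(data, nan_is_null=True)
```

A call such as `count_nulls(data, True)` is rejected with a `TypeError` in this sketch, so the meaning of the boolean is always visible where it is passed.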





[GitHub] [iceberg] rdblue commented on a diff in pull request #6258: Python: Implement PyArrow row level filtering

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #6258:
URL: https://github.com/apache/iceberg/pull/6258#discussion_r1032838052


##########
python/tests/io/test_pyarrow.py:
##########
@@ -411,3 +434,140 @@ def test_list_type_to_pyarrow():
         element_required=True,
     )
     assert visit(iceberg_map, _ConvertToArrowSchema()) == pa.list_(pa.int32())
+
+
+@pytest.fixture
+def bound_reference(table_schema_simple: Schema) -> BoundReference[str]:
+    return BoundReference(table_schema_simple.find_field(1), table_schema_simple.accessor_for_field(1))
+
+
+@pytest.fixture
+def bound_double_reference(table_schema_simple: Schema) -> BoundReference[float]:

Review Comment:
   Does this need `table_schema_simple`?



##########
python/tests/io/test_pyarrow.py:
##########
@@ -411,3 +434,140 @@ def test_list_type_to_pyarrow():
         element_required=True,
     )
     assert visit(iceberg_map, _ConvertToArrowSchema()) == pa.list_(pa.int32())
+
+
+@pytest.fixture
+def bound_reference(table_schema_simple: Schema) -> BoundReference[str]:
+    return BoundReference(table_schema_simple.find_field(1), table_schema_simple.accessor_for_field(1))
+
+
+@pytest.fixture
+def bound_double_reference(table_schema_simple: Schema) -> BoundReference[float]:
+    schema = Schema(
+        NestedField(field_id=1, name="foo", field_type=DoubleType(), required=False),
+        schema_id=1,
+        identifier_field_ids=[2],
+    )

Review Comment:
   Should we have a fixture that has a double field?





[GitHub] [iceberg] Fokko commented on a diff in pull request #6258: Python: Implement PyArrow row level filtering

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #6258:
URL: https://github.com/apache/iceberg/pull/6258#discussion_r1033509831


##########
python/pyiceberg/table/__init__.py:
##########
@@ -355,7 +355,23 @@ def to_arrow(self):
         if "*" not in self.selected_fields:
             columns = list(self.selected_fields)
 
-        return pq.read_table(source=locations, filesystem=fs, columns=columns)
+        pyarrow_filter = None
+        if self.row_filter is not AlwaysTrue():
+            bound_row_filter = bind(self.table.schema(), self.row_filter)
+            pyarrow_filter = expression_to_pyarrow(bound_row_filter)
+
+        from pyarrow.dataset import dataset
+
+        ds = dataset(
+            source=locations,
+            filesystem=fs,
+            # Optionally provide the Schema for the Dataset,
+            # in which case it will not be inferred from the source.
+            # https://arrow.apache.org/docs/python/generated/pyarrow.dataset.dataset.html#pyarrow.dataset.dataset
+            schema=schema_to_pyarrow(self.table.schema()),

Review Comment:
   It should be equal to or a subset of the original schema, see the example in the PR description







[GitHub] [iceberg] rdblue commented on a diff in pull request #6258: Python: Implement PyArrow row level filtering

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #6258:
URL: https://github.com/apache/iceberg/pull/6258#discussion_r1032837927


##########
python/pyiceberg/table/__init__.py:
##########
@@ -355,7 +355,23 @@ def to_arrow(self):
         if "*" not in self.selected_fields:
             columns = list(self.selected_fields)
 
-        return pq.read_table(source=locations, filesystem=fs, columns=columns)
+        pyarrow_filter = None
+        if self.row_filter is not AlwaysTrue():
+            bound_row_filter = bind(self.table.schema(), self.row_filter)
+            pyarrow_filter = expression_to_pyarrow(bound_row_filter)
+
+        from pyarrow.dataset import dataset
+
+        ds = dataset(
+            source=locations,
+            filesystem=fs,
+            # Optionally provide the Schema for the Dataset,
+            # in which case it will not be inferred from the source.
+            # https://arrow.apache.org/docs/python/generated/pyarrow.dataset.dataset.html#pyarrow.dataset.dataset
+            schema=schema_to_pyarrow(self.table.schema()),
+        )
+
+        return ds.to_table(filter=pyarrow_filter, columns=columns)

Review Comment:
   Dataset seems good to me if you can read it in chunks.
   
   We may also need to refactor this and handle files individually when we implement correct projection, so this will probably change.





[GitHub] [iceberg] rdblue commented on a diff in pull request #6258: Python: Implement PyArrow row level filtering

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #6258:
URL: https://github.com/apache/iceberg/pull/6258#discussion_r1033853069


##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -402,92 +387,58 @@ def _(_: BinaryType) -> pa.DataType:
     return pa.binary()
 
 
-class _ConvertToArrowExpression(BooleanExpressionVisitor[pc.Expression]):
-    def visit_true(self) -> pc.Expression:
-        return pc.scalar(True)
-
-    def visit_false(self) -> pc.Expression:
-        return pc.scalar(False)
-
-    def visit_not(self, child_result: pc.Expression) -> pc.Expression:
-        return ~child_result
-
-    def visit_and(self, left_result: pc.Expression, right_result: pc.Expression) -> pc.Expression:
-        return left_result & right_result
-
-    def visit_or(self, left_result: pc.Expression, right_result: pc.Expression) -> pc.Expression:
-        return left_result | right_result
-
-    def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> pc.Expression:
-        raise ValueError("Please bind the expression first")
-
-    def visit_bound_predicate(self, predicate: BoundPredicate[Any]) -> pc.Expression:
-        return _iceberg_to_pyarrow_predicate(predicate)
-
-
-@singledispatch
-def _iceberg_to_pyarrow_predicate(expr: BoundPredicate[str]) -> pc.Expression:
-    raise ValueError(f"Unknown expression: {expr}")
-
-
-@_iceberg_to_pyarrow_predicate.register(BoundIsNull)
-def _(bound: BoundIsNull[str]) -> pc.Expression:
-    return pc.field(bound.term.ref().field.name).is_null(False)
-
-
-@_iceberg_to_pyarrow_predicate.register(BoundNotNull)
-def _(bound: BoundNotNull[str]) -> pc.Expression:
-    return pc.field(bound.term.ref().field.name).is_valid()
+class _ConvertToArrowExpression(BoundBooleanExpressionVisitor[pc.Expression]):
+    def visit_in(self, term: BoundTerm[pc.Expression], literals: Set[Any]) -> pc.Expression:
+        return pc.field(term.ref().field.name).isin(literals)
 
+    def visit_not_in(self, term: BoundTerm[pc.Expression], literals: Set[pc.Expression]) -> pc.Expression:

Review Comment:
   The type of `literals` doesn't seem correct here.





[GitHub] [iceberg] Fokko commented on a diff in pull request #6258: Python: Implement PyArrow row level filtering

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #6258:
URL: https://github.com/apache/iceberg/pull/6258#discussion_r1033936261


##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -379,3 +385,61 @@ def _(_: StringType) -> pa.DataType:
 def _(_: BinaryType) -> pa.DataType:
     # Variable length by default
     return pa.binary()
+
+
+class _ConvertToArrowExpression(BoundBooleanExpressionVisitor[pc.Expression]):
+    def visit_in(self, term: BoundTerm[pc.Expression], literals: Set[Any]) -> pc.Expression:
+        return pc.field(term.ref().field.name).isin(literals)
+
+    def visit_not_in(self, term: BoundTerm[pc.Expression], literals: Set[pc.Expression]) -> pc.Expression:
+        return ~pc.field(term.ref().field.name).isin(literals)
+
+    def visit_is_nan(self, term: BoundTerm[pc.Expression]) -> pc.Expression:
+        ref = pc.field(term.ref().field.name)
+        return ref.is_null(nan_is_null=True) & ref.is_valid()
+
+    def visit_not_nan(self, term: BoundTerm[pc.Expression]) -> pc.Expression:
+        return ~pc.field(term.ref().field.name).is_null(nan_is_null=True)

Review Comment:
   Good call!





[GitHub] [iceberg] rdblue commented on a diff in pull request #6258: Python: Implement PyArrow row level filtering

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #6258:
URL: https://github.com/apache/iceberg/pull/6258#discussion_r1032837561


##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -379,3 +400,95 @@ def _(_: StringType) -> pa.DataType:
 def _(_: BinaryType) -> pa.DataType:
     # Variable length by default
     return pa.binary()
+
+
+class _ConvertToArrowExpression(BooleanExpressionVisitor[pc.Expression]):
+    def visit_true(self) -> pc.Expression:
+        return pc.scalar(True)
+
+    def visit_false(self) -> pc.Expression:
+        return pc.scalar(False)
+
+    def visit_not(self, child_result: pc.Expression) -> pc.Expression:
+        return ~child_result
+
+    def visit_and(self, left_result: pc.Expression, right_result: pc.Expression) -> pc.Expression:
+        return left_result & right_result
+
+    def visit_or(self, left_result: pc.Expression, right_result: pc.Expression) -> pc.Expression:
+        return left_result | right_result
+
+    def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> pc.Expression:
+        raise ValueError("Please bind the expression first")
+
+    def visit_bound_predicate(self, predicate: BoundPredicate[Any]) -> pc.Expression:
+        return _iceberg_to_pyarrow_predicate(predicate)
+
+
+@singledispatch
+def _iceberg_to_pyarrow_predicate(expr: BoundPredicate[str]) -> pc.Expression:
+    raise ValueError(f"Unknown expression: {expr}")
+
+
+@_iceberg_to_pyarrow_predicate.register(BoundIsNull)
+def _(bound: BoundIsNull[str]) -> pc.Expression:
+    return pc.field(bound.term.ref().field.name).is_null(False)
+
+
+@_iceberg_to_pyarrow_predicate.register(BoundNotNull)
+def _(bound: BoundNotNull[str]) -> pc.Expression:
+    return pc.field(bound.term.ref().field.name).is_valid()
+
+
+@_iceberg_to_pyarrow_predicate.register(BoundIsNaN)
+def _(bound: BoundIsNaN[str]) -> pc.Expression:
+    return pc.field(bound.term.ref().field.name).is_null(True)

Review Comment:
   Does this match both `null` and `NaN` or just `NaN` when `nan_is_null=True`?





[GitHub] [iceberg] rdblue commented on a diff in pull request #6258: Python: Implement PyArrow row level filtering

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #6258:
URL: https://github.com/apache/iceberg/pull/6258#discussion_r1032837729


##########
python/pyiceberg/table/__init__.py:
##########
@@ -355,7 +355,23 @@ def to_arrow(self):
         if "*" not in self.selected_fields:
             columns = list(self.selected_fields)
 
-        return pq.read_table(source=locations, filesystem=fs, columns=columns)
+        pyarrow_filter = None
+        if self.row_filter is not AlwaysTrue():
+            bound_row_filter = bind(self.table.schema(), self.row_filter)

Review Comment:
   Case sensitivity?
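
To make the concern concrete, here is a small pure-Python sketch of name resolution with and without case sensitivity. `resolve_column` and the `fields` mapping are hypothetical names invented for this illustration; they are not the PyIceberg binding API.

```python
from typing import Dict


def resolve_column(name: str, schema_fields: Dict[str, int], case_sensitive: bool = True) -> int:
    """Return the field ID for `name` (hypothetical helper, not PyIceberg API)."""
    if case_sensitive:
        if name not in schema_fields:
            raise ValueError(f"Could not find field {name!r} (case sensitive)")
        return schema_fields[name]

    # Case-insensitive lookup: compare lower-cased names.
    lowered = {field_name.lower(): field_id for field_name, field_id in schema_fields.items()}
    key = name.lower()
    if key not in lowered:
        raise ValueError(f"Could not find field {name!r} (case insensitive)")
    return lowered[key]


# Illustrative schema: column name -> field ID.
fields = {"VendorID": 1, "tpep_pickup_datetime": 2}

print(resolve_column("VendorID", fields))
print(resolve_column("vendorid", fields, case_sensitive=False))
```

A filter written as `EqualTo("vendorid", 1)` would only resolve in the case-insensitive mode, so the scan has to pass its setting through to the bind step.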





[GitHub] [iceberg] rdblue commented on a diff in pull request #6258: Python: Implement PyArrow row level filtering

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #6258:
URL: https://github.com/apache/iceberg/pull/6258#discussion_r1033851034


##########
python/pyiceberg/table/__init__.py:
##########
@@ -355,7 +355,23 @@ def to_arrow(self):
         if "*" not in self.selected_fields:
             columns = list(self.selected_fields)
 
-        return pq.read_table(source=locations, filesystem=fs, columns=columns)
+        pyarrow_filter = None
+        if self.row_filter is not AlwaysTrue():
+            bound_row_filter = bind(self.table.schema(), self.row_filter)
+            pyarrow_filter = expression_to_pyarrow(bound_row_filter)
+
+        from pyarrow.dataset import dataset
+
+        ds = dataset(
+            source=locations,
+            filesystem=fs,
+            # Optionally provide the Schema for the Dataset,
+            # in which case it will not be inferred from the source.
+            # https://arrow.apache.org/docs/python/generated/pyarrow.dataset.dataset.html#pyarrow.dataset.dataset
+            schema=schema_to_pyarrow(self.table.schema()),

Review Comment:
   What I mean is that this probably needs to use the field names from each Parquet data file, not the field names from the table schema. That's because Parquet typically projects columns by name, but Iceberg uses IDs. There's no guarantee that the file's column names match the table's column names.
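
A rough sketch of the ID-based resolution described above, in plain Python. The helper `file_column_names` and both mappings are assumptions made up for this example; the real implementation would read the field IDs from the Parquet file metadata.

```python
from typing import Dict, List


def file_column_names(selected: List[str], table_ids: Dict[str, int], file_names: Dict[int, str]) -> List[str]:
    """Translate table column names into the names one data file uses, via field IDs.

    Hypothetical helper for illustration only.
    """
    resolved = []
    for name in selected:
        field_id = table_ids[name]
        # A column added after the file was written has no counterpart
        # in that file, so this sketch simply skips it.
        if field_id in file_names:
            resolved.append(file_names[field_id])
    return resolved


# Current table schema (name -> ID) after some renames and an added column.
table_ids = {"vendor_id": 1, "pickup_time": 2, "tip": 3}
# An older data file, written under the original names (ID -> name).
file_names = {1: "VendorID", 2: "tpep_pickup_datetime"}

print(file_column_names(["vendor_id", "tip"], table_ids, file_names))
```

Projecting `vendor_id` by name against the older file would find nothing, while resolving through ID 1 finds `VendorID`.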





[GitHub] [iceberg] Fokko commented on a diff in pull request #6258: Python: Implement PyArrow row level filtering

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #6258:
URL: https://github.com/apache/iceberg/pull/6258#discussion_r1033440343


##########
python/tests/io/test_pyarrow.py:
##########
@@ -411,3 +434,140 @@ def test_list_type_to_pyarrow():
         element_required=True,
     )
     assert visit(iceberg_map, _ConvertToArrowSchema()) == pa.list_(pa.int32())
+
+
+@pytest.fixture
+def bound_reference(table_schema_simple: Schema) -> BoundReference[str]:
+    return BoundReference(table_schema_simple.find_field(1), table_schema_simple.accessor_for_field(1))
+
+
+@pytest.fixture
+def bound_double_reference(table_schema_simple: Schema) -> BoundReference[float]:

Review Comment:
   No, copy paste 





[GitHub] [iceberg] rdblue commented on a diff in pull request #6258: Python: Implement PyArrow row level filtering

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #6258:
URL: https://github.com/apache/iceberg/pull/6258#discussion_r1032837608


##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -379,3 +400,95 @@ def _(_: StringType) -> pa.DataType:
 def _(_: BinaryType) -> pa.DataType:
     # Variable length by default
     return pa.binary()
+
+
+class _ConvertToArrowExpression(BooleanExpressionVisitor[pc.Expression]):
+    def visit_true(self) -> pc.Expression:
+        return pc.scalar(True)
+
+    def visit_false(self) -> pc.Expression:
+        return pc.scalar(False)
+
+    def visit_not(self, child_result: pc.Expression) -> pc.Expression:
+        return ~child_result
+
+    def visit_and(self, left_result: pc.Expression, right_result: pc.Expression) -> pc.Expression:
+        return left_result & right_result
+
+    def visit_or(self, left_result: pc.Expression, right_result: pc.Expression) -> pc.Expression:
+        return left_result | right_result
+
+    def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> pc.Expression:
+        raise ValueError("Please bind the expression first")
+
+    def visit_bound_predicate(self, predicate: BoundPredicate[Any]) -> pc.Expression:
+        return _iceberg_to_pyarrow_predicate(predicate)
+
+
+@singledispatch
+def _iceberg_to_pyarrow_predicate(expr: BoundPredicate[str]) -> pc.Expression:
+    raise ValueError(f"Unknown expression: {expr}")
+
+
+@_iceberg_to_pyarrow_predicate.register(BoundIsNull)
+def _(bound: BoundIsNull[str]) -> pc.Expression:
+    return pc.field(bound.term.ref().field.name).is_null(False)
+
+
+@_iceberg_to_pyarrow_predicate.register(BoundNotNull)
+def _(bound: BoundNotNull[str]) -> pc.Expression:
+    return pc.field(bound.term.ref().field.name).is_valid()
+
+
+@_iceberg_to_pyarrow_predicate.register(BoundIsNaN)
+def _(bound: BoundIsNaN[str]) -> pc.Expression:
+    return pc.field(bound.term.ref().field.name).is_null(True)
+
+
+@_iceberg_to_pyarrow_predicate.register(BoundNotNaN)
+def _(bound: BoundNotNaN[str]) -> pc.Expression:
+    return ~pc.field(bound.term.ref().field.name).is_null(True)
+
+
+@_iceberg_to_pyarrow_predicate.register(BoundEqualTo)
+def _(bound: BoundEqualTo[str]) -> pc.Expression:
+    return pc.field(bound.term.ref().field.name) == bound.literal.value
+
+
+@_iceberg_to_pyarrow_predicate.register(BoundNotEqualTo)
+def _(bound: BoundNotEqualTo[str]) -> pc.Expression:
+    return pc.field(bound.term.ref().field.name) != bound.literal.value

Review Comment:
   Should this negate `==`? I don't see `!=` in the [expression docs](https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Expression.html).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Fokko commented on a diff in pull request #6258: Python: Implement PyArrow row level filtering

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #6258:
URL: https://github.com/apache/iceberg/pull/6258#discussion_r1030732726


##########
python/pyiceberg/table/__init__.py:
##########
@@ -355,7 +355,23 @@ def to_arrow(self):
         if "*" not in self.selected_fields:
             columns = list(self.selected_fields)
 
-        return pq.read_table(source=locations, filesystem=fs, columns=columns)
+        pyarrow_filter = None
+        if self.row_filter is not AlwaysTrue():
+            bound_row_filter = bind(self.table.schema(), self.row_filter)
+            pyarrow_filter = expression_to_pyarrow(bound_row_filter)
+
+        from pyarrow.dataset import dataset
+
+        ds = dataset(

Review Comment:
   I had to replace `pq.read_table` with a dataset here so that a PyArrow expression can be passed in as the filter.



##########
python/pyiceberg/table/__init__.py:
##########
@@ -355,7 +355,23 @@ def to_arrow(self):
         if "*" not in self.selected_fields:
             columns = list(self.selected_fields)
 
-        return pq.read_table(source=locations, filesystem=fs, columns=columns)
+        pyarrow_filter = None
+        if self.row_filter is not AlwaysTrue():
+            bound_row_filter = bind(self.table.schema(), self.row_filter)
+            pyarrow_filter = expression_to_pyarrow(bound_row_filter)
+
+        from pyarrow.dataset import dataset
+
+        ds = dataset(
+            source=locations,
+            filesystem=fs,
+            # Optionally provide the Schema for the Dataset,
+            # in which case it will not be inferred from the source.
+            # https://arrow.apache.org/docs/python/generated/pyarrow.dataset.dataset.html#pyarrow.dataset.dataset
+            schema=schema_to_pyarrow(self.table.schema()),
+        )
+
+        return ds.to_table(filter=pyarrow_filter, columns=columns)

Review Comment:
   I'm not sure if we want to return a table or a dataset here. I think the end-user should be able to use both. The Dataset also has a nice method called `to_batches` to read the data in chunks: https://arrow.apache.org/docs/python/dataset.html#iterative-out-of-core-or-streaming-reads
   This seems very applicable to Iceberg.





[GitHub] [iceberg] rdblue commented on a diff in pull request #6258: Python: Implement PyArrow row level filtering

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #6258:
URL: https://github.com/apache/iceberg/pull/6258#discussion_r1032979436


##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -379,3 +400,95 @@ def _(_: StringType) -> pa.DataType:
 def _(_: BinaryType) -> pa.DataType:
     # Variable length by default
     return pa.binary()
+
+
+class _ConvertToArrowExpression(BooleanExpressionVisitor[pc.Expression]):
+    def visit_true(self) -> pc.Expression:
+        return pc.scalar(True)
+
+    def visit_false(self) -> pc.Expression:
+        return pc.scalar(False)
+
+    def visit_not(self, child_result: pc.Expression) -> pc.Expression:
+        return ~child_result
+
+    def visit_and(self, left_result: pc.Expression, right_result: pc.Expression) -> pc.Expression:
+        return left_result & right_result
+
+    def visit_or(self, left_result: pc.Expression, right_result: pc.Expression) -> pc.Expression:
+        return left_result | right_result
+
+    def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> pc.Expression:
+        raise ValueError("Please bind the expression first")
+
+    def visit_bound_predicate(self, predicate: BoundPredicate[Any]) -> pc.Expression:
+        return _iceberg_to_pyarrow_predicate(predicate)
+
+
+@singledispatch
+def _iceberg_to_pyarrow_predicate(expr: BoundPredicate[str]) -> pc.Expression:
+    raise ValueError(f"Unknown expression: {expr}")
+
+
+@_iceberg_to_pyarrow_predicate.register(BoundIsNull)
+def _(bound: BoundIsNull[str]) -> pc.Expression:
+    return pc.field(bound.term.ref().field.name).is_null(False)
+
+
+@_iceberg_to_pyarrow_predicate.register(BoundNotNull)
+def _(bound: BoundNotNull[str]) -> pc.Expression:
+    return pc.field(bound.term.ref().field.name).is_valid()
+
+
+@_iceberg_to_pyarrow_predicate.register(BoundIsNaN)
+def _(bound: BoundIsNaN[str]) -> pc.Expression:
+    return pc.field(bound.term.ref().field.name).is_null(True)
+
+
+@_iceberg_to_pyarrow_predicate.register(BoundNotNaN)
+def _(bound: BoundNotNaN[str]) -> pc.Expression:
+    return ~pc.field(bound.term.ref().field.name).is_null(True)
+
+
+@_iceberg_to_pyarrow_predicate.register(BoundEqualTo)
+def _(bound: BoundEqualTo[str]) -> pc.Expression:
+    return pc.field(bound.term.ref().field.name) == bound.literal.value
+
+
+@_iceberg_to_pyarrow_predicate.register(BoundNotEqualTo)
+def _(bound: BoundNotEqualTo[str]) -> pc.Expression:
+    return pc.field(bound.term.ref().field.name) != bound.literal.value
+
+
+@_iceberg_to_pyarrow_predicate.register(BoundGreaterThanOrEqual)
+def _(bound: BoundGreaterThanOrEqual[str]) -> pc.Expression:
+    return pc.field(bound.term.ref().field.name) >= bound.literal.value
+
+
+@_iceberg_to_pyarrow_predicate.register(BoundGreaterThan)
+def _(bound: BoundGreaterThan[str]) -> pc.Expression:
+    return pc.field(bound.term.ref().field.name) > bound.literal.value
+
+
+@_iceberg_to_pyarrow_predicate.register(BoundLessThan)
+def _(bound: BoundLessThan[str]) -> pc.Expression:
+    return pc.field(bound.term.ref().field.name) < bound.literal.value
+
+
+@_iceberg_to_pyarrow_predicate.register(BoundLessThanOrEqual)
+def _(bound: BoundLessThanOrEqual[str]) -> pc.Expression:
+    return pc.field(bound.term.ref().field.name) <= bound.literal.value
+
+
+@_iceberg_to_pyarrow_predicate.register(BoundIn)
+def _(bound: BoundIn[str]) -> pc.Expression:
+    return pc.field(bound.term.ref().field.name).isin(lit.value for lit in bound.literals)

Review Comment:
   This could use `bound.value_set()` rather than the comprehension.





[GitHub] [iceberg] Fokko commented on a diff in pull request #6258: Python: Implement PyArrow row level filtering

Posted by GitBox <gi...@apache.org>.
Fokko commented on code in PR #6258:
URL: https://github.com/apache/iceberg/pull/6258#discussion_r1033439698


##########
python/pyiceberg/table/__init__.py:
##########
@@ -355,7 +355,23 @@ def to_arrow(self):
         if "*" not in self.selected_fields:
             columns = list(self.selected_fields)
 
-        return pq.read_table(source=locations, filesystem=fs, columns=columns)
+        pyarrow_filter = None
+        if self.row_filter is not AlwaysTrue():
+            bound_row_filter = bind(self.table.schema(), self.row_filter)

Review Comment:
   Good one, maybe we should make that one non-optional 👍🏻 





[GitHub] [iceberg] rdblue commented on a diff in pull request #6258: Python: Implement PyArrow row level filtering

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #6258:
URL: https://github.com/apache/iceberg/pull/6258#discussion_r1032837823


##########
python/pyiceberg/table/__init__.py:
##########
@@ -355,7 +355,23 @@ def to_arrow(self):
         if "*" not in self.selected_fields:
             columns = list(self.selected_fields)
 
-        return pq.read_table(source=locations, filesystem=fs, columns=columns)
+        pyarrow_filter = None
+        if self.row_filter is not AlwaysTrue():
+            bound_row_filter = bind(self.table.schema(), self.row_filter)
+            pyarrow_filter = expression_to_pyarrow(bound_row_filter)
+
+        from pyarrow.dataset import dataset
+
+        ds = dataset(
+            source=locations,
+            filesystem=fs,
+            # Optionally provide the Schema for the Dataset,
+            # in which case it will not be inferred from the source.
+            # https://arrow.apache.org/docs/python/generated/pyarrow.dataset.dataset.html#pyarrow.dataset.dataset
+            schema=schema_to_pyarrow(self.table.schema()),

Review Comment:
   Should this schema match the file schema? We should test a couple of projection cases.





[GitHub] [iceberg] rdblue commented on a diff in pull request #6258: Python: Implement PyArrow row level filtering

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #6258:
URL: https://github.com/apache/iceberg/pull/6258#discussion_r1032838277


##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -379,3 +400,95 @@ def _(_: StringType) -> pa.DataType:
 def _(_: BinaryType) -> pa.DataType:
     # Variable length by default
     return pa.binary()
+
+
+class _ConvertToArrowExpression(BooleanExpressionVisitor[pc.Expression]):
+    def visit_true(self) -> pc.Expression:
+        return pc.scalar(True)
+
+    def visit_false(self) -> pc.Expression:
+        return pc.scalar(False)
+
+    def visit_not(self, child_result: pc.Expression) -> pc.Expression:
+        return ~child_result
+
+    def visit_and(self, left_result: pc.Expression, right_result: pc.Expression) -> pc.Expression:
+        return left_result & right_result
+
+    def visit_or(self, left_result: pc.Expression, right_result: pc.Expression) -> pc.Expression:
+        return left_result | right_result
+
+    def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> pc.Expression:
+        raise ValueError("Please bind the expression first")
+
+    def visit_bound_predicate(self, predicate: BoundPredicate[Any]) -> pc.Expression:
+        return _iceberg_to_pyarrow_predicate(predicate)
+
+
+@singledispatch
+def _iceberg_to_pyarrow_predicate(expr: BoundPredicate[str]) -> pc.Expression:
+    raise ValueError(f"Unknown expression: {expr}")
+
+
+@_iceberg_to_pyarrow_predicate.register(BoundIsNull)
+def _(bound: BoundIsNull[str]) -> pc.Expression:
+    return pc.field(bound.term.ref().field.name).is_null(False)
+
+
+@_iceberg_to_pyarrow_predicate.register(BoundNotNull)
+def _(bound: BoundNotNull[str]) -> pc.Expression:
+    return pc.field(bound.term.ref().field.name).is_valid()
+
+
+@_iceberg_to_pyarrow_predicate.register(BoundIsNaN)
+def _(bound: BoundIsNaN[str]) -> pc.Expression:
+    return pc.field(bound.term.ref().field.name).is_null(True)
+
+
+@_iceberg_to_pyarrow_predicate.register(BoundNotNaN)
+def _(bound: BoundNotNaN[str]) -> pc.Expression:
+    return ~pc.field(bound.term.ref().field.name).is_null(True)
+
+
+@_iceberg_to_pyarrow_predicate.register(BoundEqualTo)
+def _(bound: BoundEqualTo[str]) -> pc.Expression:
+    return pc.field(bound.term.ref().field.name) == bound.literal.value
+
+
+@_iceberg_to_pyarrow_predicate.register(BoundNotEqualTo)
+def _(bound: BoundNotEqualTo[str]) -> pc.Expression:
+    return pc.field(bound.term.ref().field.name) != bound.literal.value

Review Comment:
   Looks like this is correct from tests.


