You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by jo...@apache.org on 2022/05/11 11:53:11 UTC

[arrow] branch master updated: ARROW-16467: [Python] Add helper function _exec_plan._filter_table to filter tables based on Expression

This is an automated email from the ASF dual-hosted git repository.

jorisvandenbossche pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 0bae875bb8 ARROW-16467: [Python] Add helper function _exec_plan._filter_table to filter tables based on Expression
0bae875bb8 is described below

commit 0bae875bb8d992aab762e7fd81eef24bda7963ad
Author: Alessandro Molina <am...@turbogears.org>
AuthorDate: Wed May 11 13:53:01 2022 +0200

    ARROW-16467: [Python] Add helper function _exec_plan._filter_table to filter tables based on Expression
    
    The function is focused on Tables, as it will be the foundation for `Table.filter`, but as the extra work was minimal I added support for Dataset too.
    
    Closes #13075 from amol-/ARROW-16467
    
    Authored-by: Alessandro Molina <am...@turbogears.org>
    Signed-off-by: Joris Van den Bossche <jo...@gmail.com>
---
 python/pyarrow/_exec_plan.pyx          | 44 ++++++++++++++++++++++-
 python/pyarrow/includes/libarrow.pxd   |  3 ++
 python/pyarrow/tests/test_exec_plan.py | 65 ++++++++++++++++++++++++++++++++++
 3 files changed, 111 insertions(+), 1 deletion(-)

diff --git a/python/pyarrow/_exec_plan.pyx b/python/pyarrow/_exec_plan.pyx
index 7cbce9baa6..753abe27cf 100644
--- a/python/pyarrow/_exec_plan.pyx
+++ b/python/pyarrow/_exec_plan.pyx
@@ -202,7 +202,7 @@ def _perform_join(join_type, left_operand not None, left_keys,
 
     Returns
     -------
-    result_table : Table
+    result_table : Table or InMemoryDataset
     """
     cdef:
         vector[CFieldRef] c_left_keys
@@ -351,3 +351,45 @@ def _perform_join(join_type, left_operand not None, left_keys,
                             use_threads=use_threads)
 
     return result_table
+
+
+def _filter_table(table, expression, output_type=Table):
+    """Filter rows of a table or dataset based on the provided expression.
+
+    The result will be an output table with only the rows matching
+    the provided expression.
+
+    Parameters
+    ----------
+    table : Table or Dataset
+        Table or Dataset that should be filtered.
+    expression : Expression
+        The expression on which rows should be filtered.
+    output_type : Table or InMemoryDataset
+        The output type for the filtered result.
+
+    Returns
+    -------
+    result_table : Table or InMemoryDataset
+    """
+    cdef:
+        vector[CDeclaration] c_decl_plan
+        Expression expr = expression
+
+    c_decl_plan.push_back(
+        CDeclaration(tobytes("filter"), CFilterNodeOptions(
+            <CExpression>expr.unwrap(), True
+        ))
+    )
+
+    r = execplan([table], plan=c_decl_plan,
+                 output_type=Table, use_threads=False)
+
+    if output_type == Table:
+        return r
+    elif output_type == InMemoryDataset:
+        # Get rid of special dataset columns
+        # "__fragment_index", "__batch_index", "__last_in_fragment", "__filename"
+        return InMemoryDataset(r.select(table.schema.names))
+    else:
+        raise TypeError("Unsupported output type")
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 819ba567df..2e51864b86 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -2458,6 +2458,9 @@ cdef extern from "arrow/compute/exec/options.h" namespace "arrow::compute" nogil
     cdef cppclass CSinkNodeOptions "arrow::compute::SinkNodeOptions"(CExecNodeOptions):
         pass
 
+    cdef cppclass CFilterNodeOptions "arrow::compute::FilterNodeOptions"(CExecNodeOptions):
+        CFilterNodeOptions(CExpression, c_bool async_mode)
+
     cdef cppclass CProjectNodeOptions "arrow::compute::ProjectNodeOptions"(CExecNodeOptions):
         CProjectNodeOptions(vector[CExpression] expressions)
         CProjectNodeOptions(vector[CExpression] expressions,
diff --git a/python/pyarrow/tests/test_exec_plan.py b/python/pyarrow/tests/test_exec_plan.py
index 1f940cfd14..dd6ca9eec2 100644
--- a/python/pyarrow/tests/test_exec_plan.py
+++ b/python/pyarrow/tests/test_exec_plan.py
@@ -17,6 +17,7 @@
 
 import pytest
 import pyarrow as pa
+import pyarrow.compute as pc
 
 try:
     import pyarrow.dataset as ds
@@ -190,3 +191,67 @@ def test_table_join_keys_order():
         "colVals_l": ["a", "b", "f", None],
         "colVals_r": ["A", "B", None, "Z"],
     })
+
+
+def test_filter_table_errors():
+    t = pa.table({
+        "a": [1, 2, 3, 4, 5],
+        "b": [10, 20, 30, 40, 50]
+    })
+
+    with pytest.raises(pa.ArrowTypeError):
+        ep._filter_table(
+            t, pc.divide(pc.field("a"), pc.scalar(2)),
+            output_type=pa.Table
+        )
+
+    with pytest.raises(pa.ArrowInvalid):
+        ep._filter_table(
+            t, (pc.field("Z") <= pc.scalar(2)),
+            output_type=pa.Table
+        )
+
+
+@pytest.mark.parametrize("use_datasets", [False, True])
+def test_filter_table(use_datasets):
+    t = pa.table({
+        "a": [1, 2, 3, 4, 5],
+        "b": [10, 20, 30, 40, 50]
+    })
+    if use_datasets:
+        t = ds.dataset([t])
+
+    result = ep._filter_table(
+        t, (pc.field("a") <= pc.scalar(3)) & (pc.field("b") == pc.scalar(20)),
+        output_type=pa.Table if not use_datasets else ds.InMemoryDataset
+    )
+    if use_datasets:
+        result = result.to_table()
+    assert result == pa.table({
+        "a": [2],
+        "b": [20]
+    })
+
+    result = ep._filter_table(
+        t, pc.field("b") > pc.scalar(30),
+        output_type=pa.Table if not use_datasets else ds.InMemoryDataset
+    )
+    if use_datasets:
+        result = result.to_table()
+    assert result == pa.table({
+        "a": [4, 5],
+        "b": [40, 50]
+    })
+
+
+def test_filter_table_ordering():
+    table1 = pa.table({'a': [1, 2, 3, 4], 'b': ['a'] * 4})
+    table2 = pa.table({'a': [1, 2, 3, 4], 'b': ['b'] * 4})
+    table = pa.concat_tables([table1, table2])
+
+    for _ in range(20):
+        # 20 seems to consistently cause errors when order is not preserved.
+        # If the order problem is reintroduced this test will become flaky
+        # which is still a signal that the order is not preserved.
+        r = ep._filter_table(table, pc.field('a') == 1)
+        assert r["b"] == pa.chunked_array([["a"], ["b"]])