You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by jo...@apache.org on 2022/05/11 11:53:11 UTC
[arrow] branch master updated: ARROW-16467: [Python] Add helper function _exec_plan._filter_table to filter tables based on Expression
This is an automated email from the ASF dual-hosted git repository.
jorisvandenbossche pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 0bae875bb8 ARROW-16467: [Python] Add helper function _exec_plan._filter_table to filter tables based on Expression
0bae875bb8 is described below
commit 0bae875bb8d992aab762e7fd81eef24bda7963ad
Author: Alessandro Molina <am...@turbogears.org>
AuthorDate: Wed May 11 13:53:01 2022 +0200
ARROW-16467: [Python] Add helper function _exec_plan._filter_table to filter tables based on Expression
The function is focused on Tables, as it will be the foundation for `Table.filter`, but as the extra work was minimal I added support for Dataset too.
Closes #13075 from amol-/ARROW-16467
Authored-by: Alessandro Molina <am...@turbogears.org>
Signed-off-by: Joris Van den Bossche <jo...@gmail.com>
---
python/pyarrow/_exec_plan.pyx | 44 ++++++++++++++++++++++-
python/pyarrow/includes/libarrow.pxd | 3 ++
python/pyarrow/tests/test_exec_plan.py | 65 ++++++++++++++++++++++++++++++++++
3 files changed, 111 insertions(+), 1 deletion(-)
diff --git a/python/pyarrow/_exec_plan.pyx b/python/pyarrow/_exec_plan.pyx
index 7cbce9baa6..753abe27cf 100644
--- a/python/pyarrow/_exec_plan.pyx
+++ b/python/pyarrow/_exec_plan.pyx
@@ -202,7 +202,7 @@ def _perform_join(join_type, left_operand not None, left_keys,
Returns
-------
- result_table : Table
+ result_table : Table or InMemoryDataset
"""
cdef:
vector[CFieldRef] c_left_keys
@@ -351,3 +351,45 @@ def _perform_join(join_type, left_operand not None, left_keys,
use_threads=use_threads)
return result_table
+
+
+def _filter_table(table, expression, output_type=Table):
+    """Filter rows of a table or dataset based on the provided expression.
+
+    The result contains only the rows matching the provided expression.
+
+    Parameters
+    ----------
+    table : Table or Dataset
+        Table or Dataset that should be filtered.
+    expression : Expression
+        The expression on which rows should be filtered.
+    output_type : Table or InMemoryDataset
+        The output type for the filtered result. Its column names always
+        match ``table.schema``; any other value raises TypeError.
+
+    Returns
+    -------
+    result_table : Table or InMemoryDataset
+    """
+    cdef:
+        vector[CDeclaration] c_decl_plan
+        Expression expr = expression
+
+    # Reject bad output types before executing the (possibly costly) plan.
+    if output_type is not Table and output_type is not InMemoryDataset:
+        raise TypeError("Unsupported output type")
+
+    c_decl_plan.push_back(
+        CDeclaration(tobytes("filter"), CFilterNodeOptions(
+            <CExpression>expr.unwrap(), True  # True -> async_mode
+        ))
+    )
+    # NOTE(review): use_threads=False presumably keeps row order stable.
+    r = execplan([table], plan=c_decl_plan,
+                 output_type=Table, use_threads=False)
+    # Drop special scan columns of Dataset inputs ("__fragment_index",
+    # "__batch_index", "__last_in_fragment", "__filename") for any output.
+    if r.schema.names != table.schema.names:
+        r = r.select(table.schema.names)
+    return r if output_type is Table else InMemoryDataset(r)
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 819ba567df..2e51864b86 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -2458,6 +2458,9 @@ cdef extern from "arrow/compute/exec/options.h" namespace "arrow::compute" nogil
cdef cppclass CSinkNodeOptions "arrow::compute::SinkNodeOptions"(CExecNodeOptions):
pass
+    cdef cppclass CFilterNodeOptions "arrow::compute::FilterNodeOptions"(CExecNodeOptions):
+        CFilterNodeOptions(CExpression, c_bool async_mode)  # (filter expression, async_mode)
+
cdef cppclass CProjectNodeOptions "arrow::compute::ProjectNodeOptions"(CExecNodeOptions):
CProjectNodeOptions(vector[CExpression] expressions)
CProjectNodeOptions(vector[CExpression] expressions,
diff --git a/python/pyarrow/tests/test_exec_plan.py b/python/pyarrow/tests/test_exec_plan.py
index 1f940cfd14..dd6ca9eec2 100644
--- a/python/pyarrow/tests/test_exec_plan.py
+++ b/python/pyarrow/tests/test_exec_plan.py
@@ -17,6 +17,7 @@
import pytest
import pyarrow as pa
+import pyarrow.compute as pc
try:
import pyarrow.dataset as ds
@@ -190,3 +191,67 @@ def test_table_join_keys_order():
"colVals_l": ["a", "b", "f", None],
"colVals_r": ["A", "B", None, "Z"],
})
+
+
+def test_filter_table_errors():  # invalid filter expressions must raise
+    t = pa.table({
+        "a": [1, 2, 3, 4, 5],
+        "b": [10, 20, 30, 40, 50]
+    })
+
+    with pytest.raises(pa.ArrowTypeError):  # divide() is not a boolean mask
+        ep._filter_table(
+            t, pc.divide(pc.field("a"), pc.scalar(2)),
+            output_type=pa.Table
+        )
+
+    with pytest.raises(pa.ArrowInvalid):  # "Z" is not a column of t
+        ep._filter_table(
+            t, (pc.field("Z") <= pc.scalar(2)),
+            output_type=pa.Table
+        )
+
+
+@pytest.mark.parametrize("use_datasets", [False, True])
+def test_filter_table(use_datasets):  # Table->Table, Dataset->InMemoryDataset
+    t = pa.table({
+        "a": [1, 2, 3, 4, 5],
+        "b": [10, 20, 30, 40, 50]
+    })
+    if use_datasets:
+        t = ds.dataset([t])
+
+    result = ep._filter_table(
+        t, (pc.field("a") <= pc.scalar(3)) & (pc.field("b") == pc.scalar(20)),
+        output_type=pa.Table if not use_datasets else ds.InMemoryDataset
+    )
+    if use_datasets:
+        result = result.to_table()  # materialize the InMemoryDataset to compare
+    assert result == pa.table({  # schema must match too (no extra columns)
+        "a": [2],
+        "b": [20]
+    })
+
+    result = ep._filter_table(
+        t, pc.field("b") > pc.scalar(30),
+        output_type=pa.Table if not use_datasets else ds.InMemoryDataset
+    )
+    if use_datasets:
+        result = result.to_table()
+    assert result == pa.table({
+        "a": [4, 5],
+        "b": [40, 50]
+    })
+
+
+def test_filter_table_ordering():  # filtered rows must keep the input order
+    table1 = pa.table({'a': [1, 2, 3, 4], 'b': ['a'] * 4})
+    table2 = pa.table({'a': [1, 2, 3, 4], 'b': ['b'] * 4})
+    table = pa.concat_tables([table1, table2])  # table1 rows precede table2 rows
+
+    for _ in range(20):
+        # 20 iterations seem to consistently fail when order is not preserved.
+        # If the ordering problem is reintroduced, this test will become
+        # flaky, which is still a signal that order is not preserved.
+        r = ep._filter_table(table, pc.field('a') == 1)
+        assert r["b"] == pa.chunked_array([["a"], ["b"]])