You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text rendering.
Posted to commits@arrow.apache.org by jo...@apache.org on 2022/12/23 08:40:47 UTC
[arrow] branch master updated: GH-14975: [Python] Dataset.sort_by (#14976)
This is an automated email from the ASF dual-hosted git repository.
jorisvandenbossche pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 387e95ad57 GH-14975: [Python] Dataset.sort_by (#14976)
387e95ad57 is described below
commit 387e95ad575fd158bb2758e97800716d3976fce2
Author: Alessandro Molina <am...@turbogears.org>
AuthorDate: Fri Dec 23 09:40:40 2022 +0100
GH-14975: [Python] Dataset.sort_by (#14976)
* Closes: #14975
- [x] Proof of concept using an ExecPlan
- [x] Add test to filter and then sort to confirm lazy filtering works with sorting.
Authored-by: Alessandro Molina <am...@turbogears.org>
Signed-off-by: Joris Van den Bossche <jo...@gmail.com>
---
python/pyarrow/_compute.pxd | 5 ++++
python/pyarrow/_dataset.pyx | 29 ++++++++++++++++++++
python/pyarrow/_exec_plan.pyx | 48 ++++++++++++++++++++++++++++-----
python/pyarrow/includes/libarrow.pxd | 4 +++
python/pyarrow/table.pxi | 10 +++----
python/pyarrow/tests/test_dataset.py | 51 ++++++++++++++++++++++++++++++++++++
6 files changed, 135 insertions(+), 12 deletions(-)
diff --git a/python/pyarrow/_compute.pxd b/python/pyarrow/_compute.pxd
index 8b09cbd445..ee348e9816 100644
--- a/python/pyarrow/_compute.pxd
+++ b/python/pyarrow/_compute.pxd
@@ -27,6 +27,7 @@ cdef class ScalarUdfContext(_Weakrefable):
cdef void init(self, const CScalarUdfContext& c_context)
+
cdef class FunctionOptions(_Weakrefable):
cdef:
shared_ptr[CFunctionOptions] wrapped
@@ -37,6 +38,10 @@ cdef class FunctionOptions(_Weakrefable):
cdef inline shared_ptr[CFunctionOptions] unwrap(self)
+cdef class _SortOptions(FunctionOptions):
+ pass
+
+
cdef CExpression _bind(Expression filter, Schema schema) except *
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 26c9f503bd..7c504775e7 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -448,6 +448,35 @@ cdef class Dataset(_Weakrefable):
filtered_dataset._scan_options = dict(filter=new_filter)
return filtered_dataset
+ def sort_by(self, sorting, **kwargs):
+ """
+ Sort the Dataset by one or multiple columns.
+
+ Parameters
+ ----------
+ sorting : str or list[tuple(name, order)]
+ Name of the column to use to sort (ascending), or
+ a list of multiple sorting conditions where
+ each entry is a tuple with column name
+ and sorting order ("ascending" or "descending")
+ **kwargs : dict, optional
+ Additional sorting options.
+ As allowed by :class:`SortOptions`
+
+ Returns
+ -------
+ InMemoryDataset
+ A new dataset sorted according to the sort keys.
+ """
+ if isinstance(sorting, str):
+ sorting = [(sorting, "ascending")]
+
+ res = _pc()._exec_plan._sort_source(self, output_type=InMemoryDataset,
+ sort_options=_pc().SortOptions(
+ sort_keys=sorting, **kwargs
+ ))
+ return res
+
def join(self, right_dataset, keys, right_keys=None, join_type="left outer",
left_suffix=None, right_suffix=None, coalesce_keys=True,
use_threads=True):
diff --git a/python/pyarrow/_exec_plan.pyx b/python/pyarrow/_exec_plan.pyx
index 5e48bf7076..85072b21f9 100644
--- a/python/pyarrow/_exec_plan.pyx
+++ b/python/pyarrow/_exec_plan.pyx
@@ -30,14 +30,15 @@ from pyarrow.includes.libarrow_dataset cimport *
from pyarrow.lib cimport (Table, check_status, pyarrow_unwrap_table, pyarrow_wrap_table,
RecordBatchReader)
from pyarrow.lib import tobytes
-from pyarrow._compute cimport Expression, _true
+from pyarrow._compute cimport Expression, _true, _SortOptions
from pyarrow._dataset cimport Dataset, Scanner
from pyarrow._dataset import InMemoryDataset
Initialize() # Initialise support for Datasets in ExecPlan
-cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads=True):
+cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads=True,
+ _SortOptions sort_options=None):
"""
Internal Function to create an ExecPlan and run it.
@@ -72,6 +73,7 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads
shared_ptr[CScanNodeOptions] c_scanopts
shared_ptr[CExecNodeOptions] c_input_node_opts
shared_ptr[CSinkNodeOptions] c_sinkopts
+ shared_ptr[COrderBySinkNodeOptions] c_orderbysinkopts
shared_ptr[CAsyncExecBatchGenerator] c_async_exec_batch_gen
shared_ptr[CRecordBatchReader] c_recordbatchreader
shared_ptr[CRecordBatchReader] c_recordbatchreader_in
@@ -146,11 +148,23 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads
# Create the output node
c_async_exec_batch_gen = make_shared[CAsyncExecBatchGenerator]()
- c_sinkopts = make_shared[CSinkNodeOptions](c_async_exec_batch_gen.get())
- GetResultValue(
- MakeExecNode(tobytes("sink"), &deref(c_exec_plan),
- c_final_node_vec, deref(c_sinkopts))
- )
+
+ if sort_options is None:
+ c_sinkopts = make_shared[CSinkNodeOptions](
+ c_async_exec_batch_gen.get())
+ GetResultValue(
+ MakeExecNode(tobytes("sink"), &deref(c_exec_plan),
+ c_final_node_vec, deref(c_sinkopts))
+ )
+ else:
+ c_orderbysinkopts = make_shared[COrderBySinkNodeOptions](
+ deref(<CSortOptions*>(sort_options.unwrap().get())),
+ c_async_exec_batch_gen.get()
+ )
+ GetResultValue(
+ MakeExecNode(tobytes("order_by_sink"), &deref(c_exec_plan),
+ c_final_node_vec, deref(c_orderbysinkopts))
+ )
# Convert the asyncgenerator to a sync batch reader
c_recordbatchreader = MakeGeneratorReader(c_node.output_schema(),
@@ -413,3 +427,23 @@ def _filter_table(table, expression, output_type=Table):
return InMemoryDataset(r.select(table.schema.names))
else:
raise TypeError("Unsupported output type")
+
+
+def _sort_source(table_or_dataset, sort_options, output_type=Table):
+ cdef:
+ vector[CDeclaration] c_empty_decl_plan
+
+ r = execplan([table_or_dataset],
+ plan=c_empty_decl_plan,
+ output_type=Table,
+ use_threads=True,
+ sort_options=sort_options)
+
+ if output_type == Table:
+ return r
+ elif output_type == InMemoryDataset:
+ # Get rid of special dataset columns
+ # "__fragment_index", "__batch_index", "__last_in_fragment", "__filename"
+ return InMemoryDataset(r.select(table_or_dataset.schema.names))
+ else:
+ raise TypeError("Unsupported output type")
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index d647db67cf..df6a883afe 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -2583,6 +2583,10 @@ cdef extern from "arrow/compute/exec/options.h" namespace "arrow::compute" nogil
CProjectNodeOptions(vector[CExpression] expressions,
vector[c_string] names)
+ cdef cppclass COrderBySinkNodeOptions "arrow::compute::OrderBySinkNodeOptions"(CExecNodeOptions):
+ COrderBySinkNodeOptions(vector[CSortOptions] options,
+ CAsyncExecBatchGenerator generator)
+
cdef cppclass CHashJoinNodeOptions "arrow::compute::HashJoinNodeOptions"(CExecNodeOptions):
CHashJoinNodeOptions(CJoinType, vector[CFieldRef] in_left_keys,
vector[CFieldRef] in_right_keys)
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index 5fba3cbfb1..53e8412282 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -4758,11 +4758,11 @@ cdef class Table(_PandasConvertible):
if isinstance(sorting, str):
sorting = [(sorting, "ascending")]
- indices = _pc().sort_indices(
- self,
- options=_pc().SortOptions(sort_keys=sorting, **kwargs)
- )
- return self.take(indices)
+ res = _pc()._exec_plan._sort_source(self, output_type=Table,
+ sort_options=_pc().SortOptions(
+ sort_keys=sorting, **kwargs
+ ))
+ return res
def join(self, right_table, keys, right_keys=None, join_type="left outer",
left_suffix=None, right_suffix=None, coalesce_keys=True,
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 49966133c0..ecac5211a4 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -5079,3 +5079,54 @@ def test_dataset_partition_with_slash(tmpdir):
file_paths = sorted(os.listdir(path))
assert encoded_paths == file_paths
+
+
+@pytest.mark.parametrize('dstype', [
+ "fs", "mem"
+])
+def test_dataset_sort_by(tempdir, dstype):
+ table = pa.table([
+ pa.array([3, 1, 4, 2, 5]),
+ pa.array(["b", "a", "b", "a", "c"]),
+ ], names=["values", "keys"])
+
+ if dstype == "fs":
+ ds.write_dataset(table, tempdir / "t1", format="ipc")
+ dt = ds.dataset(tempdir / "t1", format="ipc")
+ elif dstype == "mem":
+ dt = ds.dataset(table)
+ else:
+ raise NotImplementedError
+
+ assert dt.sort_by("values").to_table().to_pydict() == {
+ "keys": ["a", "a", "b", "b", "c"],
+ "values": [1, 2, 3, 4, 5]
+ }
+
+ assert dt.sort_by([("values", "descending")]).to_table().to_pydict() == {
+ "keys": ["c", "b", "b", "a", "a"],
+ "values": [5, 4, 3, 2, 1]
+ }
+
+ assert dt.filter((pc.field("values") < 4)).sort_by(
+ "values"
+ ).to_table().to_pydict() == {
+ "keys": ["a", "a", "b"],
+ "values": [1, 2, 3]
+ }
+
+ table = pa.Table.from_arrays([
+ pa.array([5, 7, 7, 35], type=pa.int64()),
+ pa.array(["foo", "car", "bar", "foobar"])
+ ], names=["a", "b"])
+ dt = ds.dataset(table)
+
+ sorted_tab = dt.sort_by([("a", "descending")])
+ sorted_tab_dict = sorted_tab.to_table().to_pydict()
+ assert sorted_tab_dict["a"] == [35, 7, 7, 5]
+ assert sorted_tab_dict["b"] == ["foobar", "car", "bar", "foo"]
+
+ sorted_tab = dt.sort_by([("a", "ascending")])
+ sorted_tab_dict = sorted_tab.to_table().to_pydict()
+ assert sorted_tab_dict["a"] == [5, 7, 7, 35]
+ assert sorted_tab_dict["b"] == ["foo", "car", "bar", "foobar"]