Posted to commits@arrow.apache.org by jo...@apache.org on 2022/12/23 08:40:47 UTC

[arrow] branch master updated: GH-14975: [Python] Dataset.sort_by (#14976)

This is an automated email from the ASF dual-hosted git repository.

jorisvandenbossche pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 387e95ad57 GH-14975: [Python] Dataset.sort_by (#14976)
387e95ad57 is described below

commit 387e95ad575fd158bb2758e97800716d3976fce2
Author: Alessandro Molina <am...@turbogears.org>
AuthorDate: Fri Dec 23 09:40:40 2022 +0100

    GH-14975: [Python] Dataset.sort_by (#14976)
    
    * Closes: #14975
    
    - [x] Proof of concept using an ExecPlan
    - [x] Add a test that filters and then sorts, to confirm lazy filtering works with sorting.
    
    Authored-by: Alessandro Molina <am...@turbogears.org>
    Signed-off-by: Joris Van den Bossche <jo...@gmail.com>
---
 python/pyarrow/_compute.pxd          |  5 ++++
 python/pyarrow/_dataset.pyx          | 29 ++++++++++++++++++++
 python/pyarrow/_exec_plan.pyx        | 48 ++++++++++++++++++++++++++++-----
 python/pyarrow/includes/libarrow.pxd |  4 +++
 python/pyarrow/table.pxi             | 10 +++----
 python/pyarrow/tests/test_dataset.py | 51 ++++++++++++++++++++++++++++++++++++
 6 files changed, 135 insertions(+), 12 deletions(-)
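
For context, a minimal sketch of the API this commit adds (illustrative only,
not part of the commit itself):

    import pyarrow as pa
    import pyarrow.dataset as ds

    table = pa.table({"values": [3, 1, 2], "keys": ["b", "a", "a"]})
    dataset = ds.dataset(table)  # in-memory dataset

    # Single column, ascending by default
    print(dataset.sort_by("values").to_table().to_pydict())

    # Multiple sort keys as (name, order) tuples
    print(dataset.sort_by([("keys", "ascending"),
                           ("values", "descending")]).to_table().to_pydict())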

diff --git a/python/pyarrow/_compute.pxd b/python/pyarrow/_compute.pxd
index 8b09cbd445..ee348e9816 100644
--- a/python/pyarrow/_compute.pxd
+++ b/python/pyarrow/_compute.pxd
@@ -27,6 +27,7 @@ cdef class ScalarUdfContext(_Weakrefable):
 
     cdef void init(self, const CScalarUdfContext& c_context)
 
+
 cdef class FunctionOptions(_Weakrefable):
     cdef:
         shared_ptr[CFunctionOptions] wrapped
@@ -37,6 +38,10 @@ cdef class FunctionOptions(_Weakrefable):
     cdef inline shared_ptr[CFunctionOptions] unwrap(self)
 
 
+cdef class _SortOptions(FunctionOptions):
+    pass
+
+
 cdef CExpression _bind(Expression filter, Schema schema) except *
 
 
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 26c9f503bd..7c504775e7 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -448,6 +448,35 @@ cdef class Dataset(_Weakrefable):
         filtered_dataset._scan_options = dict(filter=new_filter)
         return filtered_dataset
 
+    def sort_by(self, sorting, **kwargs):
+        """
+        Sort the Dataset by one or multiple columns.
+
+        Parameters
+        ----------
+        sorting : str or list[tuple(name, order)]
+            Name of the column to sort by (ascending), or
+            a list of sorting conditions, where each
+            entry is a tuple of column name and
+            sorting order ("ascending" or "descending").
+        **kwargs : dict, optional
+            Additional sorting options, as allowed by
+            :class:`SortOptions`.
+
+        Returns
+        -------
+        InMemoryDataset
+            A new dataset sorted according to the sort keys.
+        """
+        if isinstance(sorting, str):
+            sorting = [(sorting, "ascending")]
+
+        res = _pc()._exec_plan._sort_source(self, output_type=InMemoryDataset,
+                                            sort_options=_pc().SortOptions(
+                                                sort_keys=sorting, **kwargs
+                                            ))
+        return res
+
     def join(self, right_dataset, keys, right_keys=None, join_type="left outer",
              left_suffix=None, right_suffix=None, coalesce_keys=True,
              use_threads=True):
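
Because the **kwargs are forwarded to SortOptions, other sorting options can
be passed through as well; a sketch, assuming the null_placement keyword of
pyarrow.compute.SortOptions:

    import pyarrow as pa
    import pyarrow.dataset as ds

    dataset = ds.dataset(pa.table({"a": [2, None, 1]}))
    # Extra keywords flow into SortOptions; null_placement controls
    # where nulls end up ("at_start" or "at_end").
    print(dataset.sort_by("a", null_placement="at_start")
                 .to_table().to_pydict())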
diff --git a/python/pyarrow/_exec_plan.pyx b/python/pyarrow/_exec_plan.pyx
index 5e48bf7076..85072b21f9 100644
--- a/python/pyarrow/_exec_plan.pyx
+++ b/python/pyarrow/_exec_plan.pyx
@@ -30,14 +30,15 @@ from pyarrow.includes.libarrow_dataset cimport *
 from pyarrow.lib cimport (Table, check_status, pyarrow_unwrap_table, pyarrow_wrap_table,
                           RecordBatchReader)
 from pyarrow.lib import tobytes
-from pyarrow._compute cimport Expression, _true
+from pyarrow._compute cimport Expression, _true, _SortOptions
 from pyarrow._dataset cimport Dataset, Scanner
 from pyarrow._dataset import InMemoryDataset
 
 Initialize()  # Initialise support for Datasets in ExecPlan
 
 
-cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads=True):
+cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads=True,
+              _SortOptions sort_options=None):
     """
     Internal Function to create an ExecPlan and run it.
 
@@ -72,6 +73,7 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads
         shared_ptr[CScanNodeOptions] c_scanopts
         shared_ptr[CExecNodeOptions] c_input_node_opts
         shared_ptr[CSinkNodeOptions] c_sinkopts
+        shared_ptr[COrderBySinkNodeOptions] c_orderbysinkopts
         shared_ptr[CAsyncExecBatchGenerator] c_async_exec_batch_gen
         shared_ptr[CRecordBatchReader] c_recordbatchreader
         shared_ptr[CRecordBatchReader] c_recordbatchreader_in
@@ -146,11 +148,23 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads
 
     # Create the output node
     c_async_exec_batch_gen = make_shared[CAsyncExecBatchGenerator]()
-    c_sinkopts = make_shared[CSinkNodeOptions](c_async_exec_batch_gen.get())
-    GetResultValue(
-        MakeExecNode(tobytes("sink"), &deref(c_exec_plan),
-                     c_final_node_vec, deref(c_sinkopts))
-    )
+
+    if sort_options is None:
+        c_sinkopts = make_shared[CSinkNodeOptions](
+            c_async_exec_batch_gen.get())
+        GetResultValue(
+            MakeExecNode(tobytes("sink"), &deref(c_exec_plan),
+                         c_final_node_vec, deref(c_sinkopts))
+        )
+    else:
+        c_orderbysinkopts = make_shared[COrderBySinkNodeOptions](
+            deref(<CSortOptions*>(sort_options.unwrap().get())),
+            c_async_exec_batch_gen.get()
+        )
+        GetResultValue(
+            MakeExecNode(tobytes("order_by_sink"), &deref(c_exec_plan),
+                         c_final_node_vec, deref(c_orderbysinkopts))
+        )
 
     # Convert the asyncgenerator to a sync batch reader
     c_recordbatchreader = MakeGeneratorReader(c_node.output_schema(),
@@ -413,3 +427,23 @@ def _filter_table(table, expression, output_type=Table):
         return InMemoryDataset(r.select(table.schema.names))
     else:
         raise TypeError("Unsupported output type")
+
+
+def _sort_source(table_or_dataset, sort_options, output_type=Table):
+    cdef:
+        vector[CDeclaration] c_empty_decl_plan
+
+    r = execplan([table_or_dataset],
+                 plan=c_empty_decl_plan,
+                 output_type=Table,
+                 use_threads=True,
+                 sort_options=sort_options)
+
+    if output_type == Table:
+        return r
+    elif output_type == InMemoryDataset:
+        # Get rid of special dataset columns
+        # "__fragment_index", "__batch_index", "__last_in_fragment", "__filename"
+        return InMemoryDataset(r.select(table_or_dataset.schema.names))
+    else:
+        raise TypeError("Unsupported output type")
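
For illustration, the new internal helper can be exercised directly
(an underscore-prefixed internal API, subject to change; shown only to make
the sink selection above concrete):

    import pyarrow as pa
    import pyarrow.compute as pc
    from pyarrow._exec_plan import _sort_source

    tab = pa.table({"a": [3, 1, 2]})
    # With sort_options set, execplan terminates the plan with an
    # "order_by_sink" node instead of a plain "sink" node.
    out = _sort_source(tab, sort_options=pc.SortOptions(
        sort_keys=[("a", "ascending")]))
    assert out["a"].to_pylist() == [1, 2, 3]
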
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index d647db67cf..df6a883afe 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -2583,6 +2583,10 @@ cdef extern from "arrow/compute/exec/options.h" namespace "arrow::compute" nogil
         CProjectNodeOptions(vector[CExpression] expressions,
                             vector[c_string] names)
 
+    cdef cppclass COrderBySinkNodeOptions "arrow::compute::OrderBySinkNodeOptions"(CExecNodeOptions):
+        COrderBySinkNodeOptions(vector[CSortOptions] options,
+                                CAsyncExecBatchGenerator generator)
+
     cdef cppclass CHashJoinNodeOptions "arrow::compute::HashJoinNodeOptions"(CExecNodeOptions):
         CHashJoinNodeOptions(CJoinType, vector[CFieldRef] in_left_keys,
                              vector[CFieldRef] in_right_keys)
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index 5fba3cbfb1..53e8412282 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -4758,11 +4758,11 @@ cdef class Table(_PandasConvertible):
         if isinstance(sorting, str):
             sorting = [(sorting, "ascending")]
 
-        indices = _pc().sort_indices(
-            self,
-            options=_pc().SortOptions(sort_keys=sorting, **kwargs)
-        )
-        return self.take(indices)
+        res = _pc()._exec_plan._sort_source(self, output_type=Table,
+                                            sort_options=_pc().SortOptions(
+                                                sort_keys=sorting, **kwargs
+                                            ))
+        return res
 
     def join(self, right_table, keys, right_keys=None, join_type="left outer",
              left_suffix=None, right_suffix=None, coalesce_keys=True,
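
The replaced lines are behavior-preserving: Table.sort_by previously computed
sort indices and gathered rows with take. Roughly, in public-API terms
(a sketch, not the literal implementation):

    import pyarrow as pa
    import pyarrow.compute as pc

    tab = pa.table({"a": [3, 1, 2]})

    # Old path: explicit sort_indices + take
    idx = pc.sort_indices(
        tab, options=pc.SortOptions(sort_keys=[("a", "ascending")]))
    old_result = tab.take(idx)

    # New path: routed through an exec plan with an order_by_sink node
    new_result = tab.sort_by("a")

    assert old_result.equals(new_result)
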
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 49966133c0..ecac5211a4 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -5079,3 +5079,54 @@ def test_dataset_partition_with_slash(tmpdir):
     file_paths = sorted(os.listdir(path))
 
     assert encoded_paths == file_paths
+
+
+@pytest.mark.parametrize('dstype', [
+    "fs", "mem"
+])
+def test_dataset_sort_by(tempdir, dstype):
+    table = pa.table([
+        pa.array([3, 1, 4, 2, 5]),
+        pa.array(["b", "a", "b", "a", "c"]),
+    ], names=["values", "keys"])
+
+    if dstype == "fs":
+        ds.write_dataset(table, tempdir / "t1", format="ipc")
+        dt = ds.dataset(tempdir / "t1", format="ipc")
+    elif dstype == "mem":
+        dt = ds.dataset(table)
+    else:
+        raise NotImplementedError
+
+    assert dt.sort_by("values").to_table().to_pydict() == {
+        "keys": ["a", "a", "b", "b", "c"],
+        "values": [1, 2, 3, 4, 5]
+    }
+
+    assert dt.sort_by([("values", "descending")]).to_table().to_pydict() == {
+        "keys": ["c", "b", "b", "a", "a"],
+        "values": [5, 4, 3, 2, 1]
+    }
+
+    assert dt.filter((pc.field("values") < 4)).sort_by(
+        "values"
+    ).to_table().to_pydict() == {
+        "keys": ["a", "a", "b"],
+        "values": [1, 2, 3]
+    }
+
+    table = pa.Table.from_arrays([
+        pa.array([5, 7, 7, 35], type=pa.int64()),
+        pa.array(["foo", "car", "bar", "foobar"])
+    ], names=["a", "b"])
+    dt = ds.dataset(table)
+
+    sorted_tab = dt.sort_by([("a", "descending")])
+    sorted_tab_dict = sorted_tab.to_table().to_pydict()
+    assert sorted_tab_dict["a"] == [35, 7, 7, 5]
+    assert sorted_tab_dict["b"] == ["foobar", "car", "bar", "foo"]
+
+    sorted_tab = dt.sort_by([("a", "ascending")])
+    sorted_tab_dict = sorted_tab.to_table().to_pydict()
+    assert sorted_tab_dict["a"] == [5, 7, 7, 35]
+    assert sorted_tab_dict["b"] == ["foo", "car", "bar", "foobar"]