Posted to commits@arrow.apache.org by jo...@apache.org on 2021/11/25 16:46:24 UTC

[arrow] branch master updated: ARROW-14608: [Python] Provide access to hash_aggregate functions through a Table.group_by method

This is an automated email from the ASF dual-hosted git repository.

jorisvandenbossche pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 999d97a  ARROW-14608: [Python] Provide access to hash_aggregate functions through a Table.group_by method
999d97a is described below

commit 999d97add8e540021b7f42ffec91a6b26ddf2691
Author: Alessandro Molina <am...@turbogears.org>
AuthorDate: Thu Nov 25 17:43:31 2021 +0100

    ARROW-14608: [Python] Provide access to hash_aggregate functions through a Table.group_by method
    
    Closes #11624 from amol-/ARROW-14608
    
    Authored-by: Alessandro Molina <am...@turbogears.org>
    Signed-off-by: Joris Van den Bossche <jo...@gmail.com>
---
 docs/source/python/api/tables.rst    |   1 +
 python/pyarrow/__init__.py           |   2 +-
 python/pyarrow/_compute.pyx          |  27 ++++++++
 python/pyarrow/compute.py            |   1 +
 python/pyarrow/includes/libarrow.pxd |  11 +++
 python/pyarrow/table.pxi             | 118 ++++++++++++++++++++++++++++++++
 python/pyarrow/tests/test_table.py   | 128 +++++++++++++++++++++++++++++++++++
 7 files changed, 287 insertions(+), 1 deletion(-)
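
For context, here is what the new public API added by this commit looks like
in practice (a minimal sketch based on the docstring example and the tests
further below):

    import pyarrow as pa

    t = pa.table([
        pa.array(["a", "a", "b", "b", "c"]),
        pa.array([1, 2, 3, 4, 5]),
    ], names=["keys", "values"])

    # Group on "keys", then sum "values" within each group. Plain function
    # names like "sum" are promoted to the "hash_sum" kernel internally.
    result = t.group_by("keys").aggregate([("values", "sum")])
    # result has columns "values_sum" and "keys"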

diff --git a/docs/source/python/api/tables.rst b/docs/source/python/api/tables.rst
index 6e7a3b6..4cdba24 100644
--- a/docs/source/python/api/tables.rst
+++ b/docs/source/python/api/tables.rst
@@ -43,6 +43,7 @@ Classes
    ChunkedArray
    RecordBatch
    Table
+   TableGroupBy
 
 .. _api.tensor:
 
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index 1ec229d..f1e7ca9 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -182,7 +182,7 @@ from pyarrow.lib import (NativeFile, PythonFile,
 from pyarrow._hdfsio import HdfsFile, have_libhdfs
 
 from pyarrow.lib import (ChunkedArray, RecordBatch, Table, table,
-                         concat_arrays, concat_tables)
+                         concat_arrays, concat_tables, TableGroupBy)
 
 # Exceptions
 from pyarrow.lib import (ArrowCancelled,
diff --git a/python/pyarrow/_compute.pyx b/python/pyarrow/_compute.pyx
index d62c9c0..d315cc4 100644
--- a/python/pyarrow/_compute.pyx
+++ b/python/pyarrow/_compute.pyx
@@ -1294,3 +1294,30 @@ class TDigestOptions(_TDigestOptions):
         if not isinstance(q, (list, tuple, np.ndarray)):
             q = [q]
         self._set_options(q, delta, buffer_size, skip_nulls, min_count)
+
+
+def _group_by(args, keys, aggregations):
+    cdef:
+        vector[CDatum] c_args
+        vector[CDatum] c_keys
+        vector[CAggregate] c_aggregations
+        CDatum result
+        CAggregate c_aggr
+
+    _pack_compute_args(args, &c_args)
+    _pack_compute_args(keys, &c_keys)
+
+    for aggr_func_name, aggr_opts in aggregations:
+        c_aggr.function = tobytes(aggr_func_name)
+        if aggr_opts is not None:
+            c_aggr.options = (<FunctionOptions?>aggr_opts).get_options()
+        else:
+            c_aggr.options = NULL
+        c_aggregations.push_back(c_aggr)
+
+    with nogil:
+        result = GetResultValue(
+            GroupBy(c_args, c_keys, c_aggregations)
+        )
+
+    return wrap_datum(result)
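
The wrapper above is the only new Cython entry point: it packs the value and
key columns into CDatum vectors, fills one CAggregate descriptor per requested
function, and releases the GIL around the C++ GroupBy call. A minimal sketch
of calling it directly (it is internal, so unlike the public method it expects
the hash_-prefixed kernel names):

    import pyarrow as pa
    import pyarrow.compute as pc

    keys = pa.array(["a", "a", "b"])
    values = pa.array([1, 2, 3])

    # Returns a StructArray with one field per aggregation plus the keys;
    # TableGroupBy.aggregate() below converts this into a Table.
    struct_arr = pc._group_by([values], [keys], [("hash_sum", None)])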
diff --git a/python/pyarrow/compute.py b/python/pyarrow/compute.py
index be4e3ed..7192f49 100644
--- a/python/pyarrow/compute.py
+++ b/python/pyarrow/compute.py
@@ -70,6 +70,7 @@ from pyarrow._compute import (  # noqa
     function_registry,
     get_function,
     list_functions,
+    _group_by
 )
 
 import inspect
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 6d187ea..d7706df 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -2218,6 +2218,17 @@ cdef extern from * namespace "arrow::compute":
             const CBuffer& buffer)
 
 
+cdef extern from "arrow/compute/api_aggregate.h" namespace \
+        "arrow::compute::internal" nogil:
+    cdef cppclass CAggregate "arrow::compute::internal::Aggregate":
+        c_string function
+        const CFunctionOptions* options
+
+    CResult[CDatum] GroupBy(const vector[CDatum]& arguments,
+                            const vector[CDatum]& keys,
+                            const vector[CAggregate]& aggregates)
+
+
 cdef extern from "arrow/python/api.h" namespace "arrow::py":
     # Requires GIL
     CResult[shared_ptr[CDataType]] InferArrowType(
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index f3dc6f7..e6b0b55 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -2192,6 +2192,53 @@ cdef class Table(_PandasConvertible):
 
         return table
 
+    def group_by(self, keys):
+        """Declare a grouping over the columns of the table.
+
+        The resulting grouping can then be used to perform aggregations
+        through a subsequent call to the ``aggregate()`` method.
+
+        Parameters
+        ----------
+        keys : str or list[str]
+            Name of the column or columns to use as the grouping key.
+
+        Returns
+        -------
+        TableGroupBy
+
+        See Also
+        --------
+        TableGroupBy.aggregate
+        """
+        return TableGroupBy(self, keys)
+
+    def sort_by(self, sorting):
+        """
+        Sort the table by one or multiple columns.
+
+        Parameters
+        ----------
+        sorting : str or list[tuple(name, order)]
+            Name of the column to sort by (ascending), or
+            a list of sorting conditions where each entry
+            is a tuple of column name and sorting order
+            ("ascending" or "descending").
+
+        Returns
+        -------
+        Table
+            A new table sorted according to the sort keys.
+        """
+        if isinstance(sorting, str):
+            sorting = [(sorting, "ascending")]
+
+        indices = _pc().sort_indices(
+            self,
+            sort_keys=sorting
+        )
+        return self.take(indices)
+
 
 def _reconstruct_table(arrays, schema):
     """
@@ -2387,3 +2434,74 @@ def _from_pydict(cls, mapping, schema, metadata):
         return cls.from_arrays(arrays, schema=schema, metadata=metadata)
     else:
         raise TypeError('Schema must be an instance of pyarrow.Schema')
+
+
+class TableGroupBy:
+    """
+    A grouping of columns in a table on which to perform aggregations.
+    """
+
+    def __init__(self, table, keys):
+        if isinstance(keys, str):
+            keys = [keys]
+
+        self._table = table
+        self.keys = keys
+
+    def aggregate(self, aggregations):
+        """
+        Perform an aggregation over the grouped columns of the table.
+
+        Parameters
+        ----------
+        aggregations : list[tuple(str, str)] or \
+list[tuple(str, str, FunctionOptions)]
+            List of tuples, each made of an aggregation column name,
+            a function name and, optionally, aggregation function options.
+
+        Returns
+        -------
+        Table
+            Results of the aggregation functions.
+
+        Examples
+        --------
+        >>> t = pa.table([
+        ...       pa.array(["a", "a", "b", "b", "c"]),
+        ...       pa.array([1, 2, 3, 4, 5]),
+        ... ], names=["keys", "values"])
+        >>> t.group_by("keys").aggregate([("values", "sum")])
+        pyarrow.Table
+        values_sum: int64
+        keys: string
+        ----
+        values_sum: [[3,7,5]]
+        keys: [["a","b","c"]]
+        """
+        columns = [a[0] for a in aggregations]
+        aggrfuncs = [
+            (a[1], a[2]) if len(a) > 2 else (a[1], None)
+            for a in aggregations
+        ]
+
+        group_by_aggrs = []
+        for aggr in aggrfuncs:
+            if not aggr[0].startswith("hash_"):
+                aggr = ("hash_" + aggr[0], aggr[1])
+            group_by_aggrs.append(aggr)
+
+        # Build unique names for aggregation result columns
+        # so that it's obvious what they refer to.
+        column_names = [
+            aggr_name.replace("hash", col_name)
+            for col_name, (aggr_name, _) in zip(columns, group_by_aggrs)
+        ] + self.keys
+
+        result = _pc()._group_by(
+            [self._table[c] for c in columns],
+            [self._table[k] for k in self.keys],
+            group_by_aggrs
+        )
+
+        t = Table.from_batches([RecordBatch.from_struct_array(result)])
+        return t.rename_columns(column_names)
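
Note how aggregate() above prepends "hash_" to plain function names and then
derives the result column names by substituting the column name back in, so
"hash_sum" applied to "values" becomes "values_sum". FunctionOptions objects
pass straight through to the kernel; a sketch mirroring the CountOptions
tests below:

    import pyarrow as pa
    import pyarrow.compute as pc

    t = pa.table([
        pa.array(["a", "a", "a"]),
        pa.array([1, None, None]),
    ], names=["keys", "values"])

    # Count all rows per group, nulls included.
    r = t.group_by("keys").aggregate([
        ("values", "count", pc.CountOptions(mode="all"))
    ])
    # r.to_pydict() == {"keys": ["a"], "values_count": [3]}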
diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py
index ef41a73..dd0ce07 100644
--- a/python/pyarrow/tests/test_table.py
+++ b/python/pyarrow/tests/test_table.py
@@ -24,6 +24,7 @@ import weakref
 import numpy as np
 import pytest
 import pyarrow as pa
+import pyarrow.compute as pc
 
 
 def test_chunked_array_basics():
@@ -1746,3 +1747,130 @@ def test_table_select():
     result = table.select(['f2'])
     expected = pa.table([a2], ['f2'])
     assert result.equals(expected)
+
+
+def test_table_group_by():
+    def sorted_by_keys(d):
+        # Ensure a guaranteed order of keys for aggregation results.
+        if "keys2" in d:
+            keys = tuple(zip(d["keys"], d["keys2"]))
+        else:
+            keys = d["keys"]
+        sorted_keys = sorted(keys)
+        sorted_d = {"keys": sorted(d["keys"])}
+        for entry in d:
+            if entry == "keys":
+                continue
+            values = dict(zip(keys, d[entry]))
+            for k in sorted_keys:
+                sorted_d.setdefault(entry, []).append(values[k])
+        return sorted_d
+
+    table = pa.table([
+        pa.array(["a", "a", "b", "b", "c"]),
+        pa.array(["X", "X", "Y", "Z", "Z"]),
+        pa.array([1, 2, 3, 4, 5]),
+        pa.array([10, 20, 30, 40, 50])
+    ], names=["keys", "keys2", "values", "bigvalues"])
+
+    r = table.group_by("keys").aggregate([
+        ("values", "hash_sum")
+    ])
+    assert sorted_by_keys(r.to_pydict()) == {
+        "keys": ["a", "b", "c"],
+        "values_sum": [3, 7, 5]
+    }
+
+    r = table.group_by("keys").aggregate([
+        ("values", "hash_sum"),
+        ("values", "hash_count")
+    ])
+    assert sorted_by_keys(r.to_pydict()) == {
+        "keys": ["a", "b", "c"],
+        "values_sum": [3, 7, 5],
+        "values_count": [2, 2, 1]
+    }
+
+    # Test without hash_ prefix
+    r = table.group_by("keys").aggregate([
+        ("values", "sum")
+    ])
+    assert sorted_by_keys(r.to_pydict()) == {
+        "keys": ["a", "b", "c"],
+        "values_sum": [3, 7, 5]
+    }
+
+    r = table.group_by("keys").aggregate([
+        ("values", "max"),
+        ("bigvalues", "sum")
+    ])
+    assert sorted_by_keys(r.to_pydict()) == {
+        "keys": ["a", "b", "c"],
+        "values_max": [2, 4, 5],
+        "bigvalues_sum": [30, 70, 50]
+    }
+
+    r = table.group_by("keys").aggregate([
+        ("bigvalues", "max"),
+        ("values", "sum")
+    ])
+    assert sorted_by_keys(r.to_pydict()) == {
+        "keys": ["a", "b", "c"],
+        "values_sum": [3, 7, 5],
+        "bigvalues_max": [20, 40, 50]
+    }
+
+    r = table.group_by(["keys", "keys2"]).aggregate([
+        ("values", "sum")
+    ])
+    assert sorted_by_keys(r.to_pydict()) == {
+        "keys": ["a", "b", "b", "c"],
+        "keys2": ["X", "Y", "Z", "Z"],
+        "values_sum": [3, 3, 4, 5]
+    }
+
+    table_with_nulls = pa.table([
+        pa.array(["a", "a", "a"]),
+        pa.array([1, None, None])
+    ], names=["keys", "values"])
+
+    r = table_with_nulls.group_by(["keys"]).aggregate([
+        ("values", "count", pc.CountOptions(mode="all"))
+    ])
+    assert r.to_pydict() == {
+        "keys": ["a"],
+        "values_count": [3]
+    }
+
+    r = table_with_nulls.group_by(["keys"]).aggregate([
+        ("values", "count", pc.CountOptions(mode="only_null"))
+    ])
+    assert r.to_pydict() == {
+        "keys": ["a"],
+        "values_count": [2]
+    }
+
+    r = table_with_nulls.group_by(["keys"]).aggregate([
+        ("values", "count", pc.CountOptions(mode="only_valid"))
+    ])
+    assert r.to_pydict() == {
+        "keys": ["a"],
+        "values_count": [1]
+    }
+
+
+def test_table_sort_by():
+    table = pa.table([
+        pa.array([3, 1, 4, 2, 5]),
+        pa.array(["b", "a", "b", "a", "c"]),
+    ], names=["values", "keys"])
+
+    assert table.sort_by("values").to_pydict() == {
+        "keys": ["a", "a", "b", "b", "c"],
+        "values": [1, 2, 3, 4, 5]
+    }
+
+    assert table.sort_by([("values", "descending")]).to_pydict() == {
+        "keys": ["c", "b", "b", "a", "a"],
+        "values": [5, 4, 3, 2, 1]
+    }
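
As the sorted_by_keys helper above suggests, the order of groups in the
aggregation output is not guaranteed. Since aggregate() returns a regular
Table, one way to get a deterministic order is to chain the new sort_by()
method (a sketch):

    r = table.group_by("keys").aggregate([("values", "sum")]).sort_by("keys")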