Posted to commits@arrow.apache.org by jo...@apache.org on 2021/11/25 16:46:24 UTC
[arrow] branch master updated: ARROW-14608: [Python] Provide access to hash_aggregate functions through a Table.group_by method
This is an automated email from the ASF dual-hosted git repository.
jorisvandenbossche pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 999d97a ARROW-14608: [Python] Provide access to hash_aggregate functions through a Table.group_by method
999d97a is described below
commit 999d97add8e540021b7f42ffec91a6b26ddf2691
Author: Alessandro Molina <am...@turbogears.org>
AuthorDate: Thu Nov 25 17:43:31 2021 +0100
ARROW-14608: [Python] Provide access to hash_aggregate functions through a Table.group_by method
Closes #11624 from amol-/ARROW-14608
Authored-by: Alessandro Molina <am...@turbogears.org>
Signed-off-by: Joris Van den Bossche <jo...@gmail.com>
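
As a quick sketch of the new API (adapted from the ``Table.group_by``
docstring and tests further down; the import line is added here for
completeness):

    >>> import pyarrow as pa
    >>> t = pa.table([
    ...     pa.array(["a", "a", "b", "b", "c"]),
    ...     pa.array([1, 2, 3, 4, 5]),
    ... ], names=["keys", "values"])
    >>> t.group_by("keys").aggregate([("values", "sum")])
    pyarrow.Table
    values_sum: int64
    keys: string
    ----
    values_sum: [[3,7,5]]
    keys: [["a","b","c"]]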
---
docs/source/python/api/tables.rst | 1 +
python/pyarrow/__init__.py | 2 +-
python/pyarrow/_compute.pyx | 27 ++++++++
python/pyarrow/compute.py | 1 +
python/pyarrow/includes/libarrow.pxd | 11 +++
python/pyarrow/table.pxi | 118 ++++++++++++++++++++++++++++++++
python/pyarrow/tests/test_table.py | 128 +++++++++++++++++++++++++++++++++++
7 files changed, 287 insertions(+), 1 deletion(-)
diff --git a/docs/source/python/api/tables.rst b/docs/source/python/api/tables.rst
index 6e7a3b6..4cdba24 100644
--- a/docs/source/python/api/tables.rst
+++ b/docs/source/python/api/tables.rst
@@ -43,6 +43,7 @@ Classes
ChunkedArray
RecordBatch
Table
+ TableGroupBy
.. _api.tensor:
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index 1ec229d..f1e7ca9 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -182,7 +182,7 @@ from pyarrow.lib import (NativeFile, PythonFile,
from pyarrow._hdfsio import HdfsFile, have_libhdfs
from pyarrow.lib import (ChunkedArray, RecordBatch, Table, table,
- concat_arrays, concat_tables)
+ concat_arrays, concat_tables, TableGroupBy)
# Exceptions
from pyarrow.lib import (ArrowCancelled,
diff --git a/python/pyarrow/_compute.pyx b/python/pyarrow/_compute.pyx
index d62c9c0..d315cc4 100644
--- a/python/pyarrow/_compute.pyx
+++ b/python/pyarrow/_compute.pyx
@@ -1294,3 +1294,30 @@ class TDigestOptions(_TDigestOptions):
if not isinstance(q, (list, tuple, np.ndarray)):
q = [q]
self._set_options(q, delta, buffer_size, skip_nulls, min_count)
+
+
+def _group_by(args, keys, aggregations):
+ cdef:
+ vector[CDatum] c_args
+ vector[CDatum] c_keys
+ vector[CAggregate] c_aggregations
+ CDatum result
+ CAggregate c_aggr
+
+ _pack_compute_args(args, &c_args)
+ _pack_compute_args(keys, &c_keys)
+
+ for aggr_func_name, aggr_opts in aggregations:
+ c_aggr.function = tobytes(aggr_func_name)
+ if aggr_opts is not None:
+ c_aggr.options = (<FunctionOptions?>aggr_opts).get_options()
+ else:
+ c_aggr.options = NULL
+ c_aggregations.push_back(c_aggr)
+
+ with nogil:
+ result = GetResultValue(
+ GroupBy(c_args, c_keys, c_aggregations)
+ )
+
+ return wrap_datum(result)
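
For context, the ``TableGroupBy.aggregate`` method added to table.pxi below
reaches this private helper roughly as in the following sketch; the direct
call is shown for illustration only, since ``_group_by`` is not public API:

    >>> import pyarrow as pa
    >>> import pyarrow.compute as pc
    >>> t = pa.table([
    ...     pa.array(["a", "a", "b"]),
    ...     pa.array([1, 2, 3]),
    ... ], names=["keys", "values"])
    >>> # columns to aggregate, key columns, (hash kernel, options) pairs
    >>> result = pc._group_by([t["values"]], [t["keys"]], [("hash_sum", None)])

The returned datum wraps a struct array with one field per aggregation plus
the keys; ``aggregate()`` unpacks it with ``RecordBatch.from_struct_array``
and renames the columns, as shown further down.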
diff --git a/python/pyarrow/compute.py b/python/pyarrow/compute.py
index be4e3ed..7192f49 100644
--- a/python/pyarrow/compute.py
+++ b/python/pyarrow/compute.py
@@ -70,6 +70,7 @@ from pyarrow._compute import ( # noqa
function_registry,
get_function,
list_functions,
+ _group_by
)
import inspect
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 6d187ea..d7706df 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -2218,6 +2218,17 @@ cdef extern from * namespace "arrow::compute":
const CBuffer& buffer)
+cdef extern from "arrow/compute/api_aggregate.h" namespace \
+ "arrow::compute::internal" nogil:
+ cdef cppclass CAggregate "arrow::compute::internal::Aggregate":
+ c_string function
+ const CFunctionOptions* options
+
+ CResult[CDatum] GroupBy(const vector[CDatum]& arguments,
+ const vector[CDatum]& keys,
+ const vector[CAggregate]& aggregates)
+
+
cdef extern from "arrow/python/api.h" namespace "arrow::py":
# Requires GIL
CResult[shared_ptr[CDataType]] InferArrowType(
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index f3dc6f7..e6b0b55 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -2192,6 +2192,53 @@ cdef class Table(_PandasConvertible):
return table
+ def group_by(self, keys):
+ """Declare a grouping over the columns of the table.
+
+ The resulting grouping can then be used to perform aggregations
+ with a subsequent call to the ``aggregate()`` method.
+
+ Parameters
+ ----------
+ keys : str or list[str]
+ Name of the column or columns that should be used as the grouping key.
+
+ Returns
+ -------
+ TableGroupBy
+
+ See Also
+ --------
+ TableGroupBy.aggregate
+ """
+ return TableGroupBy(self, keys)
+
+ def sort_by(self, sorting):
+ """
+ Sort the table by one or multiple columns.
+
+ Parameters
+ ----------
+ sorting : str or list[tuple(name, order)]
+ Name of the column to sort by (ascending), or a list of
+ sorting conditions where each entry is a tuple of column
+ name and sorting order ("ascending" or "descending").
+
+ Returns
+ -------
+ Table
+ A new table sorted according to the sort keys.
+ """
+ if isinstance(sorting, str):
+ sorting = [(sorting, "ascending")]
+
+ indices = _pc().sort_indices(
+ self,
+ sort_keys=sorting
+ )
+ return self.take(indices)
+
def _reconstruct_table(arrays, schema):
"""
@@ -2387,3 +2434,74 @@ def _from_pydict(cls, mapping, schema, metadata):
return cls.from_arrays(arrays, schema=schema, metadata=metadata)
else:
raise TypeError('Schema must be an instance of pyarrow.Schema')
+
+
+class TableGroupBy:
+ """
+ A grouping of columns in a table on which to perform aggregations.
+ """
+
+ def __init__(self, table, keys):
+ if isinstance(keys, str):
+ keys = [keys]
+
+ self._table = table
+ self.keys = keys
+
+ def aggregate(self, aggregations):
+ """
+ Perform an aggregation over the grouped columns of the table.
+
+ Parameters
+ ----------
+ aggregations : list[tuple(str, str)] or \
+list[tuple(str, str, FunctionOptions)]
+ List of tuples, each made of an aggregation column name
+ followed by a function name and optionally the aggregation
+ function options.
+
+ Returns
+ -------
+ Table
+ Results of the aggregation functions.
+
+ Examples
+ --------
+ >>> t = pa.table([
+ ... pa.array(["a", "a", "b", "b", "c"]),
+ ... pa.array([1, 2, 3, 4, 5]),
+ ... ], names=["keys", "values"])
+ >>> t.group_by("keys").aggregate([("values", "sum")])
+ pyarrow.Table
+ values_sum: int64
+ keys: string
+ ----
+ values_sum: [[3,7,5]]
+ keys: [["a","b","c"]]
+ """
+ columns = [a[0] for a in aggregations]
+ aggrfuncs = [
+ (a[1], a[2]) if len(a) > 2 else (a[1], None)
+ for a in aggregations
+ ]
+
+ group_by_aggrs = []
+ for aggr in aggrfuncs:
+ if not aggr[0].startswith("hash_"):
+ aggr = ("hash_" + aggr[0], aggr[1])
+ group_by_aggrs.append(aggr)
+
+ # Build unique names for aggregation result columns
+ # so that it's obvious what they refer to.
+ column_names = [
+ aggr_name.replace("hash", col_name)
+ for col_name, (aggr_name, _) in zip(columns, group_by_aggrs)
+ ] + self.keys
+
+ result = _pc()._group_by(
+ [self._table[c] for c in columns],
+ [self._table[k] for k in self.keys],
+ group_by_aggrs
+ )
+
+ t = Table.from_batches([RecordBatch.from_struct_array(result)])
+ return t.rename_columns(column_names)
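
A note on the column renaming above: result names are built by substituting
the aggregated column's name for the ``hash`` prefix of the kernel name, so
for example:

    >>> "hash_sum".replace("hash", "values")
    'values_sum'
    >>> "hash_count".replace("hash", "values")
    'values_count'

The key columns keep their original names and are appended after the
aggregation results.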
diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py
index ef41a73..dd0ce07 100644
--- a/python/pyarrow/tests/test_table.py
+++ b/python/pyarrow/tests/test_table.py
@@ -24,6 +24,7 @@ import weakref
import numpy as np
import pytest
import pyarrow as pa
+import pyarrow.compute as pc
def test_chunked_array_basics():
@@ -1746,3 +1747,130 @@ def test_table_select():
result = table.select(['f2'])
expected = pa.table([a2], ['f2'])
assert result.equals(expected)
+
+
+def test_table_group_by():
+ def sorted_by_keys(d):
+ # Ensure a guaranteed order of keys for aggregation results.
+ if "keys2" in d:
+ keys = tuple(zip(d["keys"], d["keys2"]))
+ else:
+ keys = d["keys"]
+ sorted_keys = sorted(keys)
+ sorted_d = {"keys": sorted(d["keys"])}
+ for entry in d:
+ if entry == "keys":
+ continue
+ values = dict(zip(keys, d[entry]))
+ for k in sorted_keys:
+ sorted_d.setdefault(entry, []).append(values[k])
+ return sorted_d
+
+ table = pa.table([
+ pa.array(["a", "a", "b", "b", "c"]),
+ pa.array(["X", "X", "Y", "Z", "Z"]),
+ pa.array([1, 2, 3, 4, 5]),
+ pa.array([10, 20, 30, 40, 50])
+ ], names=["keys", "keys2", "values", "bigvalues"])
+
+ r = table.group_by("keys").aggregate([
+ ("values", "hash_sum")
+ ])
+ assert sorted_by_keys(r.to_pydict()) == {
+ "keys": ["a", "b", "c"],
+ "values_sum": [3, 7, 5]
+ }
+
+ r = table.group_by("keys").aggregate([
+ ("values", "hash_sum"),
+ ("values", "hash_count")
+ ])
+ assert sorted_by_keys(r.to_pydict()) == {
+ "keys": ["a", "b", "c"],
+ "values_sum": [3, 7, 5],
+ "values_count": [2, 2, 1]
+ }
+
+ # Test without hash_ prefix
+ r = table.group_by("keys").aggregate([
+ ("values", "sum")
+ ])
+ assert sorted_by_keys(r.to_pydict()) == {
+ "keys": ["a", "b", "c"],
+ "values_sum": [3, 7, 5]
+ }
+
+ r = table.group_by("keys").aggregate([
+ ("values", "max"),
+ ("bigvalues", "sum")
+ ])
+ assert sorted_by_keys(r.to_pydict()) == {
+ "keys": ["a", "b", "c"],
+ "values_max": [2, 4, 5],
+ "bigvalues_sum": [30, 70, 50]
+ }
+
+ r = table.group_by("keys").aggregate([
+ ("bigvalues", "max"),
+ ("values", "sum")
+ ])
+ assert sorted_by_keys(r.to_pydict()) == {
+ "keys": ["a", "b", "c"],
+ "values_sum": [3, 7, 5],
+ "bigvalues_max": [20, 40, 50]
+ }
+
+ r = table.group_by(["keys", "keys2"]).aggregate([
+ ("values", "sum")
+ ])
+ assert sorted_by_keys(r.to_pydict()) == {
+ "keys": ["a", "b", "b", "c"],
+ "keys2": ["X", "Y", "Z", "Z"],
+ "values_sum": [3, 3, 4, 5]
+ }
+
+ table_with_nulls = pa.table([
+ pa.array(["a", "a", "a"]),
+ pa.array([1, None, None])
+ ], names=["keys", "values"])
+
+ r = table_with_nulls.group_by(["keys"]).aggregate([
+ ("values", "count", pc.CountOptions(mode="all"))
+ ])
+ assert r.to_pydict() == {
+ "keys": ["a"],
+ "values_count": [3]
+ }
+
+ r = table_with_nulls.group_by(["keys"]).aggregate([
+ ("values", "count", pc.CountOptions(mode="only_null"))
+ ])
+ assert r.to_pydict() == {
+ "keys": ["a"],
+ "values_count": [2]
+ }
+
+ r = table_with_nulls.group_by(["keys"]).aggregate([
+ ("values", "count", pc.CountOptions(mode="only_valid"))
+ ])
+ assert r.to_pydict() == {
+ "keys": ["a"],
+ "values_count": [1]
+ }
+
+
+def test_table_sort_by():
+ table = pa.table([
+ pa.array([3, 1, 4, 2, 5]),
+ pa.array(["b", "a", "b", "a", "c"]),
+ ], names=["values", "keys"])
+
+ assert table.sort_by("values").to_pydict() == {
+ "keys": ["a", "a", "b", "b", "c"],
+ "values": [1, 2, 3, 4, 5]
+ }
+
+ assert table.sort_by([("values", "descending")]).to_pydict() == {
+ "keys": ["c", "b", "b", "a", "a"],
+ "values": [5, 4, 3, 2, 1]
+ }