You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by jo...@apache.org on 2023/06/01 14:02:42 UTC

[arrow] branch main updated: GH-33976: [Python] Remove usage of TableGroupBy helper in favor of pyarrow.acero.Declaration (#34769)

This is an automated email from the ASF dual-hosted git repository.

jorisvandenbossche pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0bb2d83a3b GH-33976: [Python] Remove usage of TableGroupBy helper in favor of pyarrow.acero.Declaration (#34769)
0bb2d83a3b is described below

commit 0bb2d83a3bba92a28495a67de74d94598e456625
Author: Joris Van den Bossche <jo...@gmail.com>
AuthorDate: Thu Jun 1 16:02:33 2023 +0200

    GH-33976: [Python] Remove usage of TableGroupBy helper in favor of pyarrow.acero.Declaration (#34769)
    
    ### Rationale for this change
    
    Now we have the pyarrow.acero building blocks (GH-33976), we can easily construct the Declaration, which `arrow::compute::TableGroupBy` creates under the hood, ourselves in pyarrow.
    
    ### Are these changes tested?
    
    Existing tests are passing.
    
    ### Are there any user-facing changes?
    
    No
    
    Authored-by: Joris Van den Bossche <jo...@gmail.com>
    Signed-off-by: Joris Van den Bossche <jo...@gmail.com>
---
 cpp/src/arrow/acero/CMakeLists.txt                 |  3 +-
 cpp/src/arrow/acero/aggregate_benchmark.cc         | 15 ++++-
 .../{groupby_test.cc => aggregate_node_test.cc}    | 14 ++++-
 cpp/src/arrow/acero/groupby.cc                     | 61 --------------------
 cpp/src/arrow/acero/groupby.h                      | 65 ----------------------
 python/pyarrow/_acero.pyx                          | 32 -----------
 python/pyarrow/acero.py                            | 10 +++-
 python/pyarrow/includes/libarrow.pxd               |  6 ++
 python/pyarrow/includes/libarrow_acero.pxd         | 14 -----
 python/pyarrow/table.pxi                           | 11 +---
 10 files changed, 46 insertions(+), 185 deletions(-)

diff --git a/cpp/src/arrow/acero/CMakeLists.txt b/cpp/src/arrow/acero/CMakeLists.txt
index 258dcb5580..b86b353f86 100644
--- a/cpp/src/arrow/acero/CMakeLists.txt
+++ b/cpp/src/arrow/acero/CMakeLists.txt
@@ -28,7 +28,6 @@ macro(append_acero_avx2_src SRC)
 endmacro()
 
 set(ARROW_ACERO_SRCS
-    groupby.cc
     accumulation_queue.cc
     aggregate_node.cc
     asof_join_node.cc
@@ -166,7 +165,7 @@ add_arrow_acero_test(pivot_longer_node_test SOURCES pivot_longer_node_test.cc
 add_arrow_acero_test(asof_join_node_test SOURCES asof_join_node_test.cc test_nodes.cc)
 add_arrow_acero_test(tpch_node_test SOURCES tpch_node_test.cc)
 add_arrow_acero_test(union_node_test SOURCES union_node_test.cc)
-add_arrow_acero_test(groupby_test SOURCES groupby_test.cc)
+add_arrow_acero_test(aggregate_node_test SOURCES aggregate_node_test.cc)
 add_arrow_acero_test(util_test SOURCES util_test.cc task_util_test.cc)
 add_arrow_acero_test(hash_aggregate_test SOURCES hash_aggregate_test.cc)
 
diff --git a/cpp/src/arrow/acero/aggregate_benchmark.cc b/cpp/src/arrow/acero/aggregate_benchmark.cc
index d65b52fe52..4db7e44322 100644
--- a/cpp/src/arrow/acero/aggregate_benchmark.cc
+++ b/cpp/src/arrow/acero/aggregate_benchmark.cc
@@ -19,7 +19,8 @@
 
 #include <vector>
 
-#include "arrow/acero/groupby.h"
+#include "arrow/acero/exec_plan.h"
+#include "arrow/acero/options.h"
 #include "arrow/array/array_primitive.h"
 #include "arrow/compute/api.h"
 #include "arrow/table.h"
@@ -347,6 +348,18 @@ std::shared_ptr<RecordBatch> RecordBatchFromArrays(
   return RecordBatch::Make(schema(std::move(fields)), length, std::move(all_arrays));
 }
 
+Result<std::shared_ptr<Table>> BatchGroupBy(
+    std::shared_ptr<RecordBatch> batch, std::vector<Aggregate> aggregates,
+    std::vector<FieldRef> keys, bool use_threads = false,
+    MemoryPool* memory_pool = default_memory_pool()) {
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Table> table,
+                        Table::FromRecordBatches({std::move(batch)}));
+  Declaration plan = Declaration::Sequence(
+      {{"table_source", TableSourceNodeOptions(std::move(table))},
+       {"aggregate", AggregateNodeOptions(std::move(aggregates), std::move(keys))}});
+  return DeclarationToTable(std::move(plan), use_threads, memory_pool);
+}
+
 static void BenchmarkGroupBy(benchmark::State& state, std::vector<Aggregate> aggregates,
                              const std::vector<std::shared_ptr<Array>>& arguments,
                              const std::vector<std::shared_ptr<Array>>& keys) {
diff --git a/cpp/src/arrow/acero/groupby_test.cc b/cpp/src/arrow/acero/aggregate_node_test.cc
similarity index 90%
rename from cpp/src/arrow/acero/groupby_test.cc
rename to cpp/src/arrow/acero/aggregate_node_test.cc
index 7fe67a1f0d..5b63038874 100644
--- a/cpp/src/arrow/acero/groupby_test.cc
+++ b/cpp/src/arrow/acero/aggregate_node_test.cc
@@ -14,13 +14,15 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-#include "arrow/acero/groupby.h"
+#include "arrow/acero/exec_plan.h"
+#include "arrow/acero/options.h"
 
 #include <gmock/gmock-matchers.h>
 #include <gtest/gtest.h>
 
 #include <memory>
 
+#include "arrow/result.h"
 #include "arrow/table.h"
 #include "arrow/testing/gtest_util.h"
 
@@ -28,6 +30,16 @@ namespace arrow {
 
 namespace acero {
 
+Result<std::shared_ptr<Table>> TableGroupBy(
+    std::shared_ptr<Table> table, std::vector<Aggregate> aggregates,
+    std::vector<FieldRef> keys, bool use_threads = false,
+    MemoryPool* memory_pool = default_memory_pool()) {
+  Declaration plan = Declaration::Sequence(
+      {{"table_source", TableSourceNodeOptions(std::move(table))},
+       {"aggregate", AggregateNodeOptions(std::move(aggregates), std::move(keys))}});
+  return DeclarationToTable(std::move(plan), use_threads, memory_pool);
+}
+
 TEST(GroupByConvenienceFunc, Basic) {
   std::shared_ptr<Schema> in_schema =
       schema({field("key1", utf8()), field("key2", int32()), field("value", int32())});
diff --git a/cpp/src/arrow/acero/groupby.cc b/cpp/src/arrow/acero/groupby.cc
deleted file mode 100644
index a0b220d653..0000000000
--- a/cpp/src/arrow/acero/groupby.cc
+++ /dev/null
@@ -1,61 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "arrow/acero/groupby.h"
-
-#include <mutex>
-#include <thread>
-#include <unordered_map>
-
-#include "arrow/compute/exec_internal.h"
-#include "arrow/compute/registry.h"
-#include "arrow/compute/row/grouper.h"
-#include "arrow/record_batch.h"
-#include "arrow/table.h"
-#include "arrow/util/checked_cast.h"
-#include "arrow/util/logging.h"
-#include "arrow/util/string.h"
-#include "arrow/util/task_group.h"
-
-namespace arrow {
-
-namespace acero {
-
-Result<std::shared_ptr<Table>> TableGroupBy(std::shared_ptr<Table> table,
-                                            std::vector<Aggregate> aggregates,
-                                            std::vector<FieldRef> keys, bool use_threads,
-                                            MemoryPool* memory_pool) {
-  Declaration plan = Declaration::Sequence(
-      {{"table_source", TableSourceNodeOptions(std::move(table))},
-       {"aggregate", AggregateNodeOptions(std::move(aggregates), std::move(keys))}});
-  return DeclarationToTable(std::move(plan), use_threads, memory_pool);
-}
-
-Result<std::shared_ptr<Table>> BatchGroupBy(std::shared_ptr<RecordBatch> batch,
-                                            std::vector<Aggregate> aggregates,
-                                            std::vector<FieldRef> keys, bool use_threads,
-                                            MemoryPool* memory_pool) {
-  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Table> table,
-                        Table::FromRecordBatches({std::move(batch)}));
-  Declaration plan = Declaration::Sequence(
-      {{"table_source", TableSourceNodeOptions(std::move(table))},
-       {"aggregate", AggregateNodeOptions(std::move(aggregates), std::move(keys))}});
-  return DeclarationToTable(std::move(plan), use_threads, memory_pool);
-}
-
-}  // namespace acero
-}  // namespace arrow
diff --git a/cpp/src/arrow/acero/groupby.h b/cpp/src/arrow/acero/groupby.h
deleted file mode 100644
index c24990ad69..0000000000
--- a/cpp/src/arrow/acero/groupby.h
+++ /dev/null
@@ -1,65 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <memory>
-#include <vector>
-
-#include "arrow/acero/exec_plan.h"
-#include "arrow/acero/options.h"
-#include "arrow/acero/visibility.h"
-#include "arrow/compute/api_aggregate.h"
-#include "arrow/compute/exec.h"
-#include "arrow/compute/kernel.h"
-#include "arrow/datum.h"
-#include "arrow/result.h"
-
-namespace arrow {
-namespace acero {
-
-/// Convenience function to perform a group-by on a table
-///
-/// The result will be calculated using an exec plan with an aggregate node.
-///
-/// If there are no arguments/aggregates then the returned table will have one row
-/// for each unique combination of keys
-///
-/// Note: If there are many groups the output table may have multiple chunks.
-///
-/// If there are no keys then the aggregates will be applied to the full table.
-/// The output table in this scenario is guaranteed to have exactly 1 row.
-///
-/// \return a table that will have one column for each aggregate, named after they
-/// aggregate function, and one column for each key
-ARROW_ACERO_EXPORT
-Result<std::shared_ptr<Table>> TableGroupBy(
-    std::shared_ptr<Table> table, std::vector<Aggregate> aggregates,
-    std::vector<FieldRef> keys, bool use_threads = false,
-    MemoryPool* memory_pool = default_memory_pool());
-
-/// Convenience function to perform a group-by on a record batch
-///
-/// \see GroupByTable
-ARROW_ACERO_EXPORT
-Result<std::shared_ptr<Table>> BatchGroupBy(
-    std::shared_ptr<RecordBatch> record_batch, std::vector<Aggregate> aggregates,
-    std::vector<FieldRef> keys, bool use_threads = false,
-    MemoryPool* memory_pool = default_memory_pool());
-
-}  // namespace acero
-}  // namespace arrow
diff --git a/python/pyarrow/_acero.pyx b/python/pyarrow/_acero.pyx
index af39ce361f..bb3196c86e 100644
--- a/python/pyarrow/_acero.pyx
+++ b/python/pyarrow/_acero.pyx
@@ -527,35 +527,3 @@ cdef class Declaration(_Weakrefable):
             GetResultValue(DeclarationToReader(self.unwrap(), use_threads)).release()
         )
         return reader
-
-
-def _group_by(table, aggregates, keys):
-    cdef:
-        shared_ptr[CTable] c_table
-        vector[CAggregate] c_aggregates
-        vector[CFieldRef] c_keys
-        CAggregate c_aggr
-
-    c_table = (<Table> table).sp_table
-
-    for aggr_arg_indices, aggr_func_name, aggr_opts, aggr_name in aggregates:
-        c_aggr.function = tobytes(aggr_func_name)
-        if aggr_opts is not None:
-            c_aggr.options = (<FunctionOptions?>aggr_opts).wrapped
-        else:
-            c_aggr.options = <shared_ptr[CFunctionOptions]>nullptr
-        for field_idx in aggr_arg_indices:
-            c_aggr.target.push_back(CFieldRef(<int> field_idx))
-
-        c_aggr.name = tobytes(aggr_name)
-        c_aggregates.push_back(move(c_aggr))
-
-    for key_idx in keys:
-        c_keys.push_back(CFieldRef(<int> key_idx))
-
-    with nogil:
-        sp_table = GetResultValue(
-            CTableGroupBy(c_table, c_aggregates, c_keys)
-        )
-
-    return pyarrow_wrap_table(sp_table)
diff --git a/python/pyarrow/acero.py b/python/pyarrow/acero.py
index 04c396a26f..0150148b50 100644
--- a/python/pyarrow/acero.py
+++ b/python/pyarrow/acero.py
@@ -26,7 +26,6 @@ from pyarrow.lib import Table
 from pyarrow.compute import Expression, field
 
 from pyarrow._acero import (  # noqa
-    _group_by,
     Declaration,
     ExecNodeOptions,
     TableSourceNodeOptions,
@@ -292,3 +291,12 @@ def _sort_source(table_or_dataset, sort_keys, output_type=Table, **kwargs):
         return ds.InMemoryDataset(result_table)
     else:
         raise TypeError("Unsupported output type")
+
+
+def _group_by(table, aggregates, keys):
+
+    decl = Declaration.from_sequence([
+        Declaration("table_source", TableSourceNodeOptions(table)),
+        Declaration("aggregate", AggregateNodeOptions(aggregates, keys=keys))
+    ])
+    return decl.to_table(use_threads=True)
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 8fc531dff0..3190877ea0 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -2346,6 +2346,12 @@ cdef extern from "arrow/compute/api.h" namespace "arrow::compute" nogil:
         CIndexOptions(shared_ptr[CScalar] value)
         shared_ptr[CScalar] value
 
+    cdef cppclass CAggregate "arrow::compute::Aggregate":
+        c_string function
+        shared_ptr[CFunctionOptions] options
+        vector[CFieldRef] target
+        c_string name
+
     cdef enum CMapLookupOccurrence \
             "arrow::compute::MapLookupOptions::Occurrence":
         CMapLookupOccurrence_ALL "arrow::compute::MapLookupOptions::ALL"
diff --git a/python/pyarrow/includes/libarrow_acero.pxd b/python/pyarrow/includes/libarrow_acero.pxd
index 920668cdd0..bb1e3646c9 100644
--- a/python/pyarrow/includes/libarrow_acero.pxd
+++ b/python/pyarrow/includes/libarrow_acero.pxd
@@ -21,20 +21,6 @@ from pyarrow.includes.common cimport *
 from pyarrow.includes.libarrow cimport *
 
 
-cdef extern from "arrow/acero/groupby.h" namespace \
-        "arrow::acero" nogil:
-    cdef cppclass CAggregate "arrow::compute::Aggregate":
-        c_string function
-        shared_ptr[CFunctionOptions] options
-        vector[CFieldRef] target
-        c_string name
-
-    CResult[shared_ptr[CTable]] CTableGroupBy "arrow::acero::TableGroupBy"(
-        shared_ptr[CTable] table,
-        vector[CAggregate] aggregates,
-        vector[CFieldRef] keys)
-
-
 cdef extern from "arrow/acero/options.h" namespace "arrow::acero" nogil:
     cdef enum CJoinType "arrow::acero::JoinType":
         CJoinType_LEFT_SEMI "arrow::acero::JoinType::LEFT_SEMI"
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index e051a53f70..5f1ee00201 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -5430,11 +5430,6 @@ list[tuple(str, str, FunctionOptions)]
                 aggr_name = func_nohash
             else:
                 aggr_name = "_".join(target) + "_" + func_nohash
-            # Calculate target indices by resolving field names
-            target_indices = [
-                self._table.schema.get_field_index(f) for f in target]
-            group_by_aggrs.append((target_indices, func, opt, aggr_name))
-
-        key_indices = [
-            self._table.schema.get_field_index(k) for k in self.keys]
-        return _pac()._group_by(self._table, group_by_aggrs, key_indices)
+            group_by_aggrs.append((target, func, opt, aggr_name))
+
+        return _pac()._group_by(self._table, group_by_aggrs, self.keys)