You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by jo...@apache.org on 2023/06/01 14:02:42 UTC
[arrow] branch main updated: GH-33976: [Python] Remove usage of TableGroupBy helper in favor of pyarrow.acero.Declaration (#34769)
This is an automated email from the ASF dual-hosted git repository.
jorisvandenbossche pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 0bb2d83a3b GH-33976: [Python] Remove usage of TableGroupBy helper in favor of pyarrow.acero.Declaration (#34769)
0bb2d83a3b is described below
commit 0bb2d83a3bba92a28495a67de74d94598e456625
Author: Joris Van den Bossche <jo...@gmail.com>
AuthorDate: Thu Jun 1 16:02:33 2023 +0200
GH-33976: [Python] Remove usage of TableGroupBy helper in favor of pyarrow.acero.Declaration (#34769)
### Rationale for this change
Now that we have the pyarrow.acero building blocks (GH-33976), we can easily construct ourselves, in pyarrow, the Declaration that `arrow::acero::TableGroupBy` creates under the hood.
### Are these changes tested?
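Yes, by the existing tests, which continue to pass. The C++ `groupby_test` is renamed to `aggregate_node_test` and now defines its own local `TableGroupBy` helper.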
### Are there any user-facing changes?
No
Authored-by: Joris Van den Bossche <jo...@gmail.com>
Signed-off-by: Joris Van den Bossche <jo...@gmail.com>
---
cpp/src/arrow/acero/CMakeLists.txt | 3 +-
cpp/src/arrow/acero/aggregate_benchmark.cc | 15 ++++-
.../{groupby_test.cc => aggregate_node_test.cc} | 14 ++++-
cpp/src/arrow/acero/groupby.cc | 61 --------------------
cpp/src/arrow/acero/groupby.h | 65 ----------------------
python/pyarrow/_acero.pyx | 32 -----------
python/pyarrow/acero.py | 10 +++-
python/pyarrow/includes/libarrow.pxd | 6 ++
python/pyarrow/includes/libarrow_acero.pxd | 14 -----
python/pyarrow/table.pxi | 11 +---
10 files changed, 46 insertions(+), 185 deletions(-)
diff --git a/cpp/src/arrow/acero/CMakeLists.txt b/cpp/src/arrow/acero/CMakeLists.txt
index 258dcb5580..b86b353f86 100644
--- a/cpp/src/arrow/acero/CMakeLists.txt
+++ b/cpp/src/arrow/acero/CMakeLists.txt
@@ -28,7 +28,6 @@ macro(append_acero_avx2_src SRC)
endmacro()
set(ARROW_ACERO_SRCS
- groupby.cc
accumulation_queue.cc
aggregate_node.cc
asof_join_node.cc
@@ -166,7 +165,7 @@ add_arrow_acero_test(pivot_longer_node_test SOURCES pivot_longer_node_test.cc
add_arrow_acero_test(asof_join_node_test SOURCES asof_join_node_test.cc test_nodes.cc)
add_arrow_acero_test(tpch_node_test SOURCES tpch_node_test.cc)
add_arrow_acero_test(union_node_test SOURCES union_node_test.cc)
-add_arrow_acero_test(groupby_test SOURCES groupby_test.cc)
+add_arrow_acero_test(aggregate_node_test SOURCES aggregate_node_test.cc)
add_arrow_acero_test(util_test SOURCES util_test.cc task_util_test.cc)
add_arrow_acero_test(hash_aggregate_test SOURCES hash_aggregate_test.cc)
diff --git a/cpp/src/arrow/acero/aggregate_benchmark.cc b/cpp/src/arrow/acero/aggregate_benchmark.cc
index d65b52fe52..4db7e44322 100644
--- a/cpp/src/arrow/acero/aggregate_benchmark.cc
+++ b/cpp/src/arrow/acero/aggregate_benchmark.cc
@@ -19,7 +19,8 @@
#include <vector>
-#include "arrow/acero/groupby.h"
+#include "arrow/acero/exec_plan.h"
+#include "arrow/acero/options.h"
#include "arrow/array/array_primitive.h"
#include "arrow/compute/api.h"
#include "arrow/table.h"
@@ -347,6 +348,18 @@ std::shared_ptr<RecordBatch> RecordBatchFromArrays(
return RecordBatch::Make(schema(std::move(fields)), length, std::move(all_arrays));
}
+Result<std::shared_ptr<Table>> BatchGroupBy(
+ std::shared_ptr<RecordBatch> batch, std::vector<Aggregate> aggregates,
+ std::vector<FieldRef> keys, bool use_threads = false,
+ MemoryPool* memory_pool = default_memory_pool()) {
+ ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Table> table,
+ Table::FromRecordBatches({std::move(batch)}));
+ Declaration plan = Declaration::Sequence(
+ {{"table_source", TableSourceNodeOptions(std::move(table))},
+ {"aggregate", AggregateNodeOptions(std::move(aggregates), std::move(keys))}});
+ return DeclarationToTable(std::move(plan), use_threads, memory_pool);
+}
+
static void BenchmarkGroupBy(benchmark::State& state, std::vector<Aggregate> aggregates,
const std::vector<std::shared_ptr<Array>>& arguments,
const std::vector<std::shared_ptr<Array>>& keys) {
diff --git a/cpp/src/arrow/acero/groupby_test.cc b/cpp/src/arrow/acero/aggregate_node_test.cc
similarity index 90%
rename from cpp/src/arrow/acero/groupby_test.cc
rename to cpp/src/arrow/acero/aggregate_node_test.cc
index 7fe67a1f0d..5b63038874 100644
--- a/cpp/src/arrow/acero/groupby_test.cc
+++ b/cpp/src/arrow/acero/aggregate_node_test.cc
@@ -14,13 +14,15 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-#include "arrow/acero/groupby.h"
+#include "arrow/acero/exec_plan.h"
+#include "arrow/acero/options.h"
#include <gmock/gmock-matchers.h>
#include <gtest/gtest.h>
#include <memory>
+#include "arrow/result.h"
#include "arrow/table.h"
#include "arrow/testing/gtest_util.h"
@@ -28,6 +30,16 @@ namespace arrow {
namespace acero {
+Result<std::shared_ptr<Table>> TableGroupBy(
+ std::shared_ptr<Table> table, std::vector<Aggregate> aggregates,
+ std::vector<FieldRef> keys, bool use_threads = false,
+ MemoryPool* memory_pool = default_memory_pool()) {
+ Declaration plan = Declaration::Sequence(
+ {{"table_source", TableSourceNodeOptions(std::move(table))},
+ {"aggregate", AggregateNodeOptions(std::move(aggregates), std::move(keys))}});
+ return DeclarationToTable(std::move(plan), use_threads, memory_pool);
+}
+
TEST(GroupByConvenienceFunc, Basic) {
std::shared_ptr<Schema> in_schema =
schema({field("key1", utf8()), field("key2", int32()), field("value", int32())});
diff --git a/cpp/src/arrow/acero/groupby.cc b/cpp/src/arrow/acero/groupby.cc
deleted file mode 100644
index a0b220d653..0000000000
--- a/cpp/src/arrow/acero/groupby.cc
+++ /dev/null
@@ -1,61 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "arrow/acero/groupby.h"
-
-#include <mutex>
-#include <thread>
-#include <unordered_map>
-
-#include "arrow/compute/exec_internal.h"
-#include "arrow/compute/registry.h"
-#include "arrow/compute/row/grouper.h"
-#include "arrow/record_batch.h"
-#include "arrow/table.h"
-#include "arrow/util/checked_cast.h"
-#include "arrow/util/logging.h"
-#include "arrow/util/string.h"
-#include "arrow/util/task_group.h"
-
-namespace arrow {
-
-namespace acero {
-
-Result<std::shared_ptr<Table>> TableGroupBy(std::shared_ptr<Table> table,
- std::vector<Aggregate> aggregates,
- std::vector<FieldRef> keys, bool use_threads,
- MemoryPool* memory_pool) {
- Declaration plan = Declaration::Sequence(
- {{"table_source", TableSourceNodeOptions(std::move(table))},
- {"aggregate", AggregateNodeOptions(std::move(aggregates), std::move(keys))}});
- return DeclarationToTable(std::move(plan), use_threads, memory_pool);
-}
-
-Result<std::shared_ptr<Table>> BatchGroupBy(std::shared_ptr<RecordBatch> batch,
- std::vector<Aggregate> aggregates,
- std::vector<FieldRef> keys, bool use_threads,
- MemoryPool* memory_pool) {
- ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Table> table,
- Table::FromRecordBatches({std::move(batch)}));
- Declaration plan = Declaration::Sequence(
- {{"table_source", TableSourceNodeOptions(std::move(table))},
- {"aggregate", AggregateNodeOptions(std::move(aggregates), std::move(keys))}});
- return DeclarationToTable(std::move(plan), use_threads, memory_pool);
-}
-
-} // namespace acero
-} // namespace arrow
diff --git a/cpp/src/arrow/acero/groupby.h b/cpp/src/arrow/acero/groupby.h
deleted file mode 100644
index c24990ad69..0000000000
--- a/cpp/src/arrow/acero/groupby.h
+++ /dev/null
@@ -1,65 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <memory>
-#include <vector>
-
-#include "arrow/acero/exec_plan.h"
-#include "arrow/acero/options.h"
-#include "arrow/acero/visibility.h"
-#include "arrow/compute/api_aggregate.h"
-#include "arrow/compute/exec.h"
-#include "arrow/compute/kernel.h"
-#include "arrow/datum.h"
-#include "arrow/result.h"
-
-namespace arrow {
-namespace acero {
-
-/// Convenience function to perform a group-by on a table
-///
-/// The result will be calculated using an exec plan with an aggregate node.
-///
-/// If there are no arguments/aggregates then the returned table will have one row
-/// for each unique combination of keys
-///
-/// Note: If there are many groups the output table may have multiple chunks.
-///
-/// If there are no keys then the aggregates will be applied to the full table.
-/// The output table in this scenario is guaranteed to have exactly 1 row.
-///
-/// \return a table that will have one column for each aggregate, named after they
-/// aggregate function, and one column for each key
-ARROW_ACERO_EXPORT
-Result<std::shared_ptr<Table>> TableGroupBy(
- std::shared_ptr<Table> table, std::vector<Aggregate> aggregates,
- std::vector<FieldRef> keys, bool use_threads = false,
- MemoryPool* memory_pool = default_memory_pool());
-
-/// Convenience function to perform a group-by on a record batch
-///
-/// \see GroupByTable
-ARROW_ACERO_EXPORT
-Result<std::shared_ptr<Table>> BatchGroupBy(
- std::shared_ptr<RecordBatch> record_batch, std::vector<Aggregate> aggregates,
- std::vector<FieldRef> keys, bool use_threads = false,
- MemoryPool* memory_pool = default_memory_pool());
-
-} // namespace acero
-} // namespace arrow
diff --git a/python/pyarrow/_acero.pyx b/python/pyarrow/_acero.pyx
index af39ce361f..bb3196c86e 100644
--- a/python/pyarrow/_acero.pyx
+++ b/python/pyarrow/_acero.pyx
@@ -527,35 +527,3 @@ cdef class Declaration(_Weakrefable):
GetResultValue(DeclarationToReader(self.unwrap(), use_threads)).release()
)
return reader
-
-
-def _group_by(table, aggregates, keys):
- cdef:
- shared_ptr[CTable] c_table
- vector[CAggregate] c_aggregates
- vector[CFieldRef] c_keys
- CAggregate c_aggr
-
- c_table = (<Table> table).sp_table
-
- for aggr_arg_indices, aggr_func_name, aggr_opts, aggr_name in aggregates:
- c_aggr.function = tobytes(aggr_func_name)
- if aggr_opts is not None:
- c_aggr.options = (<FunctionOptions?>aggr_opts).wrapped
- else:
- c_aggr.options = <shared_ptr[CFunctionOptions]>nullptr
- for field_idx in aggr_arg_indices:
- c_aggr.target.push_back(CFieldRef(<int> field_idx))
-
- c_aggr.name = tobytes(aggr_name)
- c_aggregates.push_back(move(c_aggr))
-
- for key_idx in keys:
- c_keys.push_back(CFieldRef(<int> key_idx))
-
- with nogil:
- sp_table = GetResultValue(
- CTableGroupBy(c_table, c_aggregates, c_keys)
- )
-
- return pyarrow_wrap_table(sp_table)
diff --git a/python/pyarrow/acero.py b/python/pyarrow/acero.py
index 04c396a26f..0150148b50 100644
--- a/python/pyarrow/acero.py
+++ b/python/pyarrow/acero.py
@@ -26,7 +26,6 @@ from pyarrow.lib import Table
from pyarrow.compute import Expression, field
from pyarrow._acero import ( # noqa
- _group_by,
Declaration,
ExecNodeOptions,
TableSourceNodeOptions,
@@ -292,3 +291,12 @@ def _sort_source(table_or_dataset, sort_keys, output_type=Table, **kwargs):
return ds.InMemoryDataset(result_table)
else:
raise TypeError("Unsupported output type")
+
+
+def _group_by(table, aggregates, keys):
+
+ decl = Declaration.from_sequence([
+ Declaration("table_source", TableSourceNodeOptions(table)),
+ Declaration("aggregate", AggregateNodeOptions(aggregates, keys=keys))
+ ])
+ return decl.to_table(use_threads=True)
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 8fc531dff0..3190877ea0 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -2346,6 +2346,12 @@ cdef extern from "arrow/compute/api.h" namespace "arrow::compute" nogil:
CIndexOptions(shared_ptr[CScalar] value)
shared_ptr[CScalar] value
+ cdef cppclass CAggregate "arrow::compute::Aggregate":
+ c_string function
+ shared_ptr[CFunctionOptions] options
+ vector[CFieldRef] target
+ c_string name
+
cdef enum CMapLookupOccurrence \
"arrow::compute::MapLookupOptions::Occurrence":
CMapLookupOccurrence_ALL "arrow::compute::MapLookupOptions::ALL"
diff --git a/python/pyarrow/includes/libarrow_acero.pxd b/python/pyarrow/includes/libarrow_acero.pxd
index 920668cdd0..bb1e3646c9 100644
--- a/python/pyarrow/includes/libarrow_acero.pxd
+++ b/python/pyarrow/includes/libarrow_acero.pxd
@@ -21,20 +21,6 @@ from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
-cdef extern from "arrow/acero/groupby.h" namespace \
- "arrow::acero" nogil:
- cdef cppclass CAggregate "arrow::compute::Aggregate":
- c_string function
- shared_ptr[CFunctionOptions] options
- vector[CFieldRef] target
- c_string name
-
- CResult[shared_ptr[CTable]] CTableGroupBy "arrow::acero::TableGroupBy"(
- shared_ptr[CTable] table,
- vector[CAggregate] aggregates,
- vector[CFieldRef] keys)
-
-
cdef extern from "arrow/acero/options.h" namespace "arrow::acero" nogil:
cdef enum CJoinType "arrow::acero::JoinType":
CJoinType_LEFT_SEMI "arrow::acero::JoinType::LEFT_SEMI"
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index e051a53f70..5f1ee00201 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -5430,11 +5430,6 @@ list[tuple(str, str, FunctionOptions)]
aggr_name = func_nohash
else:
aggr_name = "_".join(target) + "_" + func_nohash
- # Calculate target indices by resolving field names
- target_indices = [
- self._table.schema.get_field_index(f) for f in target]
- group_by_aggrs.append((target_indices, func, opt, aggr_name))
-
- key_indices = [
- self._table.schema.get_field_index(k) for k in self.keys]
- return _pac()._group_by(self._table, group_by_aggrs, key_indices)
+ group_by_aggrs.append((target, func, opt, aggr_name))
+
+ return _pac()._group_by(self._table, group_by_aggrs, self.keys)