You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@arrow.apache.org by ic...@apache.org on 2023/04/14 15:57:04 UTC
[arrow] branch main updated: GH-34930: [C++] Standardize aggregation column order (#34931)
This is an automated email from the ASF dual-hosted git repository.
icexelloss pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 5c73973a23 GH-34930: [C++] Standardize aggregation column order (#34931)
5c73973a23 is described below
commit 5c73973a23deb68bef75be8acd743763307ac12f
Author: rtpsw <rt...@hotmail.com>
AuthorDate: Fri Apr 14 18:56:57 2023 +0300
GH-34930: [C++] Standardize aggregation column order (#34931)
See #34930, #34908
* Closes: #34930
* Closes: #34908
Authored-by: Yaron Gvili <rt...@hotmail.com>
Signed-off-by: Li Jin <ic...@gmail.com>
---
cpp/src/arrow/acero/aggregate_node.cc | 25 ++++++++++----------
cpp/src/arrow/acero/hash_aggregate_test.cc | 35 ++++++++++++++--------------
cpp/src/arrow/acero/plan_test.cc | 4 ++--
cpp/src/arrow/engine/substrait/serde_test.cc | 2 +-
4 files changed, 33 insertions(+), 33 deletions(-)
diff --git a/cpp/src/arrow/acero/aggregate_node.cc b/cpp/src/arrow/acero/aggregate_node.cc
index c5b4442544..0366809b93 100644
--- a/cpp/src/arrow/acero/aggregate_node.cc
+++ b/cpp/src/arrow/acero/aggregate_node.cc
@@ -666,19 +666,19 @@ class GroupByNode : public ExecNode, public TracedNode {
// Build field vector for output schema
FieldVector output_fields{keys.size() + segment_keys.size() + aggs.size()};
- // First output is keys, followed by segment_keys, followed by aggregates themselves
+ // First output is segment keys, followed by keys, followed by aggregates themselves
// This matches the behavior described by Substrait and also tends to be the behavior
// in SQL engines
- for (size_t i = 0; i < keys.size(); ++i) {
- int key_field_id = key_field_ids[i];
- output_fields[i] = input_schema->field(key_field_id);
- }
- size_t base = keys.size();
for (size_t i = 0; i < segment_keys.size(); ++i) {
int segment_key_field_id = segment_key_field_ids[i];
- output_fields[base + i] = input_schema->field(segment_key_field_id);
+ output_fields[i] = input_schema->field(segment_key_field_id);
+ }
+ size_t base = segment_keys.size();
+ for (size_t i = 0; i < keys.size(); ++i) {
+ int key_field_id = key_field_ids[i];
+ output_fields[base + i] = input_schema->field(key_field_id);
}
- base += segment_keys.size();
+ base += keys.size();
for (size_t i = 0; i < aggs.size(); ++i) {
output_fields[base + i] = agg_result_fields[i]->WithName(aggs[i].name);
}
@@ -829,11 +829,12 @@ class GroupByNode : public ExecNode, public TracedNode {
out_data.values.resize(agg_kernels_.size() + key_field_ids_.size() +
segment_key_field_ids_.size());
- // Keys come first
- ARROW_ASSIGN_OR_RAISE(ExecBatch out_keys, state->grouper->GetUniques());
- std::move(out_keys.values.begin(), out_keys.values.end(), out_data.values.begin());
+ // Segment keys come first
+ PlaceFields(out_data, 0, segmenter_values_);
// Followed by segment keys
- PlaceFields(out_data, key_field_ids_.size(), segmenter_values_);
+ ARROW_ASSIGN_OR_RAISE(ExecBatch out_keys, state->grouper->GetUniques());
+ std::move(out_keys.values.begin(), out_keys.values.end(),
+ out_data.values.begin() + segment_key_field_ids_.size());
// And finally, the aggregates themselves
std::size_t base = segment_key_field_ids_.size() + key_field_ids_.size();
for (size_t i = 0; i < agg_kernels_.size(); ++i) {
diff --git a/cpp/src/arrow/acero/hash_aggregate_test.cc b/cpp/src/arrow/acero/hash_aggregate_test.cc
index ba8b6f4653..3233441852 100644
--- a/cpp/src/arrow/acero/hash_aggregate_test.cc
+++ b/cpp/src/arrow/acero/hash_aggregate_test.cc
@@ -4776,8 +4776,8 @@ void TestSegmentKey(GroupByFunction group_by, const std::shared_ptr<Table>& tabl
}
Result<std::shared_ptr<Table>> GetSingleSegmentInputAsChunked() {
- auto table = TableFromJSON(schema({field("argument", float64()), field("key", int64()),
- field("segment_key", int64())}),
+ auto table = TableFromJSON(schema({field("segment_key", int64()), field("key", int64()),
+ field("argument", float64())}),
{R"([{"argument": 1.0, "key": 1, "segment_key": 1},
{"argument": null, "key": 1, "segment_key": 1}
])",
@@ -4830,8 +4830,8 @@ Result<std::shared_ptr<ChunkedArray>> GetSingleSegmentScalarOutput() {
Result<std::shared_ptr<ChunkedArray>> GetSingleSegmentKeyOutput() {
return ChunkedArrayFromJSON(struct_({
- field("key_0", int64()),
field("key_1", int64()),
+ field("key_0", int64()),
field("hash_count", int64()),
field("hash_sum", float64()),
field("hash_min_max", struct_({
@@ -4840,16 +4840,16 @@ Result<std::shared_ptr<ChunkedArray>> GetSingleSegmentKeyOutput() {
})),
}),
{R"([
- [1, 1, 2, 4.25, {"min": 1.0, "max": 3.25} ],
- [2, 1, 3, -0.125, {"min": -0.25, "max": 0.125}],
- [3, 1, 0, null, {"min": null, "max": null} ],
- [null, 1, 2, 4.75, {"min": 0.75, "max": 4.0} ]
+ [1, 1, 2, 4.25, {"min": 1.0, "max": 3.25} ],
+ [1, 2, 3, -0.125, {"min": -0.25, "max": 0.125}],
+ [1, 3, 0, null, {"min": null, "max": null} ],
+ [1, null, 2, 4.75, {"min": 0.75, "max": 4.0} ]
])",
R"([
- [1, 0, 2, 4.25, {"min": 1.0, "max": 3.25} ],
- [2, 0, 3, -0.125, {"min": -0.25, "max": 0.125}],
- [3, 0, 0, null, {"min": null, "max": null} ],
- [null, 0, 2, 4.75, {"min": 0.75, "max": 4.0} ]
+ [0, 1, 2, 4.25, {"min": 1.0, "max": 3.25} ],
+ [0, 2, 3, -0.125, {"min": -0.25, "max": 0.125}],
+ [0, 3, 0, null, {"min": null, "max": null} ],
+ [0, null, 2, 4.75, {"min": 0.75, "max": 4.0} ]
])"});
}
@@ -4906,7 +4906,7 @@ Result<std::shared_ptr<Table>> GetEmptySegmentKeysInputAsCombined() {
Result<std::shared_ptr<Array>> GetEmptySegmentKeyOutput() {
ARROW_ASSIGN_OR_RAISE(auto chunked, GetSingleSegmentKeyOutput());
ARROW_ASSIGN_OR_RAISE(auto table, Table::FromChunkedStructArray(chunked));
- ARROW_ASSIGN_OR_RAISE(auto removed, table->RemoveColumn(1));
+ ARROW_ASSIGN_OR_RAISE(auto removed, table->RemoveColumn(0));
auto sliced = removed->Slice(0, 4);
ARROW_ASSIGN_OR_RAISE(auto batch, sliced->CombineChunksToBatch());
return batch->ToStructArray();
@@ -4927,14 +4927,13 @@ TEST_P(SegmentedKeyGroupBy, EmptySegmentKeyCombined) {
TestEmptySegmentKey(GetParam(), GetEmptySegmentKeysInputAsCombined);
}
-// adds a named copy of the last (single-segment-key) column to the obtained table
+// adds a named copy of the first (single-segment-key) column to the obtained table
Result<std::shared_ptr<Table>> GetMultiSegmentInput(
std::function<Result<std::shared_ptr<Table>>()> get_table,
const std::string& add_name) {
ARROW_ASSIGN_OR_RAISE(auto table, get_table());
- int last = table->num_columns() - 1;
- auto add_field = field(add_name, table->schema()->field(last)->type());
- return table->AddColumn(table->num_columns(), add_field, table->column(last));
+ auto add_field = field(add_name, table->schema()->field(0)->type());
+ return table->AddColumn(table->num_columns(), add_field, table->column(0));
}
Result<std::shared_ptr<Table>> GetMultiSegmentInputAsChunked(
@@ -4947,12 +4946,12 @@ Result<std::shared_ptr<Table>> GetMultiSegmentInputAsCombined(
return GetMultiSegmentInput(GetSingleSegmentInputAsCombined, add_name);
}
-// adds a named copy of the last (single-segment-key) column to the expected output table
+// adds a named copy of the first (single-segment-key) column to the expected output table
Result<std::shared_ptr<ChunkedArray>> GetMultiSegmentKeyOutput(
const std::string& add_name) {
ARROW_ASSIGN_OR_RAISE(auto chunked, GetSingleSegmentKeyOutput());
ARROW_ASSIGN_OR_RAISE(auto table, Table::FromChunkedStructArray(chunked));
- int existing_key_field_idx = 1;
+ int existing_key_field_idx = 0;
auto add_field =
field(add_name, table->schema()->field(existing_key_field_idx)->type());
ARROW_ASSIGN_OR_RAISE(auto added,
diff --git a/cpp/src/arrow/acero/plan_test.cc b/cpp/src/arrow/acero/plan_test.cc
index 3ce2ba2b8c..d6e18c3bb7 100644
--- a/cpp/src/arrow/acero/plan_test.cc
+++ b/cpp/src/arrow/acero/plan_test.cc
@@ -1640,7 +1640,7 @@ TEST(ExecPlanExecution, SegmentedAggregationWithOneSegment) {
DeclarationToExecBatches(std::move(plan), /*use_threads=*/false));
auto expected = ExecBatchFromJSON({int32(), int32(), int64(), float64()},
- R"([[1, 1, 6, 2], [2, 1, 6, 2]])");
+ R"([[1, 1, 6, 2], [1, 2, 6, 2]])");
AssertExecBatchesEqualIgnoringOrder(actual_batches.schema, actual_batches.batches,
{expected});
}
@@ -1670,7 +1670,7 @@ TEST(ExecPlanExecution, SegmentedAggregationWithTwoSegments) {
auto expected = ExecBatchFromJSON(
{int32(), int32(), int64(), float64()},
- R"([[1, 1, 3, 1.5], [2, 1, 1, 1], [1, 2, 3, 3], [2, 2, 5, 2.5]])");
+ R"([[1, 1, 3, 1.5], [1, 2, 1, 1], [2, 1, 3, 3], [2, 2, 5, 2.5]])");
AssertExecBatchesEqualIgnoringOrder(actual_batches.schema, actual_batches.batches,
{expected});
}
diff --git a/cpp/src/arrow/engine/substrait/serde_test.cc b/cpp/src/arrow/engine/substrait/serde_test.cc
index ca96b2bf7c..b84bde8080 100644
--- a/cpp/src/arrow/engine/substrait/serde_test.cc
+++ b/cpp/src/arrow/engine/substrait/serde_test.cc
@@ -5726,7 +5726,7 @@ TEST(Substrait, PlanWithSegmentedAggregateExtension) {
std::shared_ptr<Schema> output_schema =
schema({field("k", int32()), field("t", int32()), field("v", float64())});
auto expected_table =
- TableFromJSON(output_schema, {"[[1, 1, 4], [2, 1, 2], [2, 2, 10], [1, 2, 5]]"});
+ TableFromJSON(output_schema, {"[[1, 1, 4], [1, 2, 2], [2, 2, 10], [2, 1, 5]]"});
CheckRoundTripResult(std::move(expected_table), buf, {}, conversion_options);
}