You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ic...@apache.org on 2023/04/14 15:57:04 UTC

[arrow] branch main updated: GH-34930: [C++] Standardize aggregation column order (#34931)

This is an automated email from the ASF dual-hosted git repository.

icexelloss pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 5c73973a23 GH-34930:  [C++] Standardize aggregation column order (#34931)
5c73973a23 is described below

commit 5c73973a23deb68bef75be8acd743763307ac12f
Author: rtpsw <rt...@hotmail.com>
AuthorDate: Fri Apr 14 18:56:57 2023 +0300

    GH-34930:  [C++] Standardize aggregation column order (#34931)
    
    See #34930, #34908
    * Closes: #34930
    * Closes: #34908
    
    Authored-by: Yaron Gvili <rt...@hotmail.com>
    Signed-off-by: Li Jin <ic...@gmail.com>
---
 cpp/src/arrow/acero/aggregate_node.cc        | 25 ++++++++++----------
 cpp/src/arrow/acero/hash_aggregate_test.cc   | 35 ++++++++++++++--------------
 cpp/src/arrow/acero/plan_test.cc             |  4 ++--
 cpp/src/arrow/engine/substrait/serde_test.cc |  2 +-
 4 files changed, 33 insertions(+), 33 deletions(-)

diff --git a/cpp/src/arrow/acero/aggregate_node.cc b/cpp/src/arrow/acero/aggregate_node.cc
index c5b4442544..0366809b93 100644
--- a/cpp/src/arrow/acero/aggregate_node.cc
+++ b/cpp/src/arrow/acero/aggregate_node.cc
@@ -666,19 +666,19 @@ class GroupByNode : public ExecNode, public TracedNode {
     // Build field vector for output schema
     FieldVector output_fields{keys.size() + segment_keys.size() + aggs.size()};
 
-    // First output is keys, followed by segment_keys, followed by aggregates themselves
+    // First output is segment keys, followed by keys, followed by aggregates themselves
     // This matches the behavior described by Substrait and also tends to be the behavior
     // in SQL engines
-    for (size_t i = 0; i < keys.size(); ++i) {
-      int key_field_id = key_field_ids[i];
-      output_fields[i] = input_schema->field(key_field_id);
-    }
-    size_t base = keys.size();
     for (size_t i = 0; i < segment_keys.size(); ++i) {
       int segment_key_field_id = segment_key_field_ids[i];
-      output_fields[base + i] = input_schema->field(segment_key_field_id);
+      output_fields[i] = input_schema->field(segment_key_field_id);
+    }
+    size_t base = segment_keys.size();
+    for (size_t i = 0; i < keys.size(); ++i) {
+      int key_field_id = key_field_ids[i];
+      output_fields[base + i] = input_schema->field(key_field_id);
     }
-    base += segment_keys.size();
+    base += keys.size();
     for (size_t i = 0; i < aggs.size(); ++i) {
       output_fields[base + i] = agg_result_fields[i]->WithName(aggs[i].name);
     }
@@ -829,11 +829,12 @@ class GroupByNode : public ExecNode, public TracedNode {
     out_data.values.resize(agg_kernels_.size() + key_field_ids_.size() +
                            segment_key_field_ids_.size());
 
-    // Keys come first
-    ARROW_ASSIGN_OR_RAISE(ExecBatch out_keys, state->grouper->GetUniques());
-    std::move(out_keys.values.begin(), out_keys.values.end(), out_data.values.begin());
+    // Segment keys come first
+    PlaceFields(out_data, 0, segmenter_values_);
-    // Followed by segment keys
+    // Followed by keys
-    PlaceFields(out_data, key_field_ids_.size(), segmenter_values_);
+    ARROW_ASSIGN_OR_RAISE(ExecBatch out_keys, state->grouper->GetUniques());
+    std::move(out_keys.values.begin(), out_keys.values.end(),
+              out_data.values.begin() + segment_key_field_ids_.size());
     // And finally, the aggregates themselves
     std::size_t base = segment_key_field_ids_.size() + key_field_ids_.size();
     for (size_t i = 0; i < agg_kernels_.size(); ++i) {
diff --git a/cpp/src/arrow/acero/hash_aggregate_test.cc b/cpp/src/arrow/acero/hash_aggregate_test.cc
index ba8b6f4653..3233441852 100644
--- a/cpp/src/arrow/acero/hash_aggregate_test.cc
+++ b/cpp/src/arrow/acero/hash_aggregate_test.cc
@@ -4776,8 +4776,8 @@ void TestSegmentKey(GroupByFunction group_by, const std::shared_ptr<Table>& tabl
 }
 
 Result<std::shared_ptr<Table>> GetSingleSegmentInputAsChunked() {
-  auto table = TableFromJSON(schema({field("argument", float64()), field("key", int64()),
-                                     field("segment_key", int64())}),
+  auto table = TableFromJSON(schema({field("segment_key", int64()), field("key", int64()),
+                                     field("argument", float64())}),
                              {R"([{"argument": 1.0,   "key": 1,    "segment_key": 1},
                          {"argument": null,  "key": 1,    "segment_key": 1}
                         ])",
@@ -4830,8 +4830,8 @@ Result<std::shared_ptr<ChunkedArray>> GetSingleSegmentScalarOutput() {
 
 Result<std::shared_ptr<ChunkedArray>> GetSingleSegmentKeyOutput() {
   return ChunkedArrayFromJSON(struct_({
-                                  field("key_0", int64()),
                                   field("key_1", int64()),
+                                  field("key_0", int64()),
                                   field("hash_count", int64()),
                                   field("hash_sum", float64()),
                                   field("hash_min_max", struct_({
@@ -4840,16 +4840,16 @@ Result<std::shared_ptr<ChunkedArray>> GetSingleSegmentKeyOutput() {
                                                         })),
                               }),
                               {R"([
-    [1,    1, 2, 4.25,   {"min": 1.0,   "max": 3.25} ],
-    [2,    1, 3, -0.125, {"min": -0.25, "max": 0.125}],
-    [3,    1, 0, null,   {"min": null,  "max": null} ],
-    [null, 1, 2, 4.75,   {"min": 0.75,  "max": 4.0}  ]
+    [1, 1,    2, 4.25,   {"min": 1.0,   "max": 3.25} ],
+    [1, 2,    3, -0.125, {"min": -0.25, "max": 0.125}],
+    [1, 3,    0, null,   {"min": null,  "max": null} ],
+    [1, null, 2, 4.75,   {"min": 0.75,  "max": 4.0}  ]
   ])",
                                R"([
-    [1,    0, 2, 4.25,   {"min": 1.0,   "max": 3.25} ],
-    [2,    0, 3, -0.125, {"min": -0.25, "max": 0.125}],
-    [3,    0, 0, null,   {"min": null,  "max": null} ],
-    [null, 0, 2, 4.75,   {"min": 0.75,  "max": 4.0}  ]
+    [0, 1,    2, 4.25,   {"min": 1.0,   "max": 3.25} ],
+    [0, 2,    3, -0.125, {"min": -0.25, "max": 0.125}],
+    [0, 3,    0, null,   {"min": null,  "max": null} ],
+    [0, null, 2, 4.75,   {"min": 0.75,  "max": 4.0}  ]
   ])"});
 }
 
@@ -4906,7 +4906,7 @@ Result<std::shared_ptr<Table>> GetEmptySegmentKeysInputAsCombined() {
 Result<std::shared_ptr<Array>> GetEmptySegmentKeyOutput() {
   ARROW_ASSIGN_OR_RAISE(auto chunked, GetSingleSegmentKeyOutput());
   ARROW_ASSIGN_OR_RAISE(auto table, Table::FromChunkedStructArray(chunked));
-  ARROW_ASSIGN_OR_RAISE(auto removed, table->RemoveColumn(1));
+  ARROW_ASSIGN_OR_RAISE(auto removed, table->RemoveColumn(0));
   auto sliced = removed->Slice(0, 4);
   ARROW_ASSIGN_OR_RAISE(auto batch, sliced->CombineChunksToBatch());
   return batch->ToStructArray();
@@ -4927,14 +4927,13 @@ TEST_P(SegmentedKeyGroupBy, EmptySegmentKeyCombined) {
   TestEmptySegmentKey(GetParam(), GetEmptySegmentKeysInputAsCombined);
 }
 
-// adds a named copy of the last (single-segment-key) column to the obtained table
+// adds a named copy of the first (single-segment-key) column to the obtained table
 Result<std::shared_ptr<Table>> GetMultiSegmentInput(
     std::function<Result<std::shared_ptr<Table>>()> get_table,
     const std::string& add_name) {
   ARROW_ASSIGN_OR_RAISE(auto table, get_table());
-  int last = table->num_columns() - 1;
-  auto add_field = field(add_name, table->schema()->field(last)->type());
-  return table->AddColumn(table->num_columns(), add_field, table->column(last));
+  auto add_field = field(add_name, table->schema()->field(0)->type());
+  return table->AddColumn(table->num_columns(), add_field, table->column(0));
 }
 
 Result<std::shared_ptr<Table>> GetMultiSegmentInputAsChunked(
@@ -4947,12 +4946,12 @@ Result<std::shared_ptr<Table>> GetMultiSegmentInputAsCombined(
   return GetMultiSegmentInput(GetSingleSegmentInputAsCombined, add_name);
 }
 
-// adds a named copy of the last (single-segment-key) column to the expected output table
+// adds a named copy of the first (single-segment-key) column to the expected output table
 Result<std::shared_ptr<ChunkedArray>> GetMultiSegmentKeyOutput(
     const std::string& add_name) {
   ARROW_ASSIGN_OR_RAISE(auto chunked, GetSingleSegmentKeyOutput());
   ARROW_ASSIGN_OR_RAISE(auto table, Table::FromChunkedStructArray(chunked));
-  int existing_key_field_idx = 1;
+  int existing_key_field_idx = 0;
   auto add_field =
       field(add_name, table->schema()->field(existing_key_field_idx)->type());
   ARROW_ASSIGN_OR_RAISE(auto added,
diff --git a/cpp/src/arrow/acero/plan_test.cc b/cpp/src/arrow/acero/plan_test.cc
index 3ce2ba2b8c..d6e18c3bb7 100644
--- a/cpp/src/arrow/acero/plan_test.cc
+++ b/cpp/src/arrow/acero/plan_test.cc
@@ -1640,7 +1640,7 @@ TEST(ExecPlanExecution, SegmentedAggregationWithOneSegment) {
                        DeclarationToExecBatches(std::move(plan), /*use_threads=*/false));
 
   auto expected = ExecBatchFromJSON({int32(), int32(), int64(), float64()},
-                                    R"([[1, 1, 6, 2], [2, 1, 6, 2]])");
+                                    R"([[1, 1, 6, 2], [1, 2, 6, 2]])");
   AssertExecBatchesEqualIgnoringOrder(actual_batches.schema, actual_batches.batches,
                                       {expected});
 }
@@ -1670,7 +1670,7 @@ TEST(ExecPlanExecution, SegmentedAggregationWithTwoSegments) {
 
   auto expected = ExecBatchFromJSON(
       {int32(), int32(), int64(), float64()},
-      R"([[1, 1, 3, 1.5], [2, 1, 1, 1], [1, 2, 3, 3], [2, 2, 5, 2.5]])");
+      R"([[1, 1, 3, 1.5], [1, 2, 1, 1], [2, 1, 3, 3], [2, 2, 5, 2.5]])");
   AssertExecBatchesEqualIgnoringOrder(actual_batches.schema, actual_batches.batches,
                                       {expected});
 }
diff --git a/cpp/src/arrow/engine/substrait/serde_test.cc b/cpp/src/arrow/engine/substrait/serde_test.cc
index ca96b2bf7c..b84bde8080 100644
--- a/cpp/src/arrow/engine/substrait/serde_test.cc
+++ b/cpp/src/arrow/engine/substrait/serde_test.cc
@@ -5726,7 +5726,7 @@ TEST(Substrait, PlanWithSegmentedAggregateExtension) {
   std::shared_ptr<Schema> output_schema =
       schema({field("k", int32()), field("t", int32()), field("v", float64())});
   auto expected_table =
-      TableFromJSON(output_schema, {"[[1, 1, 4], [2, 1, 2], [2, 2, 10], [1, 2, 5]]"});
+      TableFromJSON(output_schema, {"[[1, 1, 4], [1, 2, 2], [2, 2, 10], [2, 1, 5]]"});
   CheckRoundTripResult(std::move(expected_table), buf, {}, conversion_options);
 }