You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2023/04/05 06:29:36 UTC

[arrow] branch main updated: GH-33616: [C++] Reorder group_by so that keys/segment keys come before aggregates (#34551)

This is an automated email from the ASF dual-hosted git repository.

westonpace pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 379c1fb03a GH-33616: [C++] Reorder group_by so that keys/segment keys come before aggregates (#34551)
379c1fb03a is described below

commit 379c1fb03a78c108846516987b3b2583ef650cb8
Author: Weston Pace <we...@gmail.com>
AuthorDate: Tue Apr 4 23:29:29 2023 -0700

    GH-33616: [C++] Reorder group_by so that keys/segment keys come before aggregates (#34551)
    
    
    * Closes: #33616
    
    Lead-authored-by: Weston Pace <we...@gmail.com>
    Co-authored-by: Neal Richardson <ne...@gmail.com>
    Co-authored-by: Sutou Kouhei <ko...@clear-code.com>
    Signed-off-by: Weston Pace <we...@gmail.com>
---
 c_glib/test/test-execute-plan.rb                |   6 +-
 cpp/src/arrow/acero/aggregate_node.cc           |  59 ++-
 cpp/src/arrow/acero/groupby_test.cc             |  28 +-
 cpp/src/arrow/acero/hash_aggregate_test.cc      | 574 ++++++++++++------------
 cpp/src/arrow/acero/plan_test.cc                |  47 +-
 cpp/src/arrow/dataset/scanner_test.cc           |  10 +-
 cpp/src/arrow/engine/substrait/function_test.cc |   8 +-
 cpp/src/arrow/engine/substrait/serde_test.cc    |  18 +-
 python/pyarrow/table.pxi                        |  24 +-
 python/pyarrow/tests/test_acero.py              |   4 +-
 r/R/dplyr-collect.R                             |   4 +-
 r/R/query-engine.R                              |  19 +-
 r/tests/testthat/test-dataset-dplyr.R           |   1 -
 r/tests/testthat/test-dplyr-query.R             |   1 -
 r/tests/testthat/test-dplyr-summarize.R         |   6 +-
 ruby/red-arrow/test/test-group.rb               | 116 ++---
 16 files changed, 462 insertions(+), 463 deletions(-)

diff --git a/c_glib/test/test-execute-plan.rb b/c_glib/test/test-execute-plan.rb
index aeb9f90587..da1433db55 100644
--- a/c_glib/test/test-execute-plan.rb
+++ b/c_glib/test/test-execute-plan.rb
@@ -57,9 +57,9 @@ class TestExecutePlan < Test::Unit::TestCase
         Arrow::AggregateNodeOptions.new(aggregations, ["string"])
       end
       execute(plan) do
-        assert_equal(build_table("sum(number)" => build_int64_array([9, 6]),
-                                 "count(number)" => build_int64_array([3, 2]),
-                                 "string" => build_string_array(["a", "b"])),
+        assert_equal(build_table("string" => build_string_array(["a", "b"]),
+                                 "sum(number)" => build_int64_array([9, 6]),
+                                 "count(number)" => build_int64_array([3, 2])),
                      reader.read_all)
       end
     end
diff --git a/cpp/src/arrow/acero/aggregate_node.cc b/cpp/src/arrow/acero/aggregate_node.cc
index 6669d30bcc..bd97235df6 100644
--- a/cpp/src/arrow/acero/aggregate_node.cc
+++ b/cpp/src/arrow/acero/aggregate_node.cc
@@ -314,8 +314,15 @@ class ScalarAggregateNode : public ExecNode, public TracedNode {
     std::vector<const ScalarAggregateKernel*> kernels(aggregates.size());
     std::vector<std::vector<std::unique_ptr<KernelState>>> states(kernels.size());
     FieldVector fields(kernels.size() + segment_keys.size());
-    std::vector<std::vector<int>> target_fieldsets(kernels.size());
 
+    // Output the segment keys first, followed by the aggregates
+    for (size_t i = 0; i < segment_keys.size(); ++i) {
+      ARROW_ASSIGN_OR_RAISE(fields[i],
+                            segment_keys[i].GetOne(*inputs[0]->output_schema()));
+    }
+
+    std::vector<std::vector<int>> target_fieldsets(kernels.size());
+    std::size_t base = segment_keys.size();
     for (size_t i = 0; i < kernels.size(); ++i) {
       const auto& target_fieldset = aggregate_options.aggregates[i].target;
       for (const auto& target : target_fieldset) {
@@ -366,11 +373,8 @@ class ScalarAggregateNode : public ExecNode, public TracedNode {
       ARROW_ASSIGN_OR_RAISE(auto out_type, kernels[i]->signature->out_type().Resolve(
                                                &kernel_ctx, kernel_intypes[i]));
 
-      fields[i] = field(aggregate_options.aggregates[i].name, out_type.GetSharedPtr());
-    }
-    for (size_t i = 0; i < segment_keys.size(); ++i) {
-      ARROW_ASSIGN_OR_RAISE(fields[kernels.size() + i],
-                            segment_keys[i].GetOne(*inputs[0]->output_schema()));
+      fields[base + i] =
+          field(aggregate_options.aggregates[i].name, out_type.GetSharedPtr());
     }
 
     return plan->EmplaceNode<ScalarAggregateNode>(
@@ -485,6 +489,11 @@ class ScalarAggregateNode : public ExecNode, public TracedNode {
     ExecBatch batch{{}, 1};
     batch.values.resize(kernels_.size() + segment_field_ids_.size());
 
+    // First, insert segment keys
+    PlaceFields(batch, /*base=*/0, segmenter_values_);
+
+    // Followed by aggregate values
+    std::size_t base = segment_field_ids_.size();
     for (size_t i = 0; i < kernels_.size(); ++i) {
       arrow::util::tracing::Span span;
       START_COMPUTE_SPAN(span, aggs_[i].function,
@@ -495,9 +504,8 @@ class ScalarAggregateNode : public ExecNode, public TracedNode {
       KernelContext ctx{plan()->query_context()->exec_context()};
       ARROW_ASSIGN_OR_RAISE(auto merged, ScalarAggregateKernel::MergeAll(
                                              kernels_[i], &ctx, std::move(states_[i])));
-      RETURN_NOT_OK(kernels_[i]->finalize(&ctx, &batch.values[i]));
+      RETURN_NOT_OK(kernels_[i]->finalize(&ctx, &batch.values[base + i]));
     }
-    PlaceFields(batch, kernels_.size(), segmenter_values_);
 
     ARROW_RETURN_NOT_OK(output_->InputReceived(this, std::move(batch)));
     total_output_batches_++;
@@ -643,21 +651,23 @@ class GroupByNode : public ExecNode, public TracedNode {
     // Build field vector for output schema
     FieldVector output_fields{keys.size() + segment_keys.size() + aggs.size()};
 
-    // Aggregate fields come before key fields to match the behavior of GroupBy function
-    for (size_t i = 0; i < aggs.size(); ++i) {
-      output_fields[i] =
-          agg_result_fields[i]->WithName(aggregate_options.aggregates[i].name);
-    }
-    size_t base = aggs.size();
+    // First output is keys, followed by segment_keys, followed by aggregates themselves
+    // This matches the behavior described by Substrait and also tends to be the behavior
+    // in SQL engines
     for (size_t i = 0; i < keys.size(); ++i) {
       int key_field_id = key_field_ids[i];
-      output_fields[base + i] = input_schema->field(key_field_id);
+      output_fields[i] = input_schema->field(key_field_id);
     }
-    base += keys.size();
+    size_t base = keys.size();
     for (size_t i = 0; i < segment_keys.size(); ++i) {
       int segment_key_field_id = segment_key_field_ids[i];
       output_fields[base + i] = input_schema->field(segment_key_field_id);
     }
+    base += segment_keys.size();
+    for (size_t i = 0; i < aggs.size(); ++i) {
+      output_fields[base + i] =
+          agg_result_fields[i]->WithName(aggregate_options.aggregates[i].name);
+    }
 
     return input->plan()->EmplaceNode<GroupByNode>(
         input, schema(std::move(output_fields)), std::move(key_field_ids),
@@ -766,11 +776,18 @@ class GroupByNode : public ExecNode, public TracedNode {
     // If we never got any batches, then state won't have been initialized
     RETURN_NOT_OK(InitLocalStateIfNeeded(state));
 
+    // Allocate a batch for output
     ExecBatch out_data{{}, state->grouper->num_groups()};
     out_data.values.resize(agg_kernels_.size() + key_field_ids_.size() +
                            segment_key_field_ids_.size());
 
-    // Aggregate fields come before key fields to match the behavior of GroupBy function
+    // Keys come first
+    ARROW_ASSIGN_OR_RAISE(ExecBatch out_keys, state->grouper->GetUniques());
+    std::move(out_keys.values.begin(), out_keys.values.end(), out_data.values.begin());
+    // Followed by segment keys
+    PlaceFields(out_data, key_field_ids_.size(), segmenter_values_);
+    // And finally, the aggregates themselves
+    std::size_t base = segment_key_field_ids_.size() + key_field_ids_.size();
     for (size_t i = 0; i < agg_kernels_.size(); ++i) {
       arrow::util::tracing::Span span;
       START_COMPUTE_SPAN(span, aggs_[i].function,
@@ -780,15 +797,11 @@ class GroupByNode : public ExecNode, public TracedNode {
                           {"function.kind", std::string(kind_name()) + "::Finalize"}});
       KernelContext batch_ctx{plan_->query_context()->exec_context()};
       batch_ctx.SetState(state->agg_states[i].get());
-      RETURN_NOT_OK(agg_kernels_[i]->finalize(&batch_ctx, &out_data.values[i]));
+      RETURN_NOT_OK(agg_kernels_[i]->finalize(&batch_ctx, &out_data.values[i + base]));
       state->agg_states[i].reset();
     }
-
-    ARROW_ASSIGN_OR_RAISE(ExecBatch out_keys, state->grouper->GetUniques());
-    std::move(out_keys.values.begin(), out_keys.values.end(),
-              out_data.values.begin() + agg_kernels_.size());
-    PlaceFields(out_data, agg_kernels_.size() + key_field_ids_.size(), segmenter_values_);
     state->grouper.reset();
+
     return out_data;
   }
 
diff --git a/cpp/src/arrow/acero/groupby_test.cc b/cpp/src/arrow/acero/groupby_test.cc
index 1284dbae2b..5710ad2598 100644
--- a/cpp/src/arrow/acero/groupby_test.cc
+++ b/cpp/src/arrow/acero/groupby_test.cc
@@ -39,13 +39,15 @@ TEST(GroupByConvenienceFunc, Basic) {
   ])"});
 
   // One key, two aggregates, same values array
-  std::shared_ptr<Table> expected =
-      TableFromJSON(schema({field("value_sum", int64()), field("value_count", int64()),
-                            field("key1", utf8())}),
-                    {R"([
-        [1, 1, "x"],
-        [5, 2, "y"],
-        [9, 2, "z"]
+  std::shared_ptr<Table> expected = TableFromJSON(schema({
+                                                      field("key1", utf8()),
+                                                      field("value_sum", int64()),
+                                                      field("value_count", int64()),
+                                                  }),
+                                                  {R"([
+        ["x", 1, 1],
+        ["y", 5, 2],
+        ["z", 9, 2]
     ])"});
   ASSERT_OK_AND_ASSIGN(std::shared_ptr<Table> actual,
                        TableGroupBy(in_table,
@@ -55,14 +57,14 @@ TEST(GroupByConvenienceFunc, Basic) {
   AssertTablesEqual(*expected, *actual);
 
   // Two keys, one aggregate
-  expected = TableFromJSON(schema({field("value_sum", int64()), field("key1", utf8()),
-                                   field("key2", int32())}),
+  expected = TableFromJSON(schema({field("key1", utf8()), field("key2", int32()),
+                                   field("value_sum", int64())}),
                            {
                                R"([
-        [1, "x", 1],
-        [2, "y", 1],
-        [3, "y", 2],
-        [9, "z", 2]
+        ["x", 1, 1],
+        ["y", 1, 2],
+        ["y", 2, 3],
+        ["z", 2, 9]
       ])"});
 
   ASSERT_OK_AND_ASSIGN(actual,
diff --git a/cpp/src/arrow/acero/hash_aggregate_test.cc b/cpp/src/arrow/acero/hash_aggregate_test.cc
index 68059dacbf..0ae06d0572 100644
--- a/cpp/src/arrow/acero/hash_aggregate_test.cc
+++ b/cpp/src/arrow/acero/hash_aggregate_test.cc
@@ -107,6 +107,17 @@ Result<Datum> NaiveGroupBy(std::vector<Datum> arguments, std::vector<Datum> keys
   ArrayVector out_columns;
   std::vector<std::string> out_names;
 
+  int key_idx = 0;
+  ARROW_ASSIGN_OR_RAISE(auto uniques, grouper->GetUniques());
+  std::vector<SortKey> sort_keys;
+  std::vector<std::shared_ptr<Field>> sort_table_fields;
+  for (const Datum& key : uniques.values) {
+    out_columns.push_back(key.make_array());
+    sort_keys.emplace_back(FieldRef(key_idx));
+    sort_table_fields.push_back(field("key_" + ToChars(key_idx), key.type()));
+    out_names.push_back("key_" + ToChars(key_idx++));
+  }
+
   for (size_t i = 0; i < arguments.size(); ++i) {
     out_names.push_back(aggregates[i].function);
 
@@ -132,17 +143,6 @@ Result<Datum> NaiveGroupBy(std::vector<Datum> arguments, std::vector<Datum> keys
     out_columns.push_back(aggregated_column.make_array());
   }
 
-  int i = 0;
-  ARROW_ASSIGN_OR_RAISE(auto uniques, grouper->GetUniques());
-  std::vector<SortKey> sort_keys;
-  std::vector<std::shared_ptr<Field>> sort_table_fields;
-  for (const Datum& key : uniques.values) {
-    out_columns.push_back(key.make_array());
-    sort_keys.emplace_back(FieldRef(i));
-    sort_table_fields.push_back(field("key_" + ToChars(i), key.type()));
-    out_names.push_back("key_" + ToChars(i++));
-  }
-
   // Return a struct array sorted by the keys
   SortOptions sort_options(std::move(sort_keys));
   ARROW_ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> sort_batch,
@@ -179,7 +179,7 @@ Result<Datum> MakeGroupByOutput(const std::vector<ExecBatch>& output_batches,
       StructArray::Make(std::move(out_arrays), output_schema->fields()));
 
   bool need_sort = !naive;
-  for (size_t i = num_aggregates; need_sort && i < out_arrays.size(); i++) {
+  for (size_t i = 0; need_sort && i < num_keys; i++) {
     if (output_schema->field(static_cast<int>(i))->type()->id() == Type::DICTIONARY) {
       need_sort = false;
     }
@@ -196,7 +196,7 @@ Result<Datum> MakeGroupByOutput(const std::vector<ExecBatch>& output_batches,
   std::vector<std::shared_ptr<Array>> key_columns;
   std::vector<SortKey> sort_keys;
   for (std::size_t i = 0; i < num_keys; i++) {
-    const std::shared_ptr<Array>& arr = out_arrays[i + num_aggregates];
+    const std::shared_ptr<Array>& arr = out_arrays[i];
     key_columns.push_back(arr);
     key_fields.push_back(field("name_does_not_matter", arr->type()));
     sort_keys.emplace_back(static_cast<int>(i));
@@ -206,7 +206,6 @@ Result<Datum> MakeGroupByOutput(const std::vector<ExecBatch>& output_batches,
   SortOptions sort_options(std::move(sort_keys));
   ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Array> sort_indices,
                         SortIndices(key_table, sort_options));
-
   return Take(struct_arr, sort_indices);
 }
 
@@ -1254,8 +1253,8 @@ TEST_P(GroupBy, NoBatches) {
                   },
                   /*use_threads=*/true));
   AssertDatumsEqual(ArrayFromJSON(struct_({
-                                      field("hash_count", int64()),
                                       field("key_0", int64()),
+                                      field("hash_count", int64()),
                                   }),
                                   R"([])"),
                     aggregated_and_grouped, /*verbose=*/true);
@@ -1319,18 +1318,16 @@ TEST_P(GroupBy, CountOnly) {
                     use_threads));
     SortBy({"key_0"}, &aggregated_and_grouped);
 
-    AssertDatumsEqual(ArrayFromJSON(struct_({
-                                        field("hash_count", int64()),
-                                        field("key_0", int64()),
-                                    }),
-                                    R"([
-    [2,   1],
-    [3,   2],
-    [0,   3],
-    [2,   null]
+    AssertDatumsEqual(
+        ArrayFromJSON(struct_({field("key_0", int64()), field("hash_count", int64())}),
+                      R"([
+    [1, 2],
+    [2, 3],
+    [3, 0],
+    [null, 2]
   ])"),
-                      aggregated_and_grouped,
-                      /*verbose=*/true);
+        aggregated_and_grouped,
+        /*verbose=*/true);
   }
 }
 
@@ -1360,15 +1357,15 @@ TEST_P(GroupBy, CountScalar) {
                                  use_threads));
 
     Datum expected = ArrayFromJSON(struct_({
+                                       field("key", int64()),
                                        field("hash_count", int64()),
                                        field("hash_count", int64()),
                                        field("hash_count", int64()),
-                                       field("key", int64()),
                                    }),
                                    R"([
-      [3, 2, 5, 1],
-      [2, 1, 3, 2],
-      [2, 1, 3, 3]
+      [1, 3, 2, 5],
+      [2, 2, 1, 3],
+      [3, 2, 1, 3]
     ])");
     AssertDatumsApproxEqual(expected, actual, /*verbose=*/true);
   }
@@ -1406,14 +1403,14 @@ TEST_P(GroupBy, SumOnly) {
     SortBy({"key_0"}, &aggregated_and_grouped);
 
     AssertDatumsEqual(ArrayFromJSON(struct_({
-                                        field("hash_sum", float64()),
                                         field("key_0", int64()),
+                                        field("hash_sum", float64()),
                                     }),
                                     R"([
-    [4.25,   1],
-    [-0.125, 2],
-    [null,   3],
-    [4.75,   null]
+    [1, 4.25],
+    [2, -0.125],
+    [3, null],
+    [null, 4.75]
   ])"),
                       aggregated_and_grouped,
                       /*verbose=*/true);
@@ -1474,20 +1471,20 @@ TEST_P(GroupBy, SumMeanProductDecimal) {
     SortBy({"key_0"}, &aggregated_and_grouped);
 
     AssertDatumsEqual(ArrayFromJSON(struct_({
+                                        field("key_0", int64()),
                                         field("hash_sum", decimal128(3, 2)),
                                         field("hash_sum", decimal256(3, 2)),
                                         field("hash_mean", decimal128(3, 2)),
                                         field("hash_mean", decimal256(3, 2)),
                                         field("hash_product", decimal128(3, 2)),
                                         field("hash_product", decimal256(3, 2)),
-                                        field("key_0", int64()),
                                     }),
                                     R"([
-    ["4.25",  "4.25",  "2.13",  "2.13",  "3.25", "3.25", 1],
-    ["-0.13", "-0.13", "-0.04", "-0.04", "0.00", "0.00", 2],
-    [null,    null,    null,    null,    null,   null,   3],
-    ["4.05",  "4.05",  "1.01",  "1.01",  "1.05", "1.05", 4],
-    ["4.75",  "4.75",  "2.38",  "2.38",  "3.00", "3.00", null]
+    [1, "4.25",  "4.25",  "2.13",  "2.13",  "3.25", "3.25"],
+    [2, "-0.13", "-0.13", "-0.04", "-0.04", "0.00", "0.00"],
+    [3, null,    null,    null,    null,    null,   null],
+    [4, "4.05",  "4.05",  "1.01",  "1.01",  "1.05", "1.05"],
+    [null, "4.75",  "4.75",  "2.38",  "2.38",  "3.00", "3.00"]
   ])"),
                       aggregated_and_grouped,
                       /*verbose=*/true);
@@ -1530,15 +1527,15 @@ TEST_P(GroupBy, MeanOnly) {
     SortBy({"key_0"}, &aggregated_and_grouped);
 
     AssertDatumsApproxEqual(ArrayFromJSON(struct_({
+                                              field("key_0", int64()),
                                               field("hash_mean", float64()),
                                               field("hash_mean", float64()),
-                                              field("key_0", int64()),
                                           }),
                                           R"([
-    [2.125,                 null,                  1],
-    [-0.041666666666666664, -0.041666666666666664, 2],
-    [null,                  null,                  3],
-    [2.375,                 null,                  null]
+    [1,    2.125,                 null                 ],
+    [2,    -0.041666666666666664, -0.041666666666666664],
+    [3,    null,                  null                 ],
+    [null, 2.375,                 null                 ]
   ])"),
                             aggregated_and_grouped,
                             /*verbose=*/true);
@@ -1569,15 +1566,15 @@ TEST_P(GroupBy, SumMeanProductScalar) {
                    },
                    use_threads));
     Datum expected = ArrayFromJSON(struct_({
+                                       field("key", int64()),
                                        field("hash_sum", int64()),
                                        field("hash_mean", float64()),
                                        field("hash_product", int64()),
-                                       field("key", int64()),
                                    }),
                                    R"([
-      [4, 1.333333, 2, 1],
-      [4, 2,        3, 2],
-      [5, 2.5,      4, 3]
+      [1, 4, 1.333333, 2],
+      [2, 4, 2,        3],
+      [3, 5, 2.5,      4]
     ])");
     AssertDatumsApproxEqual(expected, actual, /*verbose=*/true);
   }
@@ -1615,15 +1612,15 @@ TEST_P(GroupBy, VarianceAndStddev) {
                            false));
 
   AssertDatumsApproxEqual(ArrayFromJSON(struct_({
+                                            field("key_0", int64()),
                                             field("hash_variance", float64()),
                                             field("hash_stddev", float64()),
-                                            field("key_0", int64()),
                                         }),
                                         R"([
-    [1.0,                 1.0,                1],
-    [0.22222222222222224, 0.4714045207910317, 2],
-    [null,                null,               3],
-    [2.25,                1.5,                null]
+    [1,    1.0,                 1.0               ],
+    [2,    0.22222222222222224, 0.4714045207910317],
+    [3,    null,                null              ],
+    [null, 2.25,                1.5               ]
   ])"),
                           aggregated_and_grouped,
                           /*verbose=*/true);
@@ -1658,15 +1655,15 @@ TEST_P(GroupBy, VarianceAndStddev) {
                                                    false));
 
   AssertDatumsApproxEqual(ArrayFromJSON(struct_({
+                                            field("key_0", int64()),
                                             field("hash_variance", float64()),
                                             field("hash_stddev", float64()),
-                                            field("key_0", int64()),
                                         }),
                                         R"([
-    [1.0,                 1.0,                1],
-    [0.22222222222222224, 0.4714045207910317, 2],
-    [null,                null,               3],
-    [2.25,                1.5,                null]
+    [1,    1.0,                 1.0               ],
+    [2,    0.22222222222222224, 0.4714045207910317],
+    [3,    null,                null              ],
+    [null, 2.25,                1.5               ]
   ])"),
                           aggregated_and_grouped,
                           /*verbose=*/true);
@@ -1690,15 +1687,15 @@ TEST_P(GroupBy, VarianceAndStddev) {
                            false));
 
   AssertDatumsApproxEqual(ArrayFromJSON(struct_({
+                                            field("key_0", int64()),
                                             field("hash_variance", float64()),
                                             field("hash_stddev", float64()),
-                                            field("key_0", int64()),
                                         }),
                                         R"([
-    [null,                null,               1],
-    [0.6666666666666667,  0.816496580927726,  2],
-    [null,                null,               3],
-    [null,                null,               null]
+    [1,    null,                null             ],
+    [2,    0.6666666666666667,  0.816496580927726],
+    [3,    null,                null             ],
+    [null, null,                null             ]
   ])"),
                           aggregated_and_grouped,
                           /*verbose=*/true);
@@ -1740,16 +1737,16 @@ TEST_P(GroupBy, VarianceAndStddevDecimal) {
                            false));
 
   AssertDatumsApproxEqual(ArrayFromJSON(struct_({
+                                            field("key_0", int64()),
                                             field("hash_variance", float64()),
                                             field("hash_stddev", float64()),
                                             field("hash_variance", float64()),
                                             field("hash_stddev", float64()),
-                                            field("key_0", int64()),
                                         }),
                                         R"([
-    [1.0,                 1.0,                1.0,                 1.0,                1],
-    [0.22222222222222224, 0.4714045207910317, 0.22222222222222224, 0.4714045207910317, 2],
-    [2.25,                1.5,                2.25,                1.5,                null]
+    [1,    1.0,                 1.0,                1.0,                 1.0               ],
+    [2,    0.22222222222222224, 0.4714045207910317, 0.22222222222222224, 0.4714045207910317],
+    [null, 2.25,                1.5,                2.25,                1.5               ]
   ])"),
                           aggregated_and_grouped,
                           /*verbose=*/true);
@@ -1813,20 +1810,20 @@ TEST_P(GroupBy, TDigest) {
 
   AssertDatumsApproxEqual(
       ArrayFromJSON(struct_({
+                        field("key_0", int64()),
                         field("hash_tdigest", fixed_size_list(float64(), 1)),
                         field("hash_tdigest", fixed_size_list(float64(), 3)),
                         field("hash_tdigest", fixed_size_list(float64(), 3)),
                         field("hash_tdigest", fixed_size_list(float64(), 1)),
                         field("hash_tdigest", fixed_size_list(float64(), 1)),
                         field("hash_tdigest", fixed_size_list(float64(), 1)),
-                        field("key_0", int64()),
                     }),
                     R"([
-    [[1.0],  [1.0, 3.0, 3.0],    [1.0, 3.0, 3.0],    [null], [null], [null], 1],
-    [[0.0],  [0.0, 0.0, 0.0],    [0.0, 0.0, 0.0],    [0.0],  [0.0],  [0.0],  2],
-    [[null], [null, null, null], [null, null, null], [null], [null], [null], 3],
-    [[1.0],  [1.0, 1.0, 1.0],    [1.0, 1.0, 1.0],    [null], [1.0],  [null], 4],
-    [[1.0],  [1.0, 4.0, 4.0],    [1.0, 4.0, 4.0],    [1.0],  [null], [null], null]
+    [1,    [1.0],  [1.0, 3.0, 3.0],    [1.0, 3.0, 3.0],    [null], [null], [null]],
+    [2,    [0.0],  [0.0, 0.0, 0.0],    [0.0, 0.0, 0.0],    [0.0],  [0.0],  [0.0] ],
+    [3,    [null], [null, null, null], [null, null, null], [null], [null], [null]],
+    [4,    [1.0],  [1.0, 1.0, 1.0],    [1.0, 1.0, 1.0],    [null], [1.0],  [null]],
+    [null, [1.0],  [1.0, 4.0, 4.0],    [1.0, 4.0, 4.0],    [1.0],  [null], [null]]
   ])"),
       aggregated_and_grouped,
       /*verbose=*/true);
@@ -1862,14 +1859,14 @@ TEST_P(GroupBy, TDigestDecimal) {
 
   AssertDatumsApproxEqual(
       ArrayFromJSON(struct_({
+                        field("key_0", int64()),
                         field("hash_tdigest", fixed_size_list(float64(), 1)),
                         field("hash_tdigest", fixed_size_list(float64(), 1)),
-                        field("key_0", int64()),
                     }),
                     R"([
-    [[1.01], [1.01], 1],
-    [[0.0],  [0.0],  2],
-    [[1.85], [1.85], null]
+    [1,    [1.01], [1.01]],
+    [2,    [0.0],  [0.0] ],
+    [null, [1.85], [1.85]]
   ])"),
       aggregated_and_grouped,
       /*verbose=*/true);
@@ -1923,18 +1920,18 @@ TEST_P(GroupBy, ApproximateMedian) {
                              false));
 
     AssertDatumsApproxEqual(ArrayFromJSON(struct_({
+                                              field("key_0", int64()),
                                               field("hash_approximate_median", float64()),
                                               field("hash_approximate_median", float64()),
                                               field("hash_approximate_median", float64()),
                                               field("hash_approximate_median", float64()),
-                                              field("key_0", int64()),
                                           }),
                                           R"([
-    [1.0,  null, null, null, 1],
-    [0.0,  0.0,  0.0,  0.0,  2],
-    [null, null, null, null, 3],
-    [1.0,  null, 1.0,  null, 4],
-    [1.0,  1.0,  null, null, null]
+    [1,    1.0,  null, null, null],
+    [2,    0.0,  0.0,  0.0,  0.0 ],
+    [3,    null, null, null, null],
+    [4,    1.0,  null, 1.0,  null],
+    [null, 1.0,  1.0,  null, null]
   ])"),
                             aggregated_and_grouped,
                             /*verbose=*/true);
@@ -1973,18 +1970,18 @@ TEST_P(GroupBy, StddevVarianceTDigestScalar) {
                    use_threads));
     Datum expected =
         ArrayFromJSON(struct_({
+                          field("key", int64()),
                           field("hash_stddev", float64()),
                           field("hash_variance", float64()),
                           field("hash_tdigest", fixed_size_list(float64(), 1)),
                           field("hash_stddev", float64()),
                           field("hash_variance", float64()),
                           field("hash_tdigest", fixed_size_list(float64(), 1)),
-                          field("key", int64()),
                       }),
                       R"([
-         [0.4714045, 0.222222, [1.0], 0.4714045, 0.222222, [1.0], 1],
-         [1.0,       1.0,      [1.0], 1.0,       1.0,      [1.0], 2],
-         [1.5,       2.25,     [1.0], 1.5,       2.25,     [1.0], 3]
+         [1, 0.4714045, 0.222222, [1.0], 0.4714045, 0.222222, [1.0]],
+         [2, 1.0,       1.0,      [1.0], 1.0,       1.0,      [1.0]],
+         [3, 1.5,       2.25,     [1.0], 1.5,       2.25,     [1.0]]
        ])");
     AssertDatumsApproxEqual(expected, actual, /*verbose=*/true);
   }
@@ -2034,19 +2031,19 @@ TEST_P(GroupBy, VarianceOptions) {
             },
             use_threads));
     Datum expected = ArrayFromJSON(struct_({
+                                       field("key", int64()),
                                        field("hash_stddev", float64()),
                                        field("hash_stddev", float64()),
                                        field("hash_stddev", float64()),
                                        field("hash_variance", float64()),
                                        field("hash_variance", float64()),
                                        field("hash_variance", float64()),
-                                       field("key", int64()),
                                    }),
                                    R"([
-         [null,    0.471405, null,    null,   0.222222, null,   1],
-         [1.29904, 1.29904,  1.29904, 1.6875, 1.6875,   1.6875, 2],
-         [0.0,     null,     null,    0.0,    null,     null,   3],
-         [null,    0.471405, null,    null,   0.222222, null,   4]
+         [1, null,    0.471405, null,    null,   0.222222, null  ],
+         [2, 1.29904, 1.29904,  1.29904, 1.6875, 1.6875,   1.6875],
+         [3, 0.0,     null,     null,    0.0,    null,     null  ],
+         [4, null,    0.471405, null,    null,   0.222222, null  ]
        ])");
     ValidateOutput(expected);
     AssertDatumsApproxEqual(expected, actual, /*verbose=*/true);
@@ -2065,19 +2062,19 @@ TEST_P(GroupBy, VarianceOptions) {
             },
             use_threads));
     expected = ArrayFromJSON(struct_({
+                                 field("key", int64()),
                                  field("hash_stddev", float64()),
                                  field("hash_stddev", float64()),
                                  field("hash_stddev", float64()),
                                  field("hash_variance", float64()),
                                  field("hash_variance", float64()),
                                  field("hash_variance", float64()),
-                                 field("key", int64()),
                              }),
                              R"([
-         [null,    0.471405, null,    null,   0.222222, null,   1],
-         [1.29904, 1.29904,  1.29904, 1.6875, 1.6875,   1.6875, 2],
-         [0.0,     null,     null,    0.0,    null,     null,   3],
-         [null,    0.471405, null,    null,   0.222222, null,   4]
+         [1, null,    0.471405, null,    null,   0.222222, null  ],
+         [2, 1.29904, 1.29904,  1.29904, 1.6875, 1.6875,   1.6875],
+         [3, 0.0,     null,     null,    0.0,    null,     null  ],
+         [4, null,    0.471405, null,    null,   0.222222, null  ]
        ])");
     ValidateOutput(expected);
     AssertDatumsApproxEqual(expected, actual, /*verbose=*/true);
@@ -2129,6 +2126,7 @@ TEST_P(GroupBy, MinMaxOnly) {
     SortBy({"key_0"}, &aggregated_and_grouped);
 
     AssertDatumsEqual(ArrayFromJSON(struct_({
+                                        field("key_0", int64()),
                                         field("hash_min_max", struct_({
                                                                   field("min", float64()),
                                                                   field("max", float64()),
@@ -2141,13 +2139,12 @@ TEST_P(GroupBy, MinMaxOnly) {
                                                                   field("min", boolean()),
                                                                   field("max", boolean()),
                                                               })),
-                                        field("key_0", int64()),
                                     }),
                                     R"([
-    [{"min": 1.0,   "max": 3.25},  {"min": null, "max": null}, {"min": true, "max": true},   1],
-    [{"min": -0.25, "max": 0.125}, {"min": null, "max": null}, {"min": false, "max": false}, 2],
-    [{"min": null,  "max": null},  {"min": null, "max": null}, {"min": false, "max": true},  3],
-    [{"min": 0.75,  "max": 4.0},   {"min": null, "max": null}, {"min": true, "max": true},   null]
+    [1, {"min": 1.0,   "max": 3.25},  {"min": null, "max": null}, {"min": true, "max": true}   ],
+    [2, {"min": -0.25, "max": 0.125}, {"min": null, "max": null}, {"min": false, "max": false} ],
+    [3, {"min": null,  "max": null},  {"min": null, "max": null}, {"min": false, "max": true}  ],
+    [null, {"min": 0.75,  "max": 4.0},   {"min": null, "max": null}, {"min": true, "max": true}]
   ])"),
                       aggregated_and_grouped,
                       /*verbose=*/true);
@@ -2200,20 +2197,20 @@ TEST_P(GroupBy, MinMaxTypes) {
 
   const std::string default_expected =
       R"([
-    [{"min": 1, "max": 3},       1],
-    [{"min": 0, "max": 0},       2],
-    [{"min": null, "max": null}, 3],
-    [{"min": 3, "max": 5},       4],
-    [{"min": 1, "max": 4},       null]
+    [1,    {"min": 1, "max": 3}      ],
+    [2,    {"min": 0, "max": 0}      ],
+    [3,    {"min": null, "max": null}],
+    [4,    {"min": 3, "max": 5}      ],
+    [null, {"min": 1, "max": 4}   ]
     ])";
 
   const std::string date64_expected =
       R"([
-    [{"min": 86400000, "max": 259200000},       1],
-    [{"min": 0, "max": 0},       2],
-    [{"min": null, "max": null}, 3],
-    [{"min": 259200000, "max": 432000000},       4],
-    [{"min": 86400000, "max": 345600000},       null]
+    [1,    {"min": 86400000, "max": 259200000} ],
+    [2,    {"min": 0, "max": 0}                ],
+    [3,    {"min": null, "max": null}          ],
+    [4,    {"min": 259200000, "max": 432000000}],
+    [null, {"min": 86400000, "max": 345600000} ]
     ])";
 
   for (const auto& ty : types) {
@@ -2233,8 +2230,8 @@ TEST_P(GroupBy, MinMaxTypes) {
     AssertDatumsEqual(
         ArrayFromJSON(
             struct_({
-                field("hash_min_max", struct_({field("min", ty), field("max", ty)})),
                 field("key_0", int64()),
+                field("hash_min_max", struct_({field("min", ty), field("max", ty)})),
             }),
             (ty->name() == "date64") ? date64_expected : default_expected),
         aggregated_and_grouped,
@@ -2287,6 +2284,7 @@ TEST_P(GroupBy, MinMaxDecimal) {
 
     AssertDatumsEqual(
         ArrayFromJSON(struct_({
+                          field("key_0", int64()),
                           field("hash_min_max", struct_({
                                                     field("min", decimal128(3, 2)),
                                                     field("max", decimal128(3, 2)),
@@ -2295,14 +2293,13 @@ TEST_P(GroupBy, MinMaxDecimal) {
                                                     field("min", decimal256(3, 2)),
                                                     field("max", decimal256(3, 2)),
                                                 })),
-                          field("key_0", int64()),
                       }),
                       R"([
-    [{"min": "1.01", "max": "3.25"},   {"min": "1.01", "max": "3.25"},   1],
-    [{"min": "-0.25", "max": "0.12"},  {"min": "-0.25", "max": "0.12"},  2],
-    [{"min": null, "max": null},       {"min": null, "max": null},       3],
-    [{"min": "-5.25", "max": "-3.25"}, {"min": "-5.25", "max": "-3.25"}, 4],
-    [{"min": "0.75", "max": "4.01"},   {"min": "0.75", "max": "4.01"},   null]
+    [1,    {"min": "1.01", "max": "3.25"},   {"min": "1.01", "max": "3.25"}  ],
+    [2,    {"min": "-0.25", "max": "0.12"},  {"min": "-0.25", "max": "0.12"} ],
+    [3,    {"min": null, "max": null},       {"min": null, "max": null}      ],
+    [4,    {"min": "-5.25", "max": "-3.25"}, {"min": "-5.25", "max": "-3.25"}],
+    [null, {"min": "0.75", "max": "4.01"},   {"min": "0.75", "max": "4.01"}  ]
   ])"),
         aggregated_and_grouped,
         /*verbose=*/true);
@@ -2345,14 +2342,14 @@ TEST_P(GroupBy, MinMaxBinary) {
       AssertDatumsEqual(
           ArrayFromJSON(
               struct_({
-                  field("hash_min_max", struct_({field("min", ty), field("max", ty)})),
                   field("key_0", int64()),
+                  field("hash_min_max", struct_({field("min", ty), field("max", ty)})),
               }),
               R"([
-    [{"min": "aaaa", "max": "d"},    1],
-    [{"min": "babcd", "max": "bcd"}, 2],
-    [{"min": null, "max": null},     3],
-    [{"min": "123", "max": "2"},     null]
+    [1,    {"min": "aaaa", "max": "d"}   ],
+    [2,    {"min": "babcd", "max": "bcd"}],
+    [3,    {"min": null, "max": null}    ],
+    [null, {"min": "123", "max": "2"}    ]
   ])"),
           aggregated_and_grouped,
           /*verbose=*/true);
@@ -2396,14 +2393,14 @@ TEST_P(GroupBy, MinMaxFixedSizeBinary) {
     AssertDatumsEqual(
         ArrayFromJSON(
             struct_({
-                field("hash_min_max", struct_({field("min", ty), field("max", ty)})),
                 field("key_0", int64()),
+                field("hash_min_max", struct_({field("min", ty), field("max", ty)})),
             }),
             R"([
-    [{"min": "aaa", "max": "ddd"}, 1],
-    [{"min": "bab", "max": "bcd"}, 2],
-    [{"min": null, "max": null},   3],
-    [{"min": "123", "max": "234"}, null]
+    [1,    {"min": "aaa", "max": "ddd"}],
+    [2,    {"min": "bab", "max": "bcd"}],
+    [3,    {"min": null, "max": null}  ],
+    [null, {"min": "123", "max": "234"}]
   ])"),
         aggregated_and_grouped,
         /*verbose=*/true);
@@ -2448,16 +2445,16 @@ TEST_P(GroupBy, MinOrMax) {
   SortBy({"key_0"}, &aggregated_and_grouped);
 
   AssertDatumsEqual(ArrayFromJSON(struct_({
+                                      field("key_0", int64()),
                                       field("hash_min", float64()),
                                       field("hash_max", float64()),
-                                      field("key_0", int64()),
                                   }),
                                   R"([
-    [1.0,   3.25,  1],
-    [-0.25, 0.125, 2],
-    [null,  null,  3],
-    [-Inf,  Inf,   4],
-    [0.75,  4.0,   null]
+    [1,    1.0,   3.25 ],
+    [2,    -0.25, 0.125],
+    [3,    null,  null ],
+    [4,    -Inf,  Inf  ],
+    [null, 0.75,  4.0  ]
   ])"),
                     aggregated_and_grouped,
                     /*verbose=*/true);
@@ -2483,14 +2480,14 @@ TEST_P(GroupBy, MinMaxScalar) {
                    use_threads));
     Datum expected =
         ArrayFromJSON(struct_({
+                          field("key", int64()),
                           field("hash_min_max",
                                 struct_({field("min", int32()), field("max", int32())})),
-                          field("key", int64()),
                       }),
                       R"([
-      [{"min": -1, "max": 2}, 1],
-      [{"min": -1, "max": 3}, 2],
-      [{"min": -1, "max": 4}, 3]
+      [1, {"min": -1, "max": 2}],
+      [2, {"min": -1, "max": 3}],
+      [3, {"min": -1, "max": 4}]
     ])");
     AssertDatumsApproxEqual(expected, actual, /*verbose=*/true);
   }
@@ -2562,6 +2559,7 @@ TEST_P(GroupBy, AnyAndAll) {
     // Group 5: trues
     // Group null: falses
     AssertDatumsEqual(ArrayFromJSON(struct_({
+                                        field("key_0", int64()),
                                         field("hash_any", boolean()),
                                         field("hash_any", boolean()),
                                         field("hash_any", boolean()),
@@ -2570,15 +2568,14 @@ TEST_P(GroupBy, AnyAndAll) {
                                         field("hash_all", boolean()),
                                         field("hash_all", boolean()),
                                         field("hash_all", boolean()),
-                                        field("key_0", int64()),
                                     }),
                                     R"([
-    [true,  null, true,  null, true,  null,  null,  null,  1],
-    [true,  true, true,  true, false, false, false, false, 2],
-    [false, null, null,  null, true,  null,  null,  null,  3],
-    [false, null, null,  null, false, null,  false, null,  4],
-    [true,  null, true,  null, true,  null,  true,  null,  5],
-    [false, null, false, null, false, null,  false, null,  null]
+    [1,    true,  null, true,  null, true,  null,  null,  null ],
+    [2,    true,  true, true,  true, false, false, false, false],
+    [3,    false, null, null,  null, true,  null,  null,  null ],
+    [4,    false, null, null,  null, false, null,  false, null ],
+    [5,    true,  null, true,  null, true,  null,  true,  null ],
+    [null, false, null, false, null, false, null,  false, null ]
   ])"),
                       aggregated_and_grouped,
                       /*verbose=*/true);
@@ -2611,16 +2608,16 @@ TEST_P(GroupBy, AnyAllScalar) {
                                     },
                                     use_threads));
     Datum expected = ArrayFromJSON(struct_({
+                                       field("key", int64()),
                                        field("hash_any", boolean()),
                                        field("hash_all", boolean()),
                                        field("hash_any", boolean()),
                                        field("hash_all", boolean()),
-                                       field("key", int64()),
                                    }),
                                    R"([
-      [true, true,  true, null,  1],
-      [true, false, true, false, 2],
-      [true, true,  true, null,  3]
+      [1, true, true,  true, null ],
+      [2, true, false, true, false],
+      [3, true, true,  true, null ]
     ])");
     AssertDatumsApproxEqual(expected, actual, /*verbose=*/true);
   }
@@ -2686,17 +2683,17 @@ TEST_P(GroupBy, CountDistinct) {
     ValidateOutput(aggregated_and_grouped);
 
     AssertDatumsEqual(ArrayFromJSON(struct_({
+                                        field("key_0", int64()),
                                         field("hash_count_distinct", int64()),
                                         field("hash_count_distinct", int64()),
                                         field("hash_count_distinct", int64()),
-                                        field("key_0", int64()),
                                     }),
                                     R"([
-    [1, 1, 0, 1],
-    [2, 2, 0, 2],
-    [3, 2, 1, 3],
-    [1, 0, 1, 4],
-    [4, 4, 0, null]
+    [1,    1, 1, 0],
+    [2,    2, 2, 0],
+    [3,    3, 2, 1],
+    [4,    1, 0, 1],
+    [null, 4, 4, 0]
   ])"),
                       aggregated_and_grouped,
                       /*verbose=*/true);
@@ -2754,17 +2751,17 @@ TEST_P(GroupBy, CountDistinct) {
     SortBy({"key_0"}, &aggregated_and_grouped);
 
     AssertDatumsEqual(ArrayFromJSON(struct_({
+                                        field("key_0", int64()),
                                         field("hash_count_distinct", int64()),
                                         field("hash_count_distinct", int64()),
                                         field("hash_count_distinct", int64()),
-                                        field("key_0", int64()),
                                     }),
                                     R"([
-    [1, 1, 0, 1],
-    [2, 2, 0, 2],
-    [3, 2, 1, 3],
-    [1, 0, 1, 4],
-    [4, 4, 0, null]
+    [1,    1, 1, 0],
+    [2,    2, 2, 0],
+    [3,    3, 2, 1],
+    [4,    1, 0, 1],
+    [null, 4, 4, 0]
   ])"),
                       aggregated_and_grouped,
                       /*verbose=*/true);
@@ -2802,14 +2799,14 @@ TEST_P(GroupBy, CountDistinct) {
     SortBy({"key_0"}, &aggregated_and_grouped);
 
     AssertDatumsEqual(ArrayFromJSON(struct_({
+                                        field("key_0", int64()),
                                         field("hash_count_distinct", int64()),
                                         field("hash_count_distinct", int64()),
                                         field("hash_count_distinct", int64()),
-                                        field("key_0", int64()),
                                     }),
                                     R"([
-    [1, 1, 0, 1],
-    [2, 2, 0, 2]
+    [1, 1, 1, 0],
+    [2, 2, 2, 0]
   ])"),
                       aggregated_and_grouped,
                       /*verbose=*/true);
@@ -2883,7 +2880,7 @@ TEST_P(GroupBy, Distinct) {
 
     auto struct_arr = aggregated_and_grouped.array_as<StructArray>();
 
-    auto all_arr = checked_pointer_cast<ListArray>(struct_arr->field(0));
+    auto all_arr = checked_pointer_cast<ListArray>(struct_arr->field(1));
     AssertDatumsEqual(ArrayFromJSON(utf8(), R"(["foo"])"), sort(*all_arr->value_slice(0)),
                       /*verbose=*/true);
     AssertDatumsEqual(ArrayFromJSON(utf8(), R"(["bar", "spam"])"),
@@ -2895,7 +2892,7 @@ TEST_P(GroupBy, Distinct) {
     AssertDatumsEqual(ArrayFromJSON(utf8(), R"(["a", "b", "baz", "eggs"])"),
                       sort(*all_arr->value_slice(4)), /*verbose=*/true);
 
-    auto valid_arr = checked_pointer_cast<ListArray>(struct_arr->field(1));
+    auto valid_arr = checked_pointer_cast<ListArray>(struct_arr->field(2));
     AssertDatumsEqual(ArrayFromJSON(utf8(), R"(["foo"])"),
                       sort(*valid_arr->value_slice(0)), /*verbose=*/true);
     AssertDatumsEqual(ArrayFromJSON(utf8(), R"(["bar", "spam"])"),
@@ -2907,7 +2904,7 @@ TEST_P(GroupBy, Distinct) {
     AssertDatumsEqual(ArrayFromJSON(utf8(), R"(["a", "b", "baz", "eggs"])"),
                       sort(*valid_arr->value_slice(4)), /*verbose=*/true);
 
-    auto null_arr = checked_pointer_cast<ListArray>(struct_arr->field(2));
+    auto null_arr = checked_pointer_cast<ListArray>(struct_arr->field(3));
     AssertDatumsEqual(ArrayFromJSON(utf8(), R"([])"), sort(*null_arr->value_slice(0)),
                       /*verbose=*/true);
     AssertDatumsEqual(ArrayFromJSON(utf8(), R"([])"), sort(*null_arr->value_slice(1)),
@@ -2950,12 +2947,12 @@ TEST_P(GroupBy, Distinct) {
 
     AssertDatumsEqual(
         ArrayFromJSON(struct_({
+                          field("key_0", int64()),
                           field("hash_distinct", list(utf8())),
                           field("hash_distinct", list(utf8())),
                           field("hash_distinct", list(utf8())),
-                          field("key_0", int64()),
                       }),
-                      R"([[["foo"], ["foo"], [], 1], [["bar"], ["bar"], [], 2]])"),
+                      R"([[1, ["foo"], ["foo"], []], [2, ["bar"], ["bar"], []]])"),
         aggregated_and_grouped,
         /*verbose=*/true);
   }
@@ -3016,12 +3013,11 @@ TEST_P(GroupBy, OneMiscTypes) {
 
     const auto& struct_arr = aggregated_and_grouped.array_as<StructArray>();
     //  Check the key column
-    AssertDatumsEqual(ArrayFromJSON(int64(), R"([1, 2, 3, null])"),
-                      struct_arr->field(struct_arr->num_fields() - 1));
+    AssertDatumsEqual(ArrayFromJSON(int64(), R"([1, 2, 3, null])"), struct_arr->field(0));
 
     //  Check values individually
     auto col_0_type = float64();
-    const auto& col_0 = struct_arr->field(0);
+    const auto& col_0 = struct_arr->field(1);
     EXPECT_THAT(col_0->GetScalar(0), ResultWith(AnyOfJSON(col_0_type, R"([1.0, 3.25])")));
     EXPECT_THAT(col_0->GetScalar(1),
                 ResultWith(AnyOfJSON(col_0_type, R"([0.0, 0.125, -0.25])")));
@@ -3029,14 +3025,14 @@ TEST_P(GroupBy, OneMiscTypes) {
     EXPECT_THAT(col_0->GetScalar(3), ResultWith(AnyOfJSON(col_0_type, R"([4.0, 0.75])")));
 
     auto col_1_type = null();
-    const auto& col_1 = struct_arr->field(1);
+    const auto& col_1 = struct_arr->field(2);
     EXPECT_THAT(col_1->GetScalar(0), ResultWith(AnyOfJSON(col_1_type, R"([null])")));
     EXPECT_THAT(col_1->GetScalar(1), ResultWith(AnyOfJSON(col_1_type, R"([null])")));
     EXPECT_THAT(col_1->GetScalar(2), ResultWith(AnyOfJSON(col_1_type, R"([null])")));
     EXPECT_THAT(col_1->GetScalar(3), ResultWith(AnyOfJSON(col_1_type, R"([null])")));
 
     auto col_2_type = boolean();
-    const auto& col_2 = struct_arr->field(2);
+    const auto& col_2 = struct_arr->field(3);
     EXPECT_THAT(col_2->GetScalar(0), ResultWith(AnyOfJSON(col_2_type, R"([true])")));
     EXPECT_THAT(col_2->GetScalar(1), ResultWith(AnyOfJSON(col_2_type, R"([false])")));
     EXPECT_THAT(col_2->GetScalar(2),
@@ -3045,7 +3041,7 @@ TEST_P(GroupBy, OneMiscTypes) {
                 ResultWith(AnyOfJSON(col_2_type, R"([true, null])")));
 
     auto col_3_type = decimal128(3, 2);
-    const auto& col_3 = struct_arr->field(3);
+    const auto& col_3 = struct_arr->field(4);
     EXPECT_THAT(col_3->GetScalar(0),
                 ResultWith(AnyOfJSON(col_3_type, R"(["1.01", "3.25"])")));
     EXPECT_THAT(col_3->GetScalar(1),
@@ -3055,7 +3051,7 @@ TEST_P(GroupBy, OneMiscTypes) {
                 ResultWith(AnyOfJSON(col_3_type, R"(["4.01", "0.75"])")));
 
     auto col_4_type = decimal256(3, 2);
-    const auto& col_4 = struct_arr->field(4);
+    const auto& col_4 = struct_arr->field(5);
     EXPECT_THAT(col_4->GetScalar(0),
                 ResultWith(AnyOfJSON(col_4_type, R"(["1.01", "3.25"])")));
     EXPECT_THAT(col_4->GetScalar(1),
@@ -3065,7 +3061,7 @@ TEST_P(GroupBy, OneMiscTypes) {
                 ResultWith(AnyOfJSON(col_4_type, R"(["4.01", "0.75"])")));
 
     auto col_5_type = fixed_size_binary(3);
-    const auto& col_5 = struct_arr->field(5);
+    const auto& col_5 = struct_arr->field(6);
     EXPECT_THAT(col_5->GetScalar(0),
                 ResultWith(AnyOfJSON(col_5_type, R"(["aaa", "ddd"])")));
     EXPECT_THAT(col_5->GetScalar(1),
@@ -3137,10 +3133,10 @@ TEST_P(GroupBy, OneNumericTypes) {
       const auto& struct_arr = aggregated_and_grouped.array_as<StructArray>();
       //  Check the key column
       AssertDatumsEqual(ArrayFromJSON(int64(), R"([1, 2, 3, 4, null])"),
-                        struct_arr->field(struct_arr->num_fields() - 1));
+                        struct_arr->field(0));
 
       //  Check values individually
-      const auto& col = struct_arr->field(0);
+      const auto& col = struct_arr->field(1);
       if (type->name() == "date64") {
         EXPECT_THAT(col->GetScalar(0),
                     ResultWith(AnyOfJSON(type, R"([86400000, 259200000])")));
@@ -3197,9 +3193,9 @@ TEST_P(GroupBy, OneBinaryTypes) {
       const auto& struct_arr = aggregated_and_grouped.array_as<StructArray>();
       //  Check the key column
       AssertDatumsEqual(ArrayFromJSON(int64(), R"([1, 2, 3, null])"),
-                        struct_arr->field(struct_arr->num_fields() - 1));
+                        struct_arr->field(0));
 
-      const auto& col = struct_arr->field(0);
+      const auto& col = struct_arr->field(1);
       EXPECT_THAT(col->GetScalar(0), ResultWith(AnyOfJSON(type, R"(["aaaa", "d"])")));
       EXPECT_THAT(col->GetScalar(1),
                   ResultWith(AnyOfJSON(type, R"(["bcd", "bc", "babcd"])")));
@@ -3229,10 +3225,9 @@ TEST_P(GroupBy, OneScalar) {
 
     const auto& struct_arr = actual.array_as<StructArray>();
     //  Check the key column
-    AssertDatumsEqual(ArrayFromJSON(int64(), R"([1, 2, 3])"),
-                      struct_arr->field(struct_arr->num_fields() - 1));
+    AssertDatumsEqual(ArrayFromJSON(int64(), R"([1, 2, 3])"), struct_arr->field(0));
 
-    const auto& col = struct_arr->field(0);
+    const auto& col = struct_arr->field(1);
     EXPECT_THAT(col->GetScalar(0), ResultWith(AnyOfJSON(int32(), R"([-1, 22])")));
     EXPECT_THAT(col->GetScalar(1), ResultWith(AnyOfJSON(int32(), R"([3])")));
     EXPECT_THAT(col->GetScalar(2), ResultWith(AnyOfJSON(int32(), R"([4])")));
@@ -3301,7 +3296,7 @@ TEST_P(GroupBy, ListNumeric) {
 
         auto struct_arr = aggregated_and_grouped.array_as<StructArray>();
 
-        auto list_arr = checked_pointer_cast<ListArray>(struct_arr->field(0));
+        auto list_arr = checked_pointer_cast<ListArray>(struct_arr->field(1));
         AssertDatumsEqual(ArrayFromJSON(type, R"([99, 99])"),
                           sort(*list_arr->value_slice(0)),
                           /*verbose=*/true);
@@ -3373,7 +3368,7 @@ TEST_P(GroupBy, ListNumeric) {
 
         auto struct_arr = aggregated_and_grouped.array_as<StructArray>();
 
-        auto list_arr = checked_pointer_cast<ListArray>(struct_arr->field(0));
+        auto list_arr = checked_pointer_cast<ListArray>(struct_arr->field(1));
         AssertDatumsEqual(ArrayFromJSON(type, R"([99, 99])"),
                           sort(*list_arr->value_slice(0)),
                           /*verbose=*/true);
@@ -3444,9 +3439,9 @@ TEST_P(GroupBy, ListBinaryTypes) {
         const auto& struct_arr = aggregated_and_grouped.array_as<StructArray>();
         // Check the key column
         AssertDatumsEqual(ArrayFromJSON(int64(), R"([1, 2, 3, null])"),
-                          struct_arr->field(struct_arr->num_fields() - 1));
+                          struct_arr->field(0));
 
-        auto list_arr = checked_pointer_cast<ListArray>(struct_arr->field(0));
+        auto list_arr = checked_pointer_cast<ListArray>(struct_arr->field(1));
         AssertDatumsEqual(ArrayFromJSON(type, R"(["aaaa", "d", null])"),
                           sort(*list_arr->value_slice(0)),
                           /*verbose=*/true);
@@ -3507,9 +3502,9 @@ TEST_P(GroupBy, ListBinaryTypes) {
         const auto& struct_arr = aggregated_and_grouped.array_as<StructArray>();
         // Check the key column
         AssertDatumsEqual(ArrayFromJSON(int64(), R"([1, 2, 3, null])"),
-                          struct_arr->field(struct_arr->num_fields() - 1));
+                          struct_arr->field(0));
 
-        auto list_arr = checked_pointer_cast<ListArray>(struct_arr->field(0));
+        auto list_arr = checked_pointer_cast<ListArray>(struct_arr->field(1));
         AssertDatumsEqual(ArrayFromJSON(type, R"(["aaaa", "d", "y"])"),
                           sort(*list_arr->value_slice(0)),
                           /*verbose=*/true);
@@ -3587,12 +3582,11 @@ TEST_P(GroupBy, ListMiscTypes) {
 
     const auto& struct_arr = aggregated_and_grouped.array_as<StructArray>();
     //  Check the key column
-    AssertDatumsEqual(ArrayFromJSON(int64(), R"([1, 2, 3, null])"),
-                      struct_arr->field(struct_arr->num_fields() - 1));
+    AssertDatumsEqual(ArrayFromJSON(int64(), R"([1, 2, 3, null])"), struct_arr->field(0));
 
     //  Check values individually
     auto type_0 = float64();
-    auto list_arr_0 = checked_pointer_cast<ListArray>(struct_arr->field(0));
+    auto list_arr_0 = checked_pointer_cast<ListArray>(struct_arr->field(1));
     AssertDatumsEqual(ArrayFromJSON(type_0, R"([1.0, 3.25, null])"),
                       sort(*list_arr_0->value_slice(0)),
                       /*verbose=*/true);
@@ -3607,7 +3601,7 @@ TEST_P(GroupBy, ListMiscTypes) {
                       /*verbose=*/true);
 
     auto type_1 = null();
-    auto list_arr_1 = checked_pointer_cast<ListArray>(struct_arr->field(1));
+    auto list_arr_1 = checked_pointer_cast<ListArray>(struct_arr->field(2));
     AssertDatumsEqual(ArrayFromJSON(type_1, R"([null, null, null])"),
                       sort(*list_arr_1->value_slice(0)),
                       /*verbose=*/true);
@@ -3622,7 +3616,7 @@ TEST_P(GroupBy, ListMiscTypes) {
                       /*verbose=*/true);
 
     auto type_2 = boolean();
-    auto list_arr_2 = checked_pointer_cast<ListArray>(struct_arr->field(2));
+    auto list_arr_2 = checked_pointer_cast<ListArray>(struct_arr->field(3));
     AssertDatumsEqual(ArrayFromJSON(type_2, R"([true, true, true])"),
                       sort(*list_arr_2->value_slice(0)),
                       /*verbose=*/true);
@@ -3637,7 +3631,7 @@ TEST_P(GroupBy, ListMiscTypes) {
                       /*verbose=*/true);
 
     auto type_3 = decimal128(3, 2);
-    auto list_arr_3 = checked_pointer_cast<ListArray>(struct_arr->field(3));
+    auto list_arr_3 = checked_pointer_cast<ListArray>(struct_arr->field(4));
     AssertDatumsEqual(ArrayFromJSON(type_3, R"(["1.01", "3.25", null])"),
                       sort(*list_arr_3->value_slice(0)),
                       /*verbose=*/true);
@@ -3652,7 +3646,7 @@ TEST_P(GroupBy, ListMiscTypes) {
                       /*verbose=*/true);
 
     auto type_4 = decimal256(3, 2);
-    auto list_arr_4 = checked_pointer_cast<ListArray>(struct_arr->field(4));
+    auto list_arr_4 = checked_pointer_cast<ListArray>(struct_arr->field(5));
     AssertDatumsEqual(ArrayFromJSON(type_4, R"(["1.01", "3.25", null])"),
                       sort(*list_arr_4->value_slice(0)),
                       /*verbose=*/true);
@@ -3667,7 +3661,7 @@ TEST_P(GroupBy, ListMiscTypes) {
                       /*verbose=*/true);
 
     auto type_5 = fixed_size_binary(3);
-    auto list_arr_5 = checked_pointer_cast<ListArray>(struct_arr->field(5));
+    auto list_arr_5 = checked_pointer_cast<ListArray>(struct_arr->field(6));
     AssertDatumsEqual(ArrayFromJSON(type_5, R"(["aaa", "ddd", null])"),
                       sort(*list_arr_5->value_slice(0)),
                       /*verbose=*/true);
@@ -3731,6 +3725,7 @@ TEST_P(GroupBy, CountAndSum) {
 
   AssertDatumsEqual(
       ArrayFromJSON(struct_({
+                        field("key_0", int64()),
                         field("hash_count", int64()),
                         field("hash_count", int64()),
                         field("hash_count", int64()),
@@ -3739,13 +3734,12 @@ TEST_P(GroupBy, CountAndSum) {
                         field("hash_sum", float64()),
                         field("hash_sum", float64()),
                         field("hash_sum", int64()),
-                        field("key_0", int64()),
                     }),
                     R"([
-    [2, 1, 3, 3, 4.25,   null,   3,    1],
-    [3, 0, 3, 3, -0.125, -0.125, 6,    2],
-    [0, 2, 2, 2, null,   null,   6,    3],
-    [2, 0, 2, 2, 4.75,   null,   null, null]
+    [1,    2, 1, 3, 3, 4.25,   null,   3   ],
+    [2,    3, 0, 3, 3, -0.125, -0.125, 6   ],
+    [3,    0, 2, 2, 2, null,   null,   6   ],
+    [null, 2, 0, 2, 2, 4.75,   null,   null]
   ])"),
       aggregated_and_grouped,
       /*verbose=*/true);
@@ -3780,14 +3774,14 @@ TEST_P(GroupBy, StandAloneNullaryCount) {
                            }));
 
   AssertDatumsEqual(ArrayFromJSON(struct_({
-                                      field("hash_count_all", int64()),
                                       field("key_0", int64()),
+                                      field("hash_count_all", int64()),
                                   }),
                                   R"([
-    [3, 1],
-    [3, 2],
-    [2, 3],
-    [2, null]
+    [1, 3   ],
+    [2, 3   ],
+    [3, 2   ],
+    [null, 2]
   ])"),
                     aggregated_and_grouped,
                     /*verbose=*/true);
@@ -3828,16 +3822,16 @@ TEST_P(GroupBy, Product) {
                            }));
 
   AssertDatumsApproxEqual(ArrayFromJSON(struct_({
+                                            field("key_0", int64()),
                                             field("hash_product", float64()),
                                             field("hash_product", int64()),
                                             field("hash_product", float64()),
-                                            field("key_0", int64()),
                                         }),
                                         R"([
-    [-3.25, 1,    null, 1],
-    [-0.0,  8,    -0.0, 2],
-    [null,  9,    null, 3],
-    [3.0,   null, null, null]
+    [1,    -3.25, 1,    null],
+    [2,    -0.0,  8,    -0.0],
+    [3,    null,  9,    null],
+    [null, 3.0,   null, null]
   ])"),
                           aggregated_and_grouped,
                           /*verbose=*/true);
@@ -3863,10 +3857,10 @@ TEST_P(GroupBy, Product) {
                            }));
 
   AssertDatumsApproxEqual(ArrayFromJSON(struct_({
-                                            field("hash_product", int64()),
                                             field("key_0", int64()),
+                                            field("hash_product", int64()),
                                         }),
-                                        R"([[8589934592, 1]])"),
+                                        R"([[1, 8589934592]])"),
                           aggregated_and_grouped,
                           /*verbose=*/true);
 }
@@ -3913,19 +3907,19 @@ TEST_P(GroupBy, SumMeanProductKeepNulls) {
                            }));
 
   AssertDatumsApproxEqual(ArrayFromJSON(struct_({
+                                            field("key_0", int64()),
                                             field("hash_sum", float64()),
                                             field("hash_sum", float64()),
                                             field("hash_mean", float64()),
                                             field("hash_mean", float64()),
                                             field("hash_product", float64()),
                                             field("hash_product", float64()),
-                                            field("key_0", int64()),
                                         }),
                                         R"([
-    [null,   null,   null,       null,       null, null, 1],
-    [-0.125, -0.125, -0.0416667, -0.0416667, -0.0, -0.0, 2],
-    [null,   null,   null,       null,       null, null, 3],
-    [4.75,   null,   2.375,      null,       3.0,  null, null]
+    [1,    null,   null,   null,       null,       null, null],
+    [2,    -0.125, -0.125, -0.0416667, -0.0416667, -0.0, -0.0],
+    [3,    null,   null,   null,       null,       null, null],
+    [null, 4.75,   null,   2.375,      null,       3.0,  null]
   ])"),
                           aggregated_and_grouped,
                           /*verbose=*/true);
@@ -3958,14 +3952,14 @@ TEST_P(GroupBy, SumOnlyStringAndDictKeys) {
     SortBy({"key_0"}, &aggregated_and_grouped);
 
     AssertDatumsEqual(ArrayFromJSON(struct_({
-                                        field("hash_sum", float64()),
                                         field("key_0", key_type),
+                                        field("hash_sum", float64()),
                                     }),
                                     R"([
-    [4.25,   "alfa"],
-    [-0.125, "beta"],
-    [null,   "gama"],
-    [4.75,    null ]
+    ["alfa", 4.25  ],
+    ["beta", -0.125],
+    ["gama", null  ],
+    [null,   4.75  ]
   ])"),
                       aggregated_and_grouped,
                       /*verbose=*/true);
@@ -4088,19 +4082,19 @@ TEST_P(GroupBy, WithChunkedArray) {
                            }));
 
   AssertDatumsEqual(ArrayFromJSON(struct_({
+                                      field("key_0", int64()),
                                       field("hash_count", int64()),
                                       field("hash_sum", float64()),
                                       field("hash_min_max", struct_({
                                                                 field("min", float64()),
                                                                 field("max", float64()),
                                                             })),
-                                      field("key_0", int64()),
                                   }),
                                   R"([
-    [2, 4.25,   {"min": 1.0,   "max": 3.25},  1],
-    [3, -0.125, {"min": -0.25, "max": 0.125}, 2],
-    [0, null,   {"min": null,  "max": null},  3],
-    [2, 4.75,   {"min": 0.75,  "max": 4.0},   null]
+    [1,    2, 4.25,   {"min": 1.0,   "max": 3.25} ],
+    [2,    3, -0.125, {"min": -0.25, "max": 0.125}],
+    [3,    0, null,   {"min": null,  "max": null} ],
+    [null, 2, 4.75,   {"min": 0.75,  "max": 4.0}  ]
   ])"),
                     aggregated_and_grouped,
                     /*verbose=*/true);
@@ -4125,15 +4119,15 @@ TEST_P(GroupBy, MinMaxWithNewGroupsInChunkedArray) {
                            }));
 
   AssertDatumsEqual(ArrayFromJSON(struct_({
+                                      field("key_0", int64()),
                                       field("hash_min_max", struct_({
                                                                 field("min", int64()),
                                                                 field("max", int64()),
                                                             })),
-                                      field("key_0", int64()),
                                   }),
                                   R"([
-    [{"min": 1, "max": 1}, 0],
-    [{"min": 0, "max": 0}, 1]
+    [0, {"min": 1, "max": 1}],
+    [1, {"min": 0, "max": 0}]
   ])"),
                     aggregated_and_grouped,
                     /*verbose=*/true);
@@ -4161,14 +4155,14 @@ TEST_P(GroupBy, SmallChunkSizeSumOnly) {
                                   },
                                   small_chunksize_context()));
   AssertDatumsEqual(ArrayFromJSON(struct_({
-                                      field("hash_sum", float64()),
                                       field("key_0", int64()),
+                                      field("hash_sum", float64()),
                                   }),
                                   R"([
-    [4.25,   1],
-    [-0.125, 2],
-    [null,   3],
-    [4.75,   null]
+    [1,    4.25  ],
+    [2,    -0.125],
+    [3,    null  ],
+    [null, 4.75  ]
   ])"),
                     aggregated_and_grouped,
                     /*verbose=*/true);
@@ -4216,16 +4210,16 @@ TEST_P(GroupBy, CountWithNullType) {
     SortBy({"key_0"}, &aggregated_and_grouped);
 
     AssertDatumsEqual(ArrayFromJSON(struct_({
+                                        field("key_0", int64()),
                                         field("hash_count", int64()),
                                         field("hash_count", int64()),
                                         field("hash_count", int64()),
-                                        field("key_0", int64()),
                                     }),
                                     R"([
-    [3, 0, 3, 1],
-    [3, 0, 3, 2],
-    [2, 0, 2, 3],
-    [2, 0, 2, null]
+    [1,    3, 0, 3],
+    [2,    3, 0, 3],
+    [3,    2, 0, 2],
+    [null, 2, 0, 2]
   ])"),
                       aggregated_and_grouped,
                       /*verbose=*/true);
@@ -4303,6 +4297,7 @@ TEST_P(GroupBy, SingleNullTypeKey) {
     SortBy({"key_0"}, &aggregated_and_grouped);
 
     AssertDatumsEqual(ArrayFromJSON(struct_({
+                                        field("key_0", null()),
                                         field("hash_count", int64()),
                                         field("hash_sum", int64()),
                                         field("hash_mean", float64()),
@@ -4310,10 +4305,9 @@ TEST_P(GroupBy, SingleNullTypeKey) {
                                                                   field("min", int64()),
                                                                   field("max", int64()),
                                                               })),
-                                        field("key_0", null()),
                                     }),
                                     R"([
-    [8, 15, 1.875, {"min": 1, "max": 3}, null]
+    [null, 8, 15, 1.875, {"min": 1, "max": 3}]
   ])"),
                       aggregated_and_grouped,
                       /*verbose=*/true);
@@ -4360,20 +4354,20 @@ TEST_P(GroupBy, MultipleKeysIncludesNullType) {
     SortBy({"key_0"}, &aggregated_and_grouped);
 
     AssertDatumsEqual(ArrayFromJSON(struct_({
+                                        field("key_0", utf8()),
+                                        field("key_1", null()),
                                         field("hash_count", int64()),
                                         field("hash_sum", float64()),
                                         field("hash_min_max", struct_({
                                                                   field("min", float64()),
                                                                   field("max", float64()),
                                                               })),
-                                        field("key_0", utf8()),
-                                        field("key_1", null()),
                                     }),
                                     R"([
-    [2, 4.25,   {"min": 1,     "max": 3.25},  "a",      null],
-    [0, null,   {"min": null,  "max": null},  "aa",     null],
-    [3, -0.125, {"min": -0.25, "max": 0.125}, "bcdefg", null],
-    [2, 4.75,   {"min": 0.75,  "max": 4},     null,     null]
+    ["a",      null, 2, 4.25,   {"min": 1,     "max": 3.25} ],
+    ["aa",     null, 0, null,   {"min": null,  "max": null} ],
+    ["bcdefg", null, 3, -0.125, {"min": -0.25, "max": 0.125}],
+    [null,     null, 2, 4.75,   {"min": 0.75,  "max": 4}    ]
   ])"),
                       aggregated_and_grouped,
                       /*verbose=*/true);
@@ -4429,17 +4423,17 @@ TEST_P(GroupBy, SumNullType) {
     SortBy({"key_0"}, &aggregated_and_grouped);
 
     AssertDatumsEqual(ArrayFromJSON(struct_({
+                                        field("key_0", int64()),
                                         field("hash_sum", int64()),
                                         field("hash_sum", int64()),
                                         field("hash_sum", int64()),
                                         field("hash_sum", int64()),
-                                        field("key_0", int64()),
                                     }),
                                     R"([
-    [0, null, null, null, 1],
-    [0, null, null, null, 2],
-    [0, null, null, null, 3],
-    [0, null, null, null, null]
+    [1,    0, null, null, null],
+    [2,    0, null, null, null],
+    [3,    0, null, null, null],
+    [null, 0, null, null, null]
   ])"),
                       aggregated_and_grouped,
                       /*verbose=*/true);
@@ -4495,17 +4489,17 @@ TEST_P(GroupBy, ProductNullType) {
     SortBy({"key_0"}, &aggregated_and_grouped);
 
     AssertDatumsEqual(ArrayFromJSON(struct_({
+                                        field("key_0", int64()),
                                         field("hash_product", int64()),
                                         field("hash_product", int64()),
                                         field("hash_product", int64()),
                                         field("hash_product", int64()),
-                                        field("key_0", int64()),
                                     }),
                                     R"([
-    [1, null, null, null, 1],
-    [1, null, null, null, 2],
-    [1, null, null, null, 3],
-    [1, null, null, null, null]
+    [1,    1, null, null, null],
+    [2,    1, null, null, null],
+    [3,    1, null, null, null],
+    [null, 1, null, null, null]
   ])"),
                       aggregated_and_grouped,
                       /*verbose=*/true);
@@ -4561,17 +4555,17 @@ TEST_P(GroupBy, MeanNullType) {
     SortBy({"key_0"}, &aggregated_and_grouped);
 
     AssertDatumsEqual(ArrayFromJSON(struct_({
+                                        field("key_0", int64()),
                                         field("hash_mean", float64()),
                                         field("hash_mean", float64()),
                                         field("hash_mean", float64()),
                                         field("hash_mean", float64()),
-                                        field("key_0", int64()),
                                     }),
                                     R"([
-    [0, null, null, null, 1],
-    [0, null, null, null, 2],
-    [0, null, null, null, 3],
-    [0, null, null, null, null]
+    [1,    0, null, null, null],
+    [2,    0, null, null, null],
+    [3,    0, null, null, null],
+    [null, 0, null, null, null]
   ])"),
                       aggregated_and_grouped,
                       /*verbose=*/true);
@@ -4608,11 +4602,11 @@ TEST_P(GroupBy, NullTypeEmptyTable) {
                              },
                              use_threads));
     auto struct_arr = aggregated_and_grouped.array_as<StructArray>();
-    AssertDatumsEqual(ArrayFromJSON(int64(), "[]"), struct_arr->field(0),
-                      /*verbose=*/true);
     AssertDatumsEqual(ArrayFromJSON(int64(), "[]"), struct_arr->field(1),
                       /*verbose=*/true);
-    AssertDatumsEqual(ArrayFromJSON(float64(), "[]"), struct_arr->field(2),
+    AssertDatumsEqual(ArrayFromJSON(int64(), "[]"), struct_arr->field(2),
+                      /*verbose=*/true);
+    AssertDatumsEqual(ArrayFromJSON(float64(), "[]"), struct_arr->field(3),
                       /*verbose=*/true);
   }
 }
@@ -4745,44 +4739,44 @@ Result<std::shared_ptr<Table>> GetSingleSegmentInputAsCombined() {
 
 Result<std::shared_ptr<ChunkedArray>> GetSingleSegmentScalarOutput() {
   return ChunkedArrayFromJSON(struct_({
+                                  field("key_0", int64()),
                                   field("count", int64()),
                                   field("sum", float64()),
                                   field("min_max", struct_({
                                                        field("min", float64()),
                                                        field("max", float64()),
                                                    })),
-                                  field("key_0", int64()),
                               }),
                               {R"([
-    [7, 8.875, {"min": -0.25, "max": 4.0}, 1]
+    [1, 7, 8.875, {"min": -0.25, "max": 4.0}]
   ])",
                                R"([
-    [7, 8.875, {"min": -0.25, "max": 4.0}, 0]
+    [0, 7, 8.875, {"min": -0.25, "max": 4.0}]
   ])"});
 }
 
 Result<std::shared_ptr<ChunkedArray>> GetSingleSegmentKeyOutput() {
   return ChunkedArrayFromJSON(struct_({
+                                  field("key_0", int64()),
+                                  field("key_1", int64()),
                                   field("hash_count", int64()),
                                   field("hash_sum", float64()),
                                   field("hash_min_max", struct_({
                                                             field("min", float64()),
                                                             field("max", float64()),
                                                         })),
-                                  field("key_0", int64()),
-                                  field("key_1", int64()),
                               }),
                               {R"([
-    [2, 4.25,   {"min": 1.0,   "max": 3.25},  1, 1],
-    [3, -0.125, {"min": -0.25, "max": 0.125}, 2, 1],
-    [0, null,   {"min": null,  "max": null},  3, 1],
-    [2, 4.75,   {"min": 0.75,  "max": 4.0},   null, 1]
+    [1,    1, 2, 4.25,   {"min": 1.0,   "max": 3.25} ],
+    [2,    1, 3, -0.125, {"min": -0.25, "max": 0.125}],
+    [3,    1, 0, null,   {"min": null,  "max": null} ],
+    [null, 1, 2, 4.75,   {"min": 0.75,  "max": 4.0}  ]
   ])",
                                R"([
-    [2, 4.25,   {"min": 1.0,   "max": 3.25},  1, 0],
-    [3, -0.125, {"min": -0.25, "max": 0.125}, 2, 0],
-    [0, null,   {"min": null,  "max": null},  3, 0],
-    [2, 4.75,   {"min": 0.75,  "max": 4.0},   null, 0]
+    [1,    0, 2, 4.25,   {"min": 1.0,   "max": 3.25} ],
+    [2,    0, 3, -0.125, {"min": -0.25, "max": 0.125}],
+    [3,    0, 0, null,   {"min": null,  "max": null} ],
+    [null, 0, 2, 4.75,   {"min": 0.75,  "max": 4.0}  ]
   ])"});
 }
 
@@ -4839,7 +4833,7 @@ Result<std::shared_ptr<Table>> GetEmptySegmentKeysInputAsCombined() {
 Result<std::shared_ptr<Array>> GetEmptySegmentKeyOutput() {
   ARROW_ASSIGN_OR_RAISE(auto chunked, GetSingleSegmentKeyOutput());
   ARROW_ASSIGN_OR_RAISE(auto table, Table::FromChunkedStructArray(chunked));
-  ARROW_ASSIGN_OR_RAISE(auto removed, table->RemoveColumn(table->num_columns() - 1));
+  ARROW_ASSIGN_OR_RAISE(auto removed, table->RemoveColumn(1));
   auto sliced = removed->Slice(0, 4);
   ARROW_ASSIGN_OR_RAISE(auto batch, sliced->CombineChunksToBatch());
   return batch->ToStructArray();
@@ -4885,10 +4879,12 @@ Result<std::shared_ptr<ChunkedArray>> GetMultiSegmentKeyOutput(
     const std::string& add_name) {
   ARROW_ASSIGN_OR_RAISE(auto chunked, GetSingleSegmentKeyOutput());
   ARROW_ASSIGN_OR_RAISE(auto table, Table::FromChunkedStructArray(chunked));
-  int last = table->num_columns() - 1;
-  auto add_field = field(add_name, table->schema()->field(last)->type());
+  int existing_key_field_idx = 1;
+  auto add_field =
+      field(add_name, table->schema()->field(existing_key_field_idx)->type());
   ARROW_ASSIGN_OR_RAISE(auto added,
-                        table->AddColumn(last + 1, add_field, table->column(last)));
+                        table->AddColumn(existing_key_field_idx + 1, add_field,
+                                         table->column(existing_key_field_idx)));
   ARROW_ASSIGN_OR_RAISE(auto batch, added->CombineChunksToBatch());
   ARROW_ASSIGN_OR_RAISE(auto array, batch->ToStructArray());
   return ChunkedArray::Make({array->Slice(0, 4), array->Slice(4, 4)}, array->type());
diff --git a/cpp/src/arrow/acero/plan_test.cc b/cpp/src/arrow/acero/plan_test.cc
index a3ba1946a1..3ce2ba2b8c 100644
--- a/cpp/src/arrow/acero/plan_test.cc
+++ b/cpp/src/arrow/acero/plan_test.cc
@@ -1113,11 +1113,11 @@ BatchesWithSchema MakeGroupableBatches(int multiplicity = 1) {
 
 TEST(ExecPlanExecution, SourceGroupedSum) {
   std::shared_ptr<Schema> out_schema =
-      schema({field("sum(i32)", int64()), field("str", utf8())});
+      schema({field("str", utf8()), field("sum(i32)", int64())});
   const std::shared_ptr<Table> expected_parallel =
-      TableFromJSON(out_schema, {R"([[800, "alfa"], [1000, "beta"], [400, "gama"]])"});
+      TableFromJSON(out_schema, {R"([["alfa", 800], ["beta", 1000], ["gama", 400]])"});
   const std::shared_ptr<Table> expected_single =
-      TableFromJSON(out_schema, {R"([[8, "alfa"], [10, "beta"], [4, "gama"]])"});
+      TableFromJSON(out_schema, {R"([["alfa", 8], ["beta", 10], ["gama", 4]])"});
 
   for (bool parallel : {false, true}) {
     SCOPED_TRACE(parallel ? "parallel/merged" : "serial");
@@ -1193,10 +1193,10 @@ TEST(ExecPlanExecution, NestedSourceProjectGroupedSum) {
 
     auto input = MakeNestedBatches();
     auto expected =
-        TableFromJSON(schema({field("x", int64()), field("y", boolean())}), {R"([
-      [null, true],
-      [17, false],
-      [5, null]
+        TableFromJSON(schema({field("bool", boolean()), field("i32", int64())}), {R"([
+      [true, null],
+      [false, 17],
+      [null, 5]
 ])"});
 
     Declaration plan = Declaration::Sequence(
@@ -1236,9 +1236,10 @@ TEST(ExecPlanExecution, SourceFilterProjectGroupedSumFilter) {
          {"filter", FilterNodeOptions{greater(field_ref("sum(multiply(i32, 2))"),
                                               literal(10 * batch_multiplicity))}}});
 
-    auto expected = TableFromJSON(schema({field("a", int64()), field("b", utf8())}),
-                                  {parallel ? R"([[3600, "alfa"], [2000, "beta"]])"
-                                            : R"([[36, "alfa"], [20, "beta"]])"});
+    auto expected = TableFromJSON(
+        schema({field("str", utf8()), field("sum(multiply(i32, 2))", int64())}),
+        {parallel ? R"([["alfa", 3600], ["beta", 2000]])"
+                  : R"([["alfa", 36], ["beta", 20]])"});
     ASSERT_OK_AND_ASSIGN(auto actual, DeclarationToTable(std::move(plan), parallel));
     AssertTablesEqualIgnoringOrder(expected, actual);
   }
@@ -1279,8 +1280,8 @@ TEST(ExecPlanExecution, SourceFilterProjectGroupedSumOrderBy) {
 
     ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
                 Finishes(ResultWith(ElementsAreArray({ExecBatchFromJSON(
-                    {int64(), utf8()}, parallel ? R"([[2000, "beta"], [3600, "alfa"]])"
-                                                : R"([[20, "beta"], [36, "alfa"]])")}))));
+                    {utf8(), int64()}, parallel ? R"([["beta", 2000], ["alfa", 3600]])"
+                                                : R"([["beta", 20], ["alfa", 36]])")}))));
   }
 }
 
@@ -1315,7 +1316,7 @@ TEST(ExecPlanExecution, SourceFilterProjectGroupedSumTopK) {
     ASSERT_THAT(
         StartAndCollect(plan.get(), sink_gen),
         Finishes(ResultWith(ElementsAreArray({ExecBatchFromJSON(
-            {int64(), utf8()}, parallel ? R"([[800, "gama"]])" : R"([[8, "gama"]])")}))));
+            {utf8(), int64()}, parallel ? R"([["gama", 800]])" : R"([["gama", 8]])")}))));
   }
 }
 
@@ -1374,8 +1375,8 @@ TEST(ExecPlanExecution, AggregationPreservesOptions) {
     }
 
     std::shared_ptr<Table> expected =
-        TableFromJSON(schema({field("count(i32)", int64()), field("str", utf8())}),
-                      {R"([[500, "alfa"], [200, "beta"], [200, "gama"]])"});
+        TableFromJSON(schema({field("str", utf8()), field("count(i32)", int64())}),
+                      {R"([["alfa", 500], ["beta", 200], ["gama", 200]])"});
 
     ASSERT_FINISHES_OK_AND_ASSIGN(std::shared_ptr<Table> actual, table_future);
     AssertTablesEqualIgnoringOrder(expected, actual);
@@ -1479,7 +1480,7 @@ TEST(ExecPlanExecution, ScalarSourceGroupedSum) {
 
   ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
               Finishes(ResultWith(UnorderedElementsAreArray({
-                  ExecBatchFromJSON({int64(), boolean()}, R"([[6, true], [18, false]])"),
+                  ExecBatchFromJSON({boolean(), int64()}, R"([[true, 6], [false, 18]])"),
               }))));
 }
 
@@ -1638,8 +1639,8 @@ TEST(ExecPlanExecution, SegmentedAggregationWithOneSegment) {
   ASSERT_OK_AND_ASSIGN(BatchesWithCommonSchema actual_batches,
                        DeclarationToExecBatches(std::move(plan), /*use_threads=*/false));
 
-  auto expected = ExecBatchFromJSON({int64(), float64(), int32(), int32()},
-                                    R"([[6, 2, 1, 1], [6, 2, 2, 1]])");
+  auto expected = ExecBatchFromJSON({int32(), int32(), int64(), float64()},
+                                    R"([[1, 1, 6, 2], [2, 1, 6, 2]])");
   AssertExecBatchesEqualIgnoringOrder(actual_batches.schema, actual_batches.batches,
                                       {expected});
 }
@@ -1663,13 +1664,13 @@ TEST(ExecPlanExecution, SegmentedAggregationWithTwoSegments) {
                                               {"hash_sum", nullptr, "c", "sum(c)"},
                                               {"hash_mean", nullptr, "c", "mean(c)"},
                                           },
-                                          /*keys=*/{"b"}, /*segment_leys=*/{"a"}}}});
+                                          /*keys=*/{"b"}, /*segment_keys=*/{"a"}}}});
   ASSERT_OK_AND_ASSIGN(BatchesWithCommonSchema actual_batches,
                        DeclarationToExecBatches(std::move(plan), /*use_threads=*/false));
 
   auto expected = ExecBatchFromJSON(
-      {int64(), float64(), int32(), int32()},
-      R"([[3, 1.5, 1, 1], [1, 1, 2, 1], [3, 3, 1, 2], [5, 2.5, 2, 2]])");
+      {int32(), int32(), int64(), float64()},
+      R"([[1, 1, 3, 1.5], [2, 1, 1, 1], [1, 2, 3, 3], [2, 2, 5, 2.5]])");
   AssertExecBatchesEqualIgnoringOrder(actual_batches.schema, actual_batches.batches,
                                       {expected});
 }
@@ -1697,8 +1698,8 @@ TEST(ExecPlanExecution, SegmentedAggregationWithBatchCrossingSegment) {
   ASSERT_OK_AND_ASSIGN(BatchesWithCommonSchema actual_batches,
                        DeclarationToExecBatches(std::move(plan), /*use_threads=*/false));
 
-  auto expected = ExecBatchFromJSON({int64(), float64(), int32(), int32()},
-                                    R"([[2, 1, 1, 1], [4, 2, 2, 2], [6, 3, 3, 3]])");
+  auto expected = ExecBatchFromJSON({int32(), int32(), int64(), float64()},
+                                    R"([[1, 1, 2, 1], [2, 2, 4, 2], [3, 3, 6, 3]])");
   AssertExecBatchesEqualIgnoringOrder(actual_batches.schema, actual_batches.batches,
                                       {expected});
 }
diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc
index 159d1ac033..cde3a725c4 100644
--- a/cpp/src/arrow/dataset/scanner_test.cc
+++ b/cpp/src/arrow/dataset/scanner_test.cc
@@ -2812,7 +2812,7 @@ TEST(ScanNode, MinimalGroupedAggEndToEnd) {
 
   // translate sink_gen (async) to sink_reader (sync)
   std::shared_ptr<RecordBatchReader> sink_reader = acero::MakeGeneratorReader(
-      schema({field("sum(a * 2)", int64()), field("b", boolean())}), std::move(sink_gen),
+      schema({field("b", boolean()), field("sum(a * 2)", int64())}), std::move(sink_gen),
       exec_context.memory_pool());
 
   // start the ExecPlan
@@ -2832,11 +2832,11 @@ TEST(ScanNode, MinimalGroupedAggEndToEnd) {
   ASSERT_TRUE(plan->finished().Wait(/*seconds=*/1)) << "ExecPlan didn't finish within 1s";
 
   auto expected = TableFromJSON(
-      schema({field("sum(a * 2)", int64()), field("b", boolean())}), {
+      schema({field("b", boolean()), field("sum(a * 2)", int64())}), {
                                                                          R"JSON([
-                                               {"sum(a * 2)": 4,  "b": true},
-                                               {"sum(a * 2)": 12, "b": null},
-                                               {"sum(a * 2)": 40, "b": false}
+                                               {"b": true, "sum(a * 2)": 4},
+                                               {"b": null, "sum(a * 2)": 12},
+                                               {"b": false, "sum(a * 2)": 40}
                                           ])JSON"});
   AssertTablesEqual(*expected, *sorted.table(), /*same_chunk_layout=*/false);
 }
diff --git a/cpp/src/arrow/engine/substrait/function_test.cc b/cpp/src/arrow/engine/substrait/function_test.cc
index bb9df20846..9164bf0a4b 100644
--- a/cpp/src/arrow/engine/substrait/function_test.cc
+++ b/cpp/src/arrow/engine/substrait/function_test.cc
@@ -661,14 +661,12 @@ void CheckGroupedAggregateCase(const AggregateTestCase& test_case) {
   ASSERT_OK_AND_ASSIGN(
       std::shared_ptr<Array> sort_indices,
       compute::SortIndices(output_table, compute::SortOptions({compute::SortKey(
-                                             output_table->num_columns() - 1,
-                                             compute::SortOrder::Ascending)})));
+                                             0, compute::SortOrder::Ascending)})));
   ASSERT_OK_AND_ASSIGN(Datum sorted_table_datum,
                        compute::Take(output_table, sort_indices));
   output_table = sorted_table_datum.table();
-  // TODO(ARROW-17245) We should be selecting N-1 here but Acero
-  // currently emits things in reverse order
-  ASSERT_OK_AND_ASSIGN(output_table, output_table->SelectColumns({0}));
+  ASSERT_OK_AND_ASSIGN(output_table,
+                       output_table->SelectColumns({output_table->num_columns() - 1}));
 
   std::shared_ptr<Table> expected_output =
       GetOutputTableForAggregateCase(test_case.output_type, test_case.group_outputs);
diff --git a/cpp/src/arrow/engine/substrait/serde_test.cc b/cpp/src/arrow/engine/substrait/serde_test.cc
index 02be82972d..ca96b2bf7c 100644
--- a/cpp/src/arrow/engine/substrait/serde_test.cc
+++ b/cpp/src/arrow/engine/substrait/serde_test.cc
@@ -3449,12 +3449,12 @@ TEST(SubstraitRoundTrip, AggregateRel) {
   ASSERT_OK_AND_ASSIGN(auto buf,
                        internal::SubstraitFromJSON("Plan", substrait_json,
                                                    /*ignore_unknown_fields=*/false));
-  auto output_schema = schema({field("aggregates", int64()), field("keys", int32())});
+  auto output_schema = schema({field("keys", int32()), field("aggregates", int64())});
   auto expected_table = TableFromJSON(output_schema, {R"([
-      [80, 10],
-      [90, 20],
-      [60, 30],
-      [60, 40]
+      [10, 80],
+      [20, 90],
+      [30, 60],
+      [40, 60]
   ])"});
 
   NamedTableProvider table_provider = AlwaysProvideSameTable(std::move(input_table));
@@ -3489,7 +3489,7 @@ TEST(SubstraitRoundTrip, AggregateRelEmit) {
         "aggregate": {
           "common": {
           "emit": {
-            "outputMapping": [0]
+            "outputMapping": [1]
           }
         },
           "input": {
@@ -5701,7 +5701,7 @@ TEST(Substrait, PlanWithSegmentedAggregateExtension) {
             }
           }
         },
-        "names": ["v", "k", "t"]
+        "names": ["k", "t", "v"]
       }
     }],
     "expectedTypeUrls": []
@@ -5724,9 +5724,9 @@ TEST(Substrait, PlanWithSegmentedAggregateExtension) {
   ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan", substrait_json));
 
   std::shared_ptr<Schema> output_schema =
-      schema({field("v", float64()), field("k", int32()), field("t", int32())});
+      schema({field("k", int32()), field("t", int32()), field("v", float64())});
   auto expected_table =
-      TableFromJSON(output_schema, {"[[4, 1, 1], [2, 2, 1], [10, 2, 2], [5, 1, 2]]"});
+      TableFromJSON(output_schema, {"[[1, 1, 4], [2, 1, 2], [2, 2, 10], [1, 2, 5]]"});
   CheckRoundTripResult(std::move(expected_table), buf, {}, conversion_options);
 }
 
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index 82b71eeb9a..9e1b00bee8 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -4879,11 +4879,11 @@ cdef class Table(_PandasConvertible):
         >>> table = pa.Table.from_pandas(df)
         >>> table.group_by('year').aggregate([('n_legs', 'sum')])
         pyarrow.Table
-        n_legs_sum: int64
         year: int64
+        n_legs_sum: int64
         ----
-        n_legs_sum: [[2,6,104,5]]
         year: [[2020,2022,2021,2019]]
+        n_legs_sum: [[2,6,104,5]]
         """
         return TableGroupBy(self, keys)
 
@@ -5486,11 +5486,11 @@ class TableGroupBy:
 
     >>> pa.TableGroupBy(t,"keys").aggregate([("values", "sum")])
     pyarrow.Table
-    values_sum: int64
     keys: string
+    values_sum: int64
     ----
-    values_sum: [[3,7,5]]
     keys: [["a","b","c"]]
+    values_sum: [[3,7,5]]
     """
 
     def __init__(self, table, keys):
@@ -5536,21 +5536,21 @@ list[tuple(str, str, FunctionOptions)]
 
         >>> t.group_by("keys").aggregate([("values", "sum")])
         pyarrow.Table
-        values_sum: int64
         keys: string
+        values_sum: int64
         ----
-        values_sum: [[3,7,5]]
         keys: [["a","b","c"]]
+        values_sum: [[3,7,5]]
 
         Count the rows over the grouped column "keys":
 
         >>> t.group_by("keys").aggregate([([], "count_all")])
         pyarrow.Table
-        count_all: int64
         keys: string
+        count_all: int64
         ----
-        count_all: [[2,2,1]]
         keys: [["a","b","c"]]
+        count_all: [[2,2,1]]
 
         Do multiple aggregations:
 
@@ -5559,13 +5559,13 @@ list[tuple(str, str, FunctionOptions)]
         ...    ("keys", "count")
         ... ])
         pyarrow.Table
+        keys: string
         values_sum: int64
         keys_count: int64
-        keys: string
         ----
+        keys: [["a","b","c"]]
         values_sum: [[3,7,5]]
         keys_count: [[2,2,1]]
-        keys: [["a","b","c"]]
 
         Count the number of non-null values for column "values"
         over the grouped column "keys":
@@ -5575,11 +5575,11 @@ list[tuple(str, str, FunctionOptions)]
         ...    ("values", "count", pc.CountOptions(mode="only_valid"))
         ... ])
         pyarrow.Table
-        values_count: int64
         keys: string
+        values_count: int64
         ----
-        values_count: [[2,2,1]]
         keys: [["a","b","c"]]
+        values_count: [[2,2,1]]
 
         Get a single row for each group in column "keys":
 
diff --git a/python/pyarrow/tests/test_acero.py b/python/pyarrow/tests/test_acero.py
index f32ca25a6c..7db4afd000 100644
--- a/python/pyarrow/tests/test_acero.py
+++ b/python/pyarrow/tests/test_acero.py
@@ -204,7 +204,7 @@ def test_aggregate_hash():
         table_source, Declaration("aggregate", aggr_opts)
     ])
     result = decl.to_table()
-    expected = pa.table({"count(a)": [1, 1], "b": ["foo", "bar"]})
+    expected = pa.table({"b": ["foo", "bar"], "count(a)": [1, 1]})
     assert result.equals(expected)
 
     # specify function options
@@ -215,7 +215,7 @@ def test_aggregate_hash():
         table_source, Declaration("aggregate", aggr_opts)
     ])
     result = decl.to_table()
-    expected_all = pa.table({"count(a)": [2, 1], "b": ["foo", "bar"]})
+    expected_all = pa.table({"b": ["foo", "bar"], "count(a)": [2, 1]})
     assert result.equals(expected_all)
 
     # specify keys as field references
diff --git a/r/R/dplyr-collect.R b/r/R/dplyr-collect.R
index f45a9886ea..9205a31b14 100644
--- a/r/R/dplyr-collect.R
+++ b/r/R/dplyr-collect.R
@@ -181,9 +181,7 @@ implicit_schema <- function(.data) {
   } else {
     hash <- length(.data$group_by_vars) > 0
     # The output schema is based on the aggregations and any group_by vars.
-    # The group_by vars come first (this can't be done by summarize; they have
-    # to be last per the aggregate node signature, and they get projected to
-    # this order after aggregation)
+    # The group_by vars come first.
     new_fields <- c(
       group_types(.data, old_schm),
       aggregate_types(.data, hash, old_schm)
diff --git a/r/R/query-engine.R b/r/R/query-engine.R
index ea5a3f1c57..4b9b7ac459 100644
--- a/r/R/query-engine.R
+++ b/r/R/query-engine.R
@@ -127,20 +127,13 @@ ExecPlan <- R6Class("ExecPlan",
           key_names = group_vars
         )
 
-        if (grouped) {
-          # The result will have result columns first then the grouping cols.
-          # dplyr orders group cols first, so adapt the result to meet that expectation.
-          node <- node$Project(
-            make_field_refs(c(group_vars, names(.data$aggregations)))
+        if (grouped && getOption("arrow.summarise.sort", FALSE)) {
+          # Add sorting instructions for the rows too to match dplyr
+          # (see below about why sorting isn't itself a Node)
+          node$extras$sort <- list(
+            names = group_vars,
+            orders = rep(0L, length(group_vars))
           )
-          if (getOption("arrow.summarise.sort", FALSE)) {
-            # Add sorting instructions for the rows too to match dplyr
-            # (see below about why sorting isn't itself a Node)
-            node$extras$sort <- list(
-              names = group_vars,
-              orders = rep(0L, length(group_vars))
-            )
-          }
         }
       } else {
         # If any columns are derived, reordered, or renamed we need to Project
diff --git a/r/tests/testthat/test-dataset-dplyr.R b/r/tests/testthat/test-dataset-dplyr.R
index c8054b0c83..e20a6262b7 100644
--- a/r/tests/testthat/test-dataset-dplyr.R
+++ b/r/tests/testthat/test-dataset-dplyr.R
@@ -381,7 +381,6 @@ test_that("show_exec_plan(), show_query() and explain() with datasets", {
       show_exec_plan(),
     regexp = paste0(
       "ExecPlan with .* nodes:.*", # boiler plate for ExecPlan
-      "ProjectNode.*", # output columns
       "GroupByNode.*", # group by node
       "keys=.*part.*", # key for aggregations
       "aggregates=.*hash_mean.*", # aggregations
diff --git a/r/tests/testthat/test-dplyr-query.R b/r/tests/testthat/test-dplyr-query.R
index 00a9784e80..0b2b23ec86 100644
--- a/r/tests/testthat/test-dplyr-query.R
+++ b/r/tests/testthat/test-dplyr-query.R
@@ -508,7 +508,6 @@ test_that("show_exec_plan(), show_query() and explain()", {
       show_exec_plan(),
     regexp = paste0(
       "ExecPlan with .* nodes:.*", # boiler plate for ExecPlan
-      "ProjectNode.*", # output columns
       "GroupByNode.*", # the group_by statement
       "keys=.*lgl.*", # the key for the aggregations
       "aggregates=.*hash_mean.*avg.*", # the aggregations
diff --git a/r/tests/testthat/test-dplyr-summarize.R b/r/tests/testthat/test-dplyr-summarize.R
index 6ee8982cc2..12ccec21ee 100644
--- a/r/tests/testthat/test-dplyr-summarize.R
+++ b/r/tests/testthat/test-dplyr-summarize.R
@@ -1136,14 +1136,14 @@ test_that("We don't add unnecessary ProjectNodes when aggregating", {
     0
   )
 
-  # 2 projections: one before, and one after in order to put grouping cols first
+  # Still just 1 projection
   expect_project_nodes(
     tab %>% group_by(lgl) %>% summarize(mean(int)),
-    2
+    1
   )
   expect_project_nodes(
     tab %>% count(lgl),
-    2
+    1
   )
 })
 
diff --git a/ruby/red-arrow/test/test-group.rb b/ruby/red-arrow/test/test-group.rb
index 2823977d5c..68e927df69 100644
--- a/ruby/red-arrow/test/test-group.rb
+++ b/ruby/red-arrow/test/test-group.rb
@@ -42,9 +42,9 @@ class GroupTest < Test::Unit::TestCase
       }
       table = Arrow::Table.new(raw_table)
       assert_equal(<<-TABLE, table.group(:time).count.to_s)
-	count(int)	                     time
-0	         1	#{time_values[0].iso8601}
-1	         1	#{time_values[1].iso8601}
+	                     time	count(int)
+0	#{time_values[0].iso8601}	         1
+1	#{time_values[1].iso8601}	         1
       TABLE
     end
   end
@@ -52,31 +52,31 @@ class GroupTest < Test::Unit::TestCase
   sub_test_case("#count") do
     test("single") do
       assert_equal(<<-TABLE, @table.group(:group_key1).count.to_s)
-	count(group_key2)	count(int)	count(uint)	count(float)	count(string)	group_key1
-0	                2	         2	          1	           1	            2	         1
-1	                1	         0	          1	           1	            1	         2
-2	                3	         3	          3	           3	            2	         3
+	group_key1	count(group_key2)	count(int)	count(uint)	count(float)	count(string)
+0	         1	                2	         2	          1	           1	            2
+1	         2	                1	         0	          1	           1	            1
+2	         3	                3	         3	          3	           3	            2
       TABLE
     end
 
     test("multiple") do
       assert_equal(<<-TABLE, @table.group(:group_key1, :group_key2).count.to_s)
-	count(int)	count(uint)	count(float)	count(string)	group_key1	group_key2
-0	         2	          1	           1	            2	         1	         1
-1	         0	          1	           1	            1	         2	         1
-2	         1	          1	           1	            0	         3	         1
-3	         2	          2	           2	            2	         3	         2
+	group_key1	group_key2	count(int)	count(uint)	count(float)	count(string)
+0	         1	         1	         2	          1	           1	            2
+1	         2	         1	         0	          1	           1	            1
+2	         3	         1	         1	          1	           1	            0
+3	         3	         2	         2	          2	           2	            2
       TABLE
     end
 
     test("column") do
       group = @table.group(:group_key1, :group_key2)
       assert_equal(<<-TABLE, group.count(:int, :uint).to_s)
-	count(int)	count(uint)	group_key1	group_key2
-0	         2	          1	         1	         1
-1	         0	          1	         2	         1
-2	         1	          1	         3	         1
-3	         2	          2	         3	         2
+	group_key1	group_key2	count(int)	count(uint)
+0	         1	         1	         2	          1
+1	         2	         1	         0	          1
+2	         3	         1	         1	          1
+3	         3	         2	         2	          2
       TABLE
     end
   end
@@ -84,20 +84,20 @@ class GroupTest < Test::Unit::TestCase
   sub_test_case("#sum") do
     test("single") do
       assert_equal(<<-TABLE, @table.group(:group_key1).sum.to_s)
-	sum(group_key2)	sum(int)	sum(uint)	sum(float)	group_key1
-0	              2	      -3	        1	  2.200000	         1
-1	              1	  (null)	        3	  3.300000	         2
-2	              5	     -15	       15	 16.500000	         3
+	group_key1	sum(group_key2)	sum(int)	sum(uint)	sum(float)
+0	         1	              2	      -3	        1	  2.200000
+1	         2	              1	  (null)	        3	  3.300000
+2	         3	              5	     -15	       15	 16.500000
       TABLE
     end
 
     test("multiple") do
       assert_equal(<<-TABLE, @table.group(:group_key1, :group_key2).sum.to_s)
-	sum(int)	sum(uint)	sum(float)	group_key1	group_key2
-0	      -3	        1	  2.200000	         1	         1
-1	  (null)	        3	  3.300000	         2	         1
-2	      -4	        4	  4.400000	         3	         1
-3	     -11	       11	 12.100000	         3	         2
+	group_key1	group_key2	sum(int)	sum(uint)	sum(float)
+0	         1	         1	      -3	        1	  2.200000
+1	         2	         1	  (null)	        3	  3.300000
+2	         3	         1	      -4	        4	  4.400000
+3	         3	         2	     -11	       11	 12.100000
       TABLE
     end
   end
@@ -105,20 +105,20 @@ class GroupTest < Test::Unit::TestCase
   sub_test_case("#mean") do
     test("single") do
       assert_equal(<<-TABLE, @table.group(:group_key1).mean.to_s)
-	mean(group_key2)	 mean(int)	mean(uint)	mean(float)	group_key1
-0	        1.000000	 -1.500000	  1.000000	   2.200000	         1
-1	        1.000000	    (null)	  3.000000	   3.300000	         2
-2	        1.666667	 -5.000000	  5.000000	   5.500000	         3
+	group_key1	mean(group_key2)	 mean(int)	mean(uint)	mean(float)
+0	         1	        1.000000	 -1.500000	  1.000000	   2.200000
+1	         2	        1.000000	    (null)	  3.000000	   3.300000
+2	         3	        1.666667	 -5.000000	  5.000000	   5.500000
       TABLE
     end
 
     test("multiple") do
       assert_equal(<<-TABLE, @table.group(:group_key1, :group_key2).mean.to_s)
-	 mean(int)	mean(uint)	mean(float)	group_key1	group_key2
-0	 -1.500000	  1.000000	   2.200000	         1	         1
-1	    (null)	  3.000000	   3.300000	         2	         1
-2	 -4.000000	  4.000000	   4.400000	         3	         1
-3	 -5.500000	  5.500000	   6.050000	         3	         2
+	group_key1	group_key2	 mean(int)	mean(uint)	mean(float)
+0	         1	         1	 -1.500000	  1.000000	   2.200000
+1	         2	         1	    (null)	  3.000000	   3.300000
+2	         3	         1	 -4.000000	  4.000000	   4.400000
+3	         3	         2	 -5.500000	  5.500000	   6.050000
       TABLE
     end
   end
@@ -126,20 +126,20 @@ class GroupTest < Test::Unit::TestCase
   sub_test_case("#min") do
     test("single") do
       assert_equal(<<-TABLE, @table.group(:group_key1).min.to_s)
-	min(group_key2)	min(int)	min(uint)	min(float)	group_key1
-0	              1	      -2	        1	  2.200000	         1
-1	              1	  (null)	        3	  3.300000	         2
-2	              1	      -6	        4	  4.400000	         3
+	group_key1	min(group_key2)	min(int)	min(uint)	min(float)
+0	         1	              1	      -2	        1	  2.200000
+1	         2	              1	  (null)	        3	  3.300000
+2	         3	              1	      -6	        4	  4.400000
       TABLE
     end
 
     test("multiple") do
       assert_equal(<<-TABLE, @table.group(:group_key1, :group_key2).min.to_s)
-	min(int)	min(uint)	min(float)	group_key1	group_key2
-0	      -2	        1	  2.200000	         1	         1
-1	  (null)	        3	  3.300000	         2	         1
-2	      -4	        4	  4.400000	         3	         1
-3	      -6	        5	  5.500000	         3	         2
+	group_key1	group_key2	min(int)	min(uint)	min(float)
+0	         1	         1	      -2	        1	  2.200000
+1	         2	         1	  (null)	        3	  3.300000
+2	         3	         1	      -4	        4	  4.400000
+3	         3	         2	      -6	        5	  5.500000
       TABLE
     end
   end
@@ -147,20 +147,20 @@ class GroupTest < Test::Unit::TestCase
   sub_test_case("#max") do
     test("single") do
       assert_equal(<<-TABLE, @table.group(:group_key1).max.to_s)
-	max(group_key2)	max(int)	max(uint)	max(float)	group_key1
-0	              1	      -1	        1	  2.200000	         1
-1	              1	  (null)	        3	  3.300000	         2
-2	              2	      -4	        6	  6.600000	         3
+	group_key1	max(group_key2)	max(int)	max(uint)	max(float)
+0	         1	              1	      -1	        1	  2.200000
+1	         2	              1	  (null)	        3	  3.300000
+2	         3	              2	      -4	        6	  6.600000
       TABLE
     end
 
     test("multiple") do
       assert_equal(<<-TABLE, @table.group(:group_key1, :group_key2).max.to_s)
-	max(int)	max(uint)	max(float)	group_key1	group_key2
-0	      -1	        1	  2.200000	         1	         1
-1	  (null)	        3	  3.300000	         2	         1
-2	      -4	        4	  4.400000	         3	         1
-3	      -5	        6	  6.600000	         3	         2
+	group_key1	group_key2	max(int)	max(uint)	max(float)
+0	         1	         1	      -1	        1	  2.200000
+1	         2	         1	  (null)	        3	  3.300000
+2	         3	         1	      -4	        4	  4.400000
+3	         3	         2	      -5	        6	  6.600000
       TABLE
     end
   end
@@ -169,11 +169,11 @@ class GroupTest < Test::Unit::TestCase
     test("function()") do
       group = @table.group(:group_key1, :group_key2)
       assert_equal(<<-TABLE, group.aggregate("count(int)", "sum(uint)").to_s)
-	count(int)	sum(uint)	group_key1	group_key2
-0	         2	        1	         1	         1
-1	         0	        3	         2	         1
-2	         1	        4	         3	         1
-3	         2	       11	         3	         2
+	group_key1	group_key2	count(int)	sum(uint)
+0	         1	         1	         2	        1
+1	         2	         1	         0	        3
+2	         3	         1	         1	        4
+3	         3	         2	         2	       11
       TABLE
     end
   end