You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ks...@apache.org on 2022/07/26 09:57:19 UTC

[arrow] branch maint-9.0.0 updated (42647dcd00 -> 74a4a0244e)

This is an automated email from the ASF dual-hosted git repository.

kszucs pushed a change to branch maint-9.0.0
in repository https://gitbox.apache.org/repos/asf/arrow.git


    from 42647dcd00 ARROW-16887: [R][Docs] Update Filesystem Vignette for GCS (#13601)
     new 25145ed386 ARROW-17197: [R] floor_date/ceiling_date lubridate comparison tests failing on macOS (#13705)
     new ffdd3a334c ARROW-16692: [C++] StackOverflow in merge generator causes segmentation fault in scan (#13691)
     new 74a4a0244e ARROW-15591: [C++] Add support for aggregation to the Substrait consumer (#13130)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 cpp/src/arrow/dataset/scanner_test.cc              |   6 +
 cpp/src/arrow/engine/substrait/extension_set.cc    |   1 +
 .../arrow/engine/substrait/relation_internal.cc    |  72 ++++-
 cpp/src/arrow/engine/substrait/serde_test.cc       | 317 +++++++++++++++++++++
 cpp/src/arrow/util/async_generator.h               | 115 +++++---
 cpp/src/arrow/util/async_generator_test.cc         |  19 ++
 r/tests/testthat/test-dplyr-funcs-datetime.R       |   4 +-
 7 files changed, 493 insertions(+), 41 deletions(-)


[arrow] 01/03: ARROW-17197: [R] floor_date/ceiling_date lubridate comparison tests failing on macOS (#13705)

Posted by ks...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kszucs pushed a commit to branch maint-9.0.0
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit 25145ed386b38c7d242d20ea52d15ec1481170b7
Author: Rok Mihevc <ro...@mihevc.org>
AuthorDate: Mon Jul 25 23:46:41 2022 +0200

    ARROW-17197: [R] floor_date/ceiling_date lubridate comparison tests failing on macOS (#13705)
    
    This resolves [ARROW-17197](https://issues.apache.org/jira/browse/ARROW-17197).
    
    Authored-by: Rok <ro...@mihevc.org>
    Signed-off-by: Rok <ro...@mihevc.org>
---
 r/tests/testthat/test-dplyr-funcs-datetime.R | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/r/tests/testthat/test-dplyr-funcs-datetime.R b/r/tests/testthat/test-dplyr-funcs-datetime.R
index 0c655e48bb..b8bd28e970 100644
--- a/r/tests/testthat/test-dplyr-funcs-datetime.R
+++ b/r/tests/testthat/test-dplyr-funcs-datetime.R
@@ -3138,12 +3138,12 @@ test_that("temporal round/floor/ceil period unit maxima are enforced", {
 # produces incorrect answers
 check_timezone_rounding_vs_lubridate <- function(data, unit) {
 
-  # esoteric lubridate bug: on windows only (not ubuntu), lubridate returns
+  # esoteric lubridate bug: on windows and macOS (not linux), lubridate returns
   # incorrect ceiling/floor for timezoned POSIXct times (syd, adl, kat zones,
   # but not mar) but not utc, and not for round, and only for these two
   # timestamps where high-precision timing is relevant to the outcome
   if (unit %in% c(".001 second", "second", "minute")) {
-    if (tolower(Sys.info()[["sysname"]]) == "windows") {
+    if (tolower(Sys.info()[["sysname"]]) %in% c("windows", "darwin")) {
       data <- data[-c(1, 3), ]
     }
   }


[arrow] 03/03: ARROW-15591: [C++] Add support for aggregation to the Substrait consumer (#13130)

Posted by ks...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kszucs pushed a commit to branch maint-9.0.0
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit 74a4a0244ed7ea5acc3512e0a9a93036844abc0e
Author: Vibhatha Lakmal Abeykoon <vi...@users.noreply.github.com>
AuthorDate: Tue Jul 26 05:29:15 2022 +0530

    ARROW-15591: [C++] Add support for aggregation to the Substrait consumer (#13130)
    
    This PR includes the Substrait-Arrow Aggregate integration where a Substrait plan can be consumed in ACERO.
    
    Lead-authored-by: Vibhatha Abeykoon <vi...@gmail.com>
    Co-authored-by: Vibhatha Lakmal Abeykoon <vi...@users.noreply.github.com>
    Signed-off-by: Weston Pace <we...@gmail.com>
---
 cpp/src/arrow/engine/substrait/extension_set.cc    |   1 +
 .../arrow/engine/substrait/relation_internal.cc    |  72 ++++-
 cpp/src/arrow/engine/substrait/serde_test.cc       | 317 +++++++++++++++++++++
 3 files changed, 389 insertions(+), 1 deletion(-)

diff --git a/cpp/src/arrow/engine/substrait/extension_set.cc b/cpp/src/arrow/engine/substrait/extension_set.cc
index f60f6ac1cb..08eb6acc9c 100644
--- a/cpp/src/arrow/engine/substrait/extension_set.cc
+++ b/cpp/src/arrow/engine/substrait/extension_set.cc
@@ -445,6 +445,7 @@ struct DefaultExtensionIdRegistry : ExtensionIdRegistryImpl {
              "add",
              "equal",
              "is_not_distinct_from",
+             "hash_count",
          }) {
       DCHECK_OK(RegisterFunction({kArrowExtTypesUri, name}, name.to_string()));
     }
diff --git a/cpp/src/arrow/engine/substrait/relation_internal.cc b/cpp/src/arrow/engine/substrait/relation_internal.cc
index 09ecb2f069..8f6cb0ce36 100644
--- a/cpp/src/arrow/engine/substrait/relation_internal.cc
+++ b/cpp/src/arrow/engine/substrait/relation_internal.cc
@@ -307,7 +307,7 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& rel,
             callptr->function_name);
       }
 
-      // TODO: ARROW-166241 Add Suffix support for Substrait
+      // TODO: ARROW-16624 Add Suffix support for Substrait
       const auto* left_keys = callptr->arguments[0].field_ref();
       const auto* right_keys = callptr->arguments[1].field_ref();
       if (!left_keys || !right_keys) {
@@ -323,6 +323,76 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& rel,
       join_dec.inputs.emplace_back(std::move(right.declaration));
       return DeclarationInfo{std::move(join_dec), num_columns};
     }
+    case substrait::Rel::RelTypeCase::kAggregate: {
+      const auto& aggregate = rel.aggregate();
+      RETURN_NOT_OK(CheckRelCommon(aggregate));
+
+      if (!aggregate.has_input()) {
+        return Status::Invalid("substrait::AggregateRel with no input relation");
+      }
+
+      ARROW_ASSIGN_OR_RAISE(auto input, FromProto(aggregate.input(), ext_set));
+
+      if (aggregate.groupings_size() > 1) {
+        return Status::NotImplemented(
+            "Grouping sets not supported.  AggregateRel::groupings may not have more "
+            "than one item");
+      }
+      std::vector<FieldRef> keys;
+      auto group = aggregate.groupings(0);
+      keys.reserve(group.grouping_expressions_size());
+      for (int exp_id = 0; exp_id < group.grouping_expressions_size(); exp_id++) {
+        ARROW_ASSIGN_OR_RAISE(auto expr,
+                              FromProto(group.grouping_expressions(exp_id), ext_set));
+        const auto* field_ref = expr.field_ref();
+        if (field_ref) {
+          keys.emplace_back(std::move(*field_ref));
+        } else {
+          return Status::Invalid(
+              "The grouping expression for an aggregate must be a direct reference.");
+        }
+      }
+
+      int measure_size = aggregate.measures_size();
+      std::vector<compute::Aggregate> aggregates;
+      aggregates.reserve(measure_size);
+      for (int measure_id = 0; measure_id < measure_size; measure_id++) {
+        const auto& agg_measure = aggregate.measures(measure_id);
+        if (agg_measure.has_measure()) {
+          if (agg_measure.has_filter()) {
+            return Status::NotImplemented("Aggregate filters are not supported.");
+          }
+          const auto& agg_func = agg_measure.measure();
+          if (agg_func.arguments_size() != 1) {
+            return Status::NotImplemented("Aggregate function must be a unary function.");
+          }
+          int func_reference = agg_func.function_reference();
+          ARROW_ASSIGN_OR_RAISE(auto func_record, ext_set.DecodeFunction(func_reference));
+          // aggregate function name
+          auto func_name = std::string(func_record.id.name);
+          // aggregate target
+          auto subs_func_args = agg_func.arguments(0);
+          ARROW_ASSIGN_OR_RAISE(auto field_expr,
+                                FromProto(subs_func_args.value(), ext_set));
+          auto target = field_expr.field_ref();
+          if (!target) {
+            return Status::Invalid(
+                "The input expression to an aggregate function must be a direct "
+                "reference.");
+          }
+          aggregates.emplace_back(compute::Aggregate{std::move(func_name), NULLPTR,
+                                                     std::move(*target), std::move("")});
+        } else {
+          return Status::Invalid("substrait::AggregateFunction not provided");
+        }
+      }
+
+      return DeclarationInfo{
+          compute::Declaration::Sequence(
+              {std::move(input.declaration),
+               {"aggregate", compute::AggregateNodeOptions{aggregates, keys}}}),
+          static_cast<int>(aggregates.size())};
+    }
 
     default:
       break;
diff --git a/cpp/src/arrow/engine/substrait/serde_test.cc b/cpp/src/arrow/engine/substrait/serde_test.cc
index e10082392d..8e5745d6df 100644
--- a/cpp/src/arrow/engine/substrait/serde_test.cc
+++ b/cpp/src/arrow/engine/substrait/serde_test.cc
@@ -1383,5 +1383,322 @@ TEST(Substrait, JoinPlanInvalidKeys) {
   }
 }
 
+TEST(Substrait, AggregateBasic) {
+  ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan", R"({
+    "relations": [{
+      "rel": {
+        "aggregate": {
+          "input": {
+            "read": {
+              "base_schema": {
+                "names": ["A", "B", "C"],
+                "struct": {
+                  "types": [{
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }]
+                }
+              },
+              "local_files": { 
+                "items": [
+                  {
+                    "uri_file": "file:///tmp/dat.parquet",
+                    "parquet": {}
+                  }
+                ]
+              }
+            }
+          },
+          "groupings": [{
+            "groupingExpressions": [{
+              "selection": {
+                "directReference": {
+                  "structField": {
+                    "field": 0
+                  }
+                }
+              }
+            }]
+          }],
+          "measures": [{
+            "measure": {
+              "functionReference": 0,
+              "arguments": [{
+                "value": {
+                  "selection": {
+                    "directReference": {
+                      "structField": {
+                        "field": 1
+                      }
+                    }
+                  }
+                }
+            }],
+              "sorts": [],
+              "phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT",
+              "outputType": {
+                "i64": {}
+              }
+            }
+          }]
+        }
+      }
+    }],
+    "extensionUris": [{
+      "extension_uri_anchor": 0,
+      "uri": "https://github.com/apache/arrow/blob/master/format/substrait/extension_types.yaml"
+    }],
+    "extensions": [{
+      "extension_function": {
+        "extension_uri_reference": 0,
+        "function_anchor": 0,
+        "name": "hash_count"
+      }
+    }],
+  })"));
+
+  auto sp_ext_id_reg = substrait::MakeExtensionIdRegistry();
+  ASSERT_OK_AND_ASSIGN(auto sink_decls,
+                       DeserializePlans(*buf, [] { return kNullConsumer; }));
+  auto agg_decl = sink_decls[0].inputs[0];
+
+  const auto& agg_rel = agg_decl.get<compute::Declaration>();
+
+  const auto& agg_options =
+      checked_cast<const compute::AggregateNodeOptions&>(*agg_rel->options);
+
+  EXPECT_EQ(agg_rel->factory_name, "aggregate");
+  EXPECT_EQ(agg_options.aggregates[0].name, "");
+  EXPECT_EQ(agg_options.aggregates[0].function, "hash_count");
+}
+
+TEST(Substrait, AggregateInvalidRel) {
+  ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan", R"({
+    "relations": [{
+      "rel": {
+        "aggregate": {
+        }
+      }
+    }],
+    "extensionUris": [{
+      "extension_uri_anchor": 0,
+      "uri": "https://github.com/apache/arrow/blob/master/format/substrait/extension_types.yaml"
+    }],
+    "extensions": [{
+      "extension_function": {
+        "extension_uri_reference": 0,
+        "function_anchor": 0,
+        "name": "hash_count"
+      }
+    }],
+  })"));
+
+  ASSERT_RAISES(Invalid, DeserializePlans(*buf, [] { return kNullConsumer; }));
+}
+
+TEST(Substrait, AggregateInvalidFunction) {
+  ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan", R"({
+    "relations": [{
+      "rel": {
+        "aggregate": {
+          "input": {
+            "read": {
+              "base_schema": {
+                "names": ["A", "B", "C"],
+                "struct": {
+                  "types": [{
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }]
+                }
+              },
+              "local_files": { 
+                "items": [
+                  {
+                    "uri_file": "file:///tmp/dat.parquet",
+                    "parquet": {}
+                  }
+                ]
+              }
+            }
+          },
+          "groupings": [{
+            "groupingExpressions": [{
+              "selection": {
+                "directReference": {
+                  "structField": {
+                    "field": 0
+                  }
+                }
+              }
+            }]
+          }],
+          "measures": [{
+          }]
+        }
+      }
+    }],
+    "extensionUris": [{
+      "extension_uri_anchor": 0,
+      "uri": "https://github.com/apache/arrow/blob/master/format/substrait/extension_types.yaml"
+    }],
+    "extensions": [{
+      "extension_function": {
+        "extension_uri_reference": 0,
+        "function_anchor": 0,
+        "name": "hash_count"
+      }
+    }],
+  })"));
+
+  ASSERT_RAISES(Invalid, DeserializePlans(*buf, [] { return kNullConsumer; }));
+}
+
+TEST(Substrait, AggregateInvalidAggFuncArgs) {
+  ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan", R"({
+    "relations": [{
+      "rel": {
+        "aggregate": {
+          "input": {
+            "read": {
+              "base_schema": {
+                "names": ["A", "B", "C"],
+                "struct": {
+                  "types": [{
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }]
+                }
+              },
+              "local_files": { 
+                "items": [
+                  {
+                    "uri_file": "file:///tmp/dat.parquet",
+                    "parquet": {}
+                  }
+                ]
+              }
+            }
+          },
+          "groupings": [{
+            "groupingExpressions": [{
+              "selection": {
+                "directReference": {
+                  "structField": {
+                    "field": 0
+                  }
+                }
+              }
+            }]
+          }],
+          "measures": [{
+            "measure": {
+              "functionReference": 0,
+              "args": [],
+              "sorts": [],
+              "phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT",
+              "outputType": {
+                "i64": {}
+              }
+            }
+          }]
+        }
+      }
+    }],
+    "extensionUris": [{
+      "extension_uri_anchor": 0,
+      "uri": "https://github.com/apache/arrow/blob/master/format/substrait/extension_types.yaml"
+    }],
+    "extensions": [{
+      "extension_function": {
+        "extension_uri_reference": 0,
+        "function_anchor": 0,
+        "name": "hash_count"
+      }
+    }],
+  })"));
+
+  ASSERT_RAISES(NotImplemented, DeserializePlans(*buf, [] { return kNullConsumer; }));
+}
+
+TEST(Substrait, AggregateWithFilter) {
+  ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan", R"({
+    "relations": [{
+      "rel": {
+        "aggregate": {
+          "input": {
+            "read": {
+              "base_schema": {
+                "names": ["A", "B", "C"],
+                "struct": {
+                  "types": [{
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }]
+                }
+              },
+              "local_files": { 
+                "items": [
+                  {
+                    "uri_file": "file:///tmp/dat.parquet",
+                    "parquet": {}
+                  }
+                ]
+              }
+            }
+          },
+          "groupings": [{
+            "groupingExpressions": [{
+              "selection": {
+                "directReference": {
+                  "structField": {
+                    "field": 0
+                  }
+                }
+              }
+            }]
+          }],
+          "measures": [{
+            "measure": {
+              "functionReference": 0,
+              "args": [],
+              "sorts": [],
+              "phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT",
+              "outputType": {
+                "i64": {}
+              }
+            }
+          }]
+        }
+      }
+    }],
+    "extensionUris": [{
+      "extension_uri_anchor": 0,
+      "uri": "https://github.com/apache/arrow/blob/master/format/substrait/extension_types.yaml"
+    }],
+    "extensions": [{
+      "extension_function": {
+        "extension_uri_reference": 0,
+        "function_anchor": 0,
+        "name": "equal"
+      }
+    }],
+  })"));
+
+  ASSERT_RAISES(NotImplemented, DeserializePlans(*buf, [] { return kNullConsumer; }));
+}
+
 }  // namespace engine
 }  // namespace arrow


[arrow] 02/03: ARROW-16692: [C++] StackOverflow in merge generator causes segmentation fault in scan (#13691)

Posted by ks...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kszucs pushed a commit to branch maint-9.0.0
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit ffdd3a334c15a2ec4f9360382d9437b2368d262a
Author: Weston Pace <we...@gmail.com>
AuthorDate: Mon Jul 25 13:31:27 2022 -1000

    ARROW-16692: [C++] StackOverflow in merge generator causes segmentation fault in scan (#13691)
    
    Fix for the merged generator to avoid a potential stack overflow
    
    Authored-by: Weston Pace <we...@gmail.com>
    Signed-off-by: Weston Pace <we...@gmail.com>
---
 cpp/src/arrow/dataset/scanner_test.cc      |   6 ++
 cpp/src/arrow/util/async_generator.h       | 115 +++++++++++++++++++----------
 cpp/src/arrow/util/async_generator_test.cc |  19 +++++
 3 files changed, 102 insertions(+), 38 deletions(-)

diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc
index 26a2353332..804e82b57d 100644
--- a/cpp/src/arrow/dataset/scanner_test.cc
+++ b/cpp/src/arrow/dataset/scanner_test.cc
@@ -766,6 +766,12 @@ TEST_P(TestScanner, FromReader) {
         Invalid, ::testing::HasSubstr("OneShotFragment was already scanned"),
         std::move(maybe_batch_it));
   }
+
+  // TODO(ARROW-16072) At the moment, we can't be sure that the scanner has completely
+  // shut down, even though the plan has finished, because errors are not handled cleanly
+  // in the scanner/execplan relationship.  Once ARROW-16072 is fixed this should be
+  // reliable and we can get rid of this.  See also ARROW-17198
+  ::arrow::internal::GetCpuThreadPool()->WaitForIdle();
 }
 
 INSTANTIATE_TEST_SUITE_P(TestScannerThreading, TestScanner,
diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h
index 4aa0a3b18d..9819b5ce92 100644
--- a/cpp/src/arrow/util/async_generator.h
+++ b/cpp/src/arrow/util/async_generator.h
@@ -1093,7 +1093,7 @@ class MergedGenerator {
         state_->all_finished.MarkFinished();
       } else {
         delivered_job->deliverer().AddCallback(
-            InnerCallback{state_, delivered_job->index});
+            InnerCallback(state_, delivered_job->index));
       }
       return std::move(delivered_job->value);
     }
@@ -1253,6 +1253,9 @@ class MergedGenerator {
   };
 
   struct InnerCallback {
+    InnerCallback(std::shared_ptr<State> state, std::size_t index, bool recursive = false)
+        : state(std::move(state)), index(index), recursive(recursive) {}
+
     void operator()(const Result<T>& maybe_next_ref) {
       // An item has been delivered by one of the inner subscriptions
       Future<T> next_fut;
@@ -1332,6 +1335,10 @@ class MergedGenerator {
         }
 
         if (pull_next_sub) {
+          if (recursive) {
+            was_empty = true;
+            return;
+          }
           // We pulled an end token so we need to start a new subscription
           // in our spot
           state->PullSource().AddCallback(OuterCallback{state, index});
@@ -1340,7 +1347,7 @@ class MergedGenerator {
           // so lets fetch the next result from our subscription
           sink.MarkFinished(*maybe_next);
           next_fut = state->active_subscriptions[index]();
-          if (next_fut.TryAddCallback([this]() { return *this; })) {
+          if (next_fut.TryAddCallback([this]() { return InnerCallback(state, index); })) {
             return;
           }
           // Already completed. Avoid very deep recursion by looping
@@ -1355,49 +1362,81 @@ class MergedGenerator {
     }
     std::shared_ptr<State> state;
     std::size_t index;
+    bool recursive;
+    bool was_empty = false;
   };
 
   struct OuterCallback {
-    void operator()(const Result<AsyncGenerator<T>>& maybe_next) {
-      // We have been given a new inner subscription
-      bool should_continue = false;
-      bool should_mark_gen_complete = false;
-      bool should_deliver_error = false;
-      bool source_exhausted = maybe_next.ok() && IsIterationEnd(*maybe_next);
-      Future<T> error_sink;
-      {
-        auto guard = state->mutex.Lock();
-        if (!maybe_next.ok() || source_exhausted || state->broken) {
-          // If here then we will not pull any more from the outer source
-          if (!state->broken && !maybe_next.ok()) {
-            state->SignalErrorUnlocked(guard);
-            // If here then we are the first error so we need to deliver it
-            should_deliver_error = true;
-            if (!state->waiting_jobs.empty()) {
-              error_sink = std::move(*state->waiting_jobs.front());
-              state->waiting_jobs.pop_front();
+    void operator()(const Result<AsyncGenerator<T>>& initial_maybe_next) {
+      Result<AsyncGenerator<T>> maybe_next = initial_maybe_next;
+      while (true) {
+        // We have been given a new inner subscription
+        bool should_continue = false;
+        bool should_mark_gen_complete = false;
+        bool should_deliver_error = false;
+        bool source_exhausted = maybe_next.ok() && IsIterationEnd(*maybe_next);
+        Future<T> error_sink;
+        {
+          auto guard = state->mutex.Lock();
+          if (!maybe_next.ok() || source_exhausted || state->broken) {
+            // If here then we will not pull any more from the outer source
+            if (!state->broken && !maybe_next.ok()) {
+              state->SignalErrorUnlocked(guard);
+              // If here then we are the first error so we need to deliver it
+              should_deliver_error = true;
+              if (!state->waiting_jobs.empty()) {
+                error_sink = std::move(*state->waiting_jobs.front());
+                state->waiting_jobs.pop_front();
+              }
             }
+            if (source_exhausted) {
+              state->source_exhausted = true;
+              state->num_running_subscriptions--;
+            }
+            if (state->MarkTaskFinishedUnlocked(guard)) {
+              should_mark_gen_complete = true;
+            }
+          } else {
+            state->active_subscriptions[index] = *maybe_next;
+            should_continue = true;
           }
-          if (source_exhausted) {
-            state->source_exhausted = true;
-            state->num_running_subscriptions--;
-          }
-          if (state->MarkTaskFinishedUnlocked(guard)) {
-            should_mark_gen_complete = true;
+        }
+        if (should_deliver_error) {
+          state->MarkFinalError(maybe_next.status(), std::move(error_sink));
+        }
+        if (should_mark_gen_complete) {
+          state->MarkFinishedAndPurge();
+        }
+        if (should_continue) {
+          // There is a possibility that a large sequence of immediately available inner
+          // callbacks could lead to a stack overflow.  To avoid this we need to
+          // synchronously loop through inner/outer callbacks until we either find an
+          // unfinished future or we find an actual item to deliver.
+          Future<T> next_item = (*maybe_next)();
+          if (!next_item.TryAddCallback([this] { return InnerCallback(state, index); })) {
+            // By setting recursive to true we signal to the inner callback that, if it is
+            // empty, instead of adding a new outer callback, it should just immediately
+            // return, flagging was_empty so that we know we need to check the next
+            // subscription.
+            InnerCallback immediate_inner(state, index, /*recursive=*/true);
+            immediate_inner(next_item.result());
+            if (immediate_inner.was_empty) {
+              Future<AsyncGenerator<T>> next_source = state->PullSource();
+              if (next_source.TryAddCallback([this] {
+                    return OuterCallback{state, index};
+                  })) {
+                // We hit an unfinished future so we can stop looping
+                return;
+              }
+              // The current subscription was immediately and synchronously empty
+              // and we were able to synchronously pull the next subscription so we
+              // can keep looping.
+              maybe_next = next_source.result();
+              continue;
+            }
           }
-        } else {
-          state->active_subscriptions[index] = *maybe_next;
-          should_continue = true;
         }
-      }
-      if (should_deliver_error) {
-        state->MarkFinalError(maybe_next.status(), std::move(error_sink));
-      }
-      if (should_mark_gen_complete) {
-        state->MarkFinishedAndPurge();
-      }
-      if (should_continue) {
-        (*maybe_next)().AddCallback(InnerCallback{state, index});
+        return;
       }
     }
     std::shared_ptr<State> state;
diff --git a/cpp/src/arrow/util/async_generator_test.cc b/cpp/src/arrow/util/async_generator_test.cc
index e8163e83c9..e75ca577c7 100644
--- a/cpp/src/arrow/util/async_generator_test.cc
+++ b/cpp/src/arrow/util/async_generator_test.cc
@@ -762,6 +762,25 @@ TEST_P(MergedGeneratorTestFixture, MergedRecursion) {
   }
 }
 
+TEST_P(MergedGeneratorTestFixture, DeepOuterGeneratorStackOverflow) {
+  // Simulate a very deep and very quick outer generator that yields simple
+  // inner generators.  Everything completes synchronously.  This is to
+  // try and provoke a stack overflow that simulates ARROW-16692
+  constexpr int kNumItems = 10000;
+  constexpr int kMaxSubscriptions = 8;
+  std::vector<AsyncGenerator<TestInt>> inner_generators;
+  for (int i = 0; i < kNumItems; i++) {
+    inner_generators.push_back(MakeVectorGenerator<TestInt>({}));
+  }
+  AsyncGenerator<AsyncGenerator<TestInt>> outer_generator =
+      MakeVectorGenerator(inner_generators);
+  AsyncGenerator<TestInt> merged =
+      MakeMergedGenerator(outer_generator, kMaxSubscriptions);
+  ASSERT_FINISHES_OK_AND_ASSIGN(std::vector<TestInt> collected,
+                                CollectAsyncGenerator(std::move(merged)));
+  ASSERT_TRUE(collected.empty());
+}
+
 INSTANTIATE_TEST_SUITE_P(MergedGeneratorTests, MergedGeneratorTestFixture,
                          ::testing::Values(false, true));