You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2022/07/25 23:31:35 UTC

[arrow] branch master updated: ARROW-16692: [C++] StackOverflow in merge generator causes segmentation fault in scan (#13691)

This is an automated email from the ASF dual-hosted git repository.

westonpace pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 898e12e677 ARROW-16692: [C++] StackOverflow in merge generator causes segmentation fault in scan (#13691)
898e12e677 is described below

commit 898e12e67759de3df8e6c3ca75e3701c58ab50b3
Author: Weston Pace <we...@gmail.com>
AuthorDate: Mon Jul 25 13:31:27 2022 -1000

    ARROW-16692: [C++] StackOverflow in merge generator causes segmentation fault in scan (#13691)
    
    Fix for the merged generator to avoid a potential stack overflow
    
    Authored-by: Weston Pace <we...@gmail.com>
    Signed-off-by: Weston Pace <we...@gmail.com>
---
 cpp/src/arrow/dataset/scanner_test.cc      |   6 ++
 cpp/src/arrow/util/async_generator.h       | 115 +++++++++++++++++++----------
 cpp/src/arrow/util/async_generator_test.cc |  19 +++++
 3 files changed, 102 insertions(+), 38 deletions(-)

diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc
index 26a2353332..804e82b57d 100644
--- a/cpp/src/arrow/dataset/scanner_test.cc
+++ b/cpp/src/arrow/dataset/scanner_test.cc
@@ -766,6 +766,12 @@ TEST_P(TestScanner, FromReader) {
         Invalid, ::testing::HasSubstr("OneShotFragment was already scanned"),
         std::move(maybe_batch_it));
   }
+
+  // TODO(ARROW-16072) At the moment, we can't be sure that the scanner has completely
+  // shut down, even though the plan has finished, because errors are not handled cleanly
+  // in the scanner/execplan relationship.  Once ARROW-16072 is fixed this should be
+  // reliable and we can remove this wait.  See also ARROW-17198.
+  ::arrow::internal::GetCpuThreadPool()->WaitForIdle();
 }
 
 INSTANTIATE_TEST_SUITE_P(TestScannerThreading, TestScanner,
diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h
index 4aa0a3b18d..9819b5ce92 100644
--- a/cpp/src/arrow/util/async_generator.h
+++ b/cpp/src/arrow/util/async_generator.h
@@ -1093,7 +1093,7 @@ class MergedGenerator {
         state_->all_finished.MarkFinished();
       } else {
         delivered_job->deliverer().AddCallback(
-            InnerCallback{state_, delivered_job->index});
+            InnerCallback(state_, delivered_job->index));
       }
       return std::move(delivered_job->value);
     }
@@ -1253,6 +1253,9 @@ class MergedGenerator {
   };
 
   struct InnerCallback {
+    InnerCallback(std::shared_ptr<State> state, std::size_t index, bool recursive = false)
+        : state(std::move(state)), index(index), recursive(recursive) {}
+
     void operator()(const Result<T>& maybe_next_ref) {
       // An item has been delivered by one of the inner subscriptions
       Future<T> next_fut;
@@ -1332,6 +1335,10 @@ class MergedGenerator {
         }
 
         if (pull_next_sub) {
+          if (recursive) {
+            was_empty = true;
+            return;
+          }
           // We pulled an end token so we need to start a new subscription
           // in our spot
           state->PullSource().AddCallback(OuterCallback{state, index});
@@ -1340,7 +1347,7 @@ class MergedGenerator {
           // so lets fetch the next result from our subscription
           sink.MarkFinished(*maybe_next);
           next_fut = state->active_subscriptions[index]();
-          if (next_fut.TryAddCallback([this]() { return *this; })) {
+          if (next_fut.TryAddCallback([this]() { return InnerCallback(state, index); })) {
             return;
           }
           // Already completed. Avoid very deep recursion by looping
@@ -1355,49 +1362,81 @@ class MergedGenerator {
     }
     std::shared_ptr<State> state;
     std::size_t index;
+    bool recursive;
+    bool was_empty = false;
   };
 
   struct OuterCallback {
-    void operator()(const Result<AsyncGenerator<T>>& maybe_next) {
-      // We have been given a new inner subscription
-      bool should_continue = false;
-      bool should_mark_gen_complete = false;
-      bool should_deliver_error = false;
-      bool source_exhausted = maybe_next.ok() && IsIterationEnd(*maybe_next);
-      Future<T> error_sink;
-      {
-        auto guard = state->mutex.Lock();
-        if (!maybe_next.ok() || source_exhausted || state->broken) {
-          // If here then we will not pull any more from the outer source
-          if (!state->broken && !maybe_next.ok()) {
-            state->SignalErrorUnlocked(guard);
-            // If here then we are the first error so we need to deliver it
-            should_deliver_error = true;
-            if (!state->waiting_jobs.empty()) {
-              error_sink = std::move(*state->waiting_jobs.front());
-              state->waiting_jobs.pop_front();
+    void operator()(const Result<AsyncGenerator<T>>& initial_maybe_next) {
+      Result<AsyncGenerator<T>> maybe_next = initial_maybe_next;
+      while (true) {
+        // We have been given a new inner subscription
+        bool should_continue = false;
+        bool should_mark_gen_complete = false;
+        bool should_deliver_error = false;
+        bool source_exhausted = maybe_next.ok() && IsIterationEnd(*maybe_next);
+        Future<T> error_sink;
+        {
+          auto guard = state->mutex.Lock();
+          if (!maybe_next.ok() || source_exhausted || state->broken) {
+            // If here then we will not pull any more from the outer source
+            if (!state->broken && !maybe_next.ok()) {
+              state->SignalErrorUnlocked(guard);
+              // If here then we are the first error so we need to deliver it
+              should_deliver_error = true;
+              if (!state->waiting_jobs.empty()) {
+                error_sink = std::move(*state->waiting_jobs.front());
+                state->waiting_jobs.pop_front();
+              }
             }
+            if (source_exhausted) {
+              state->source_exhausted = true;
+              state->num_running_subscriptions--;
+            }
+            if (state->MarkTaskFinishedUnlocked(guard)) {
+              should_mark_gen_complete = true;
+            }
+          } else {
+            state->active_subscriptions[index] = *maybe_next;
+            should_continue = true;
           }
-          if (source_exhausted) {
-            state->source_exhausted = true;
-            state->num_running_subscriptions--;
-          }
-          if (state->MarkTaskFinishedUnlocked(guard)) {
-            should_mark_gen_complete = true;
+        }
+        if (should_deliver_error) {
+          state->MarkFinalError(maybe_next.status(), std::move(error_sink));
+        }
+        if (should_mark_gen_complete) {
+          state->MarkFinishedAndPurge();
+        }
+        if (should_continue) {
+          // There is a possibility that a large sequence of immediately available inner
+          // callbacks could lead to a stack overflow.  To avoid this we need to
+          // synchronously loop through inner/outer callbacks until we either find an
+          // unfinished future or we find an actual item to deliver.
+          Future<T> next_item = (*maybe_next)();
+          if (!next_item.TryAddCallback([this] { return InnerCallback(state, index); })) {
+            // By setting recursive to true we signal to the inner callback that, if it is
+            // empty, instead of adding a new outer callback, it should just immediately
+            // return, flagging was_empty so that we know we need to check the next
+            // subscription.
+            InnerCallback immediate_inner(state, index, /*recursive=*/true);
+            immediate_inner(next_item.result());
+            if (immediate_inner.was_empty) {
+              Future<AsyncGenerator<T>> next_source = state->PullSource();
+              if (next_source.TryAddCallback([this] {
+                    return OuterCallback{state, index};
+                  })) {
+                // We hit an unfinished future so we can stop looping
+                return;
+              }
+              // The current subscription was immediately and synchronously empty
+              // and we were able to synchronously pull the next subscription so we
+              // can keep looping.
+              maybe_next = next_source.result();
+              continue;
+            }
           }
-        } else {
-          state->active_subscriptions[index] = *maybe_next;
-          should_continue = true;
         }
-      }
-      if (should_deliver_error) {
-        state->MarkFinalError(maybe_next.status(), std::move(error_sink));
-      }
-      if (should_mark_gen_complete) {
-        state->MarkFinishedAndPurge();
-      }
-      if (should_continue) {
-        (*maybe_next)().AddCallback(InnerCallback{state, index});
+        return;
       }
     }
     std::shared_ptr<State> state;
diff --git a/cpp/src/arrow/util/async_generator_test.cc b/cpp/src/arrow/util/async_generator_test.cc
index e8163e83c9..e75ca577c7 100644
--- a/cpp/src/arrow/util/async_generator_test.cc
+++ b/cpp/src/arrow/util/async_generator_test.cc
@@ -762,6 +762,25 @@ TEST_P(MergedGeneratorTestFixture, MergedRecursion) {
   }
 }
 
+TEST_P(MergedGeneratorTestFixture, DeepOuterGeneratorStackOverflow) {
+  // Simulate a very deep, very fast outer generator that yields empty inner
+  // generators.  Everything completes synchronously.  This is meant to
+  // provoke the stack overflow reported in ARROW-16692.
+  constexpr int kNumItems = 10000;  // number of empty inner generators
+  constexpr int kMaxSubscriptions = 8;  // limit passed to MakeMergedGenerator
+  std::vector<AsyncGenerator<TestInt>> inner_generators;
+  for (int i = 0; i < kNumItems; i++) {
+    inner_generators.push_back(MakeVectorGenerator<TestInt>({}));
+  }
+  AsyncGenerator<AsyncGenerator<TestInt>> outer_generator =
+      MakeVectorGenerator(inner_generators);
+  AsyncGenerator<TestInt> merged =
+      MakeMergedGenerator(outer_generator, kMaxSubscriptions);
+  ASSERT_FINISHES_OK_AND_ASSIGN(std::vector<TestInt> collected,
+                                CollectAsyncGenerator(std::move(merged)));
+  ASSERT_TRUE(collected.empty());  // every inner generator was empty
+}
+
 INSTANTIATE_TEST_SUITE_P(MergedGeneratorTests, MergedGeneratorTestFixture,
                          ::testing::Values(false, true));