You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ks...@apache.org on 2022/07/26 09:57:21 UTC
[arrow] 02/03: ARROW-16692: [C++] StackOverflow in merge generator causes segmentation fault in scan (#13691)
This is an automated email from the ASF dual-hosted git repository.
kszucs pushed a commit to branch maint-9.0.0
in repository https://gitbox.apache.org/repos/asf/arrow.git
commit ffdd3a334c15a2ec4f9360382d9437b2368d262a
Author: Weston Pace <we...@gmail.com>
AuthorDate: Mon Jul 25 13:31:27 2022 -1000
ARROW-16692: [C++] StackOverflow in merge generator causes segmentation fault in scan (#13691)
Fix the merged generator to avoid a potential stack overflow.
Authored-by: Weston Pace <we...@gmail.com>
Signed-off-by: Weston Pace <we...@gmail.com>
---
cpp/src/arrow/dataset/scanner_test.cc | 6 ++
cpp/src/arrow/util/async_generator.h | 115 +++++++++++++++++++----------
cpp/src/arrow/util/async_generator_test.cc | 19 +++++
3 files changed, 102 insertions(+), 38 deletions(-)
diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc
index 26a2353332..804e82b57d 100644
--- a/cpp/src/arrow/dataset/scanner_test.cc
+++ b/cpp/src/arrow/dataset/scanner_test.cc
@@ -766,6 +766,12 @@ TEST_P(TestScanner, FromReader) {
Invalid, ::testing::HasSubstr("OneShotFragment was already scanned"),
std::move(maybe_batch_it));
}
+
+ // TODO(ARROW-16072) At the moment, we can't be sure that the scanner has completely
+ // shutdown, even though the plan has finished, because errors are not handled cleanly
+ // in the scanner/execplan relationship. Once ARROW-16072 is fixed this should be
+ // reliable and we can get rid of this. See also ARROW-17198
+ ::arrow::internal::GetCpuThreadPool()->WaitForIdle();
}
INSTANTIATE_TEST_SUITE_P(TestScannerThreading, TestScanner,
diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h
index 4aa0a3b18d..9819b5ce92 100644
--- a/cpp/src/arrow/util/async_generator.h
+++ b/cpp/src/arrow/util/async_generator.h
@@ -1093,7 +1093,7 @@ class MergedGenerator {
state_->all_finished.MarkFinished();
} else {
delivered_job->deliverer().AddCallback(
- InnerCallback{state_, delivered_job->index});
+ InnerCallback(state_, delivered_job->index));
}
return std::move(delivered_job->value);
}
@@ -1253,6 +1253,9 @@ class MergedGenerator {
};
struct InnerCallback {
+ InnerCallback(std::shared_ptr<State> state, std::size_t index, bool recursive = false)
+ : state(std::move(state)), index(index), recursive(recursive) {}
+
void operator()(const Result<T>& maybe_next_ref) {
// An item has been delivered by one of the inner subscriptions
Future<T> next_fut;
@@ -1332,6 +1335,10 @@ class MergedGenerator {
}
if (pull_next_sub) {
+ if (recursive) {
+ was_empty = true;
+ return;
+ }
// We pulled an end token so we need to start a new subscription
// in our spot
state->PullSource().AddCallback(OuterCallback{state, index});
@@ -1340,7 +1347,7 @@ class MergedGenerator {
// so lets fetch the next result from our subscription
sink.MarkFinished(*maybe_next);
next_fut = state->active_subscriptions[index]();
- if (next_fut.TryAddCallback([this]() { return *this; })) {
+ if (next_fut.TryAddCallback([this]() { return InnerCallback(state, index); })) {
return;
}
// Already completed. Avoid very deep recursion by looping
@@ -1355,49 +1362,81 @@ class MergedGenerator {
}
std::shared_ptr<State> state;
std::size_t index;
+ bool recursive;
+ bool was_empty = false;
};
struct OuterCallback {
- void operator()(const Result<AsyncGenerator<T>>& maybe_next) {
- // We have been given a new inner subscription
- bool should_continue = false;
- bool should_mark_gen_complete = false;
- bool should_deliver_error = false;
- bool source_exhausted = maybe_next.ok() && IsIterationEnd(*maybe_next);
- Future<T> error_sink;
- {
- auto guard = state->mutex.Lock();
- if (!maybe_next.ok() || source_exhausted || state->broken) {
- // If here then we will not pull any more from the outer source
- if (!state->broken && !maybe_next.ok()) {
- state->SignalErrorUnlocked(guard);
- // If here then we are the first error so we need to deliver it
- should_deliver_error = true;
- if (!state->waiting_jobs.empty()) {
- error_sink = std::move(*state->waiting_jobs.front());
- state->waiting_jobs.pop_front();
+ void operator()(const Result<AsyncGenerator<T>>& initial_maybe_next) {
+ Result<AsyncGenerator<T>> maybe_next = initial_maybe_next;
+ while (true) {
+ // We have been given a new inner subscription
+ bool should_continue = false;
+ bool should_mark_gen_complete = false;
+ bool should_deliver_error = false;
+ bool source_exhausted = maybe_next.ok() && IsIterationEnd(*maybe_next);
+ Future<T> error_sink;
+ {
+ auto guard = state->mutex.Lock();
+ if (!maybe_next.ok() || source_exhausted || state->broken) {
+ // If here then we will not pull any more from the outer source
+ if (!state->broken && !maybe_next.ok()) {
+ state->SignalErrorUnlocked(guard);
+ // If here then we are the first error so we need to deliver it
+ should_deliver_error = true;
+ if (!state->waiting_jobs.empty()) {
+ error_sink = std::move(*state->waiting_jobs.front());
+ state->waiting_jobs.pop_front();
+ }
}
+ if (source_exhausted) {
+ state->source_exhausted = true;
+ state->num_running_subscriptions--;
+ }
+ if (state->MarkTaskFinishedUnlocked(guard)) {
+ should_mark_gen_complete = true;
+ }
+ } else {
+ state->active_subscriptions[index] = *maybe_next;
+ should_continue = true;
}
- if (source_exhausted) {
- state->source_exhausted = true;
- state->num_running_subscriptions--;
- }
- if (state->MarkTaskFinishedUnlocked(guard)) {
- should_mark_gen_complete = true;
+ }
+ if (should_deliver_error) {
+ state->MarkFinalError(maybe_next.status(), std::move(error_sink));
+ }
+ if (should_mark_gen_complete) {
+ state->MarkFinishedAndPurge();
+ }
+ if (should_continue) {
+ // There is a possibility that a large sequence of immediately available inner
+ // callbacks could lead to a stack overflow. To avoid this we need to
+ // synchronously loop through inner/outer callbacks until we either find an
+ // unfinished future or we find an actual item to deliver.
+ Future<T> next_item = (*maybe_next)();
+ if (!next_item.TryAddCallback([this] { return InnerCallback(state, index); })) {
+ // By setting recursive to true we signal to the inner callback that, if it is
+ // empty, instead of adding a new outer callback, it should just immediately
+ // return, flagging was_empty so that we know we need to check the next
+ // subscription.
+ InnerCallback immediate_inner(state, index, /*recursive=*/true);
+ immediate_inner(next_item.result());
+ if (immediate_inner.was_empty) {
+ Future<AsyncGenerator<T>> next_source = state->PullSource();
+ if (next_source.TryAddCallback([this] {
+ return OuterCallback{state, index};
+ })) {
+ // We hit an unfinished future so we can stop looping
+ return;
+ }
+ // The current subscription was immediately and synchronously empty
+ // and we were able to synchronously pull the next subscription so we
+ // can keep looping.
+ maybe_next = next_source.result();
+ continue;
+ }
}
- } else {
- state->active_subscriptions[index] = *maybe_next;
- should_continue = true;
}
- }
- if (should_deliver_error) {
- state->MarkFinalError(maybe_next.status(), std::move(error_sink));
- }
- if (should_mark_gen_complete) {
- state->MarkFinishedAndPurge();
- }
- if (should_continue) {
- (*maybe_next)().AddCallback(InnerCallback{state, index});
+ return;
}
}
std::shared_ptr<State> state;
diff --git a/cpp/src/arrow/util/async_generator_test.cc b/cpp/src/arrow/util/async_generator_test.cc
index e8163e83c9..e75ca577c7 100644
--- a/cpp/src/arrow/util/async_generator_test.cc
+++ b/cpp/src/arrow/util/async_generator_test.cc
@@ -762,6 +762,25 @@ TEST_P(MergedGeneratorTestFixture, MergedRecursion) {
}
}
+TEST_P(MergedGeneratorTestFixture, DeepOuterGeneratorStackOverflow) {
+ // Simulate a very deep and very fast outer generator that yields empty
+ // inner generators. Everything completes synchronously. This tries to
+ // provoke the stack overflow reported in ARROW-16692.
+ constexpr int kNumItems = 10000;  // number of (empty) inner generators yielded
+ constexpr int kMaxSubscriptions = 8;  // max inner generators merged concurrently
+ std::vector<AsyncGenerator<TestInt>> inner_generators;
+ for (int i = 0; i < kNumItems; i++) {
+ inner_generators.push_back(MakeVectorGenerator<TestInt>({}));
+ }
+ AsyncGenerator<AsyncGenerator<TestInt>> outer_generator =
+ MakeVectorGenerator(inner_generators);
+ AsyncGenerator<TestInt> merged =
+ MakeMergedGenerator(outer_generator, kMaxSubscriptions);
+ ASSERT_FINISHES_OK_AND_ASSIGN(std::vector<TestInt> collected,
+ CollectAsyncGenerator(std::move(merged)));
+ ASSERT_TRUE(collected.empty());  // every inner generator was empty
+}
+
INSTANTIATE_TEST_SUITE_P(MergedGeneratorTests, MergedGeneratorTestFixture,
::testing::Values(false, true));