You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2023/04/13 14:50:44 UTC
[arrow] branch main updated: GH-34539: [C++] Fix throttled scheduler to avoid stack overflow in dataset writer (#35075)
This is an automated email from the ASF dual-hosted git repository.
westonpace pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 196222dbd5 GH-34539: [C++] Fix throttled scheduler to avoid stack overflow in dataset writer (#35075)
196222dbd5 is described below
commit 196222dbd543d6931f4a1432845add97be0db802
Author: Weston Pace <we...@gmail.com>
AuthorDate: Thu Apr 13 07:50:32 2023 -0700
GH-34539: [C++] Fix throttled scheduler to avoid stack overflow in dataset writer (#35075)
### Rationale for this change
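Fixes a bug in the throttled scheduler that could cause a stack overflow in the dataset writer when many queued tasks finished immediately.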
### What changes are included in this PR?
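The throttled scheduler no longer recurses into ContinueTasks when a task submitted from within the ContinueTasks loop has already finished. Instead, it releases the throttle and lets the existing loop submit the next queued task.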
### Are these changes tested?
Yes. I added a new stress test (AsyncTaskScheduler.ThrottleStress) that reliably reproduced the stack overflow on a standard Linux system.
### Are there any user-facing changes?
No.
* Closes: #34539
Authored-by: Weston Pace <we...@gmail.com>
Signed-off-by: Weston Pace <we...@gmail.com>
---
cpp/src/arrow/util/async_util.cc | 26 +++++++++++++++++++-------
cpp/src/arrow/util/async_util_test.cc | 24 ++++++++++++++++++++++++
2 files changed, 43 insertions(+), 7 deletions(-)
diff --git a/cpp/src/arrow/util/async_util.cc b/cpp/src/arrow/util/async_util.cc
index 0a59a462c9..55627eb43b 100644
--- a/cpp/src/arrow/util/async_util.cc
+++ b/cpp/src/arrow/util/async_util.cc
@@ -322,7 +322,7 @@ class ThrottledAsyncTaskSchedulerImpl
return true;
} else {
lk.unlock();
- return SubmitTask(std::move(task), latched_cost);
+ return SubmitTask(std::move(task), latched_cost, /*in_continue=*/false);
}
}
@@ -331,18 +331,30 @@ class ThrottledAsyncTaskSchedulerImpl
const util::tracing::Span& span() const override { return target_->span(); }
private:
- bool SubmitTask(std::unique_ptr<Task> task, int latched_cost) {
+ bool SubmitTask(std::unique_ptr<Task> task, int latched_cost, bool in_continue) {
// Wrap the task with a wrapper that runs it and then checks to see if there are any
// queued tasks
std::string_view name = task->name();
return target_->AddSimpleTask(
- [latched_cost, inner_task = std::move(task),
+ [latched_cost, in_continue, inner_task = std::move(task),
self = shared_from_this()]() mutable -> Result<Future<>> {
ARROW_ASSIGN_OR_RAISE(Future<> inner_fut, (*inner_task)());
- return inner_fut.Then([latched_cost, self = std::move(self)] {
+ if (!inner_fut.TryAddCallback([&] {
+ return [latched_cost, self = std::move(self)](const Status& st) -> void {
+ if (st.ok()) {
+ self->throttle_->Release(latched_cost);
+ self->ContinueTasks();
+ }
+ };
+ })) {
+ // If the task is already finished then don't run ContinueTasks
+ // if we are already running it so we can avoid stack overflow
self->throttle_->Release(latched_cost);
- self->ContinueTasks();
- });
+ if (!in_continue) {
+ self->ContinueTasks();
+ }
+ }
+ return inner_fut;
},
name);
}
@@ -371,7 +383,7 @@ class ThrottledAsyncTaskSchedulerImpl
} else {
std::unique_ptr<Task> next_task = queue_->Pop();
lk.unlock();
- if (!SubmitTask(std::move(next_task), next_cost)) {
+ if (!SubmitTask(std::move(next_task), next_cost, /*in_continue=*/true)) {
return;
}
lk.lock();
diff --git a/cpp/src/arrow/util/async_util_test.cc b/cpp/src/arrow/util/async_util_test.cc
index 119ca7aa42..7734b84c9e 100644
--- a/cpp/src/arrow/util/async_util_test.cc
+++ b/cpp/src/arrow/util/async_util_test.cc
@@ -595,6 +595,30 @@ TEST(AsyncTaskScheduler, ScanningStress) {
ASSERT_EQ(kExpectedBatchesScanned, batches_scanned.load());
}
}
+
+TEST(AsyncTaskScheduler, ThrottleStress) {
+ // Queue up a bunch of throttled fast tasks. It shouldn't cause stack overflow
+ constexpr int kNumTasks = 1024 * 10;
+ int num_tasks_run = 0;
+ Future<> slow_task = Future<>::Make();
+ Future<> finished = AsyncTaskScheduler::Make([&](AsyncTaskScheduler* scheduler) {
+ std::shared_ptr<ThrottledAsyncTaskScheduler> throttled =
+ ThrottledAsyncTaskScheduler::Make(scheduler, 1);
+ EXPECT_TRUE(throttled->AddSimpleTask([slow_task] { return slow_task; }, kDummyName));
+ for (int task_idx = 0; task_idx < kNumTasks; task_idx++) {
+ throttled->AddSimpleTask(
+ [&] {
+ num_tasks_run++;
+ return Future<>::MakeFinished();
+ },
+ kDummyName);
+ }
+ return Status::OK();
+ });
+ slow_task.MarkFinished();
+ ASSERT_FINISHES_OK(finished);
+ ASSERT_EQ(kNumTasks, num_tasks_run);
+}
#endif
class TaskWithPriority : public AsyncTaskScheduler::Task {