You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2022/11/02 21:41:59 UTC
[arrow] branch master updated: ARROW-18183: [C++] cpp-micro benchmarks are failing on mac arm machine (#14562)
This is an automated email from the ASF dual-hosted git repository.
westonpace pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 0e162a5499 ARROW-18183: [C++] cpp-micro benchmarks are failing on mac arm machine (#14562)
0e162a5499 is described below
commit 0e162a54996962b0fce3cf2bfa2d7d5fe323062c
Author: Weston Pace <we...@gmail.com>
AuthorDate: Wed Nov 2 14:41:54 2022 -0700
ARROW-18183: [C++] cpp-micro benchmarks are failing on mac arm machine (#14562)
Authored-by: Weston Pace <we...@gmail.com>
Signed-off-by: Weston Pace <we...@gmail.com>
---
cpp/src/arrow/util/async_util.h | 45 +++++++++++++++++++++++++++++++++--------
1 file changed, 37 insertions(+), 8 deletions(-)
diff --git a/cpp/src/arrow/util/async_util.h b/cpp/src/arrow/util/async_util.h
index e0b6cfc699..fd04d89f15 100644
--- a/cpp/src/arrow/util/async_util.h
+++ b/cpp/src/arrow/util/async_util.h
@@ -25,6 +25,7 @@
#include "arrow/util/future.h"
#include "arrow/util/iterator.h"
#include "arrow/util/mutex.h"
+#include "arrow/util/thread_pool.h"
#include <memory>
@@ -222,23 +223,51 @@ class ARROW_EXPORT AsyncTaskScheduler {
struct SubmitTask : public Task {
explicit SubmitTask(std::unique_ptr<State> state_holder)
: state_holder(std::move(state_holder)) {}
+
struct SubmitTaskCallback {
- explicit SubmitTaskCallback(std::unique_ptr<State> state_holder)
- : state_holder(std::move(state_holder)) {}
- Status operator()(const T& item) {
+ SubmitTaskCallback(std::unique_ptr<State> state_holder, Future<> task_completion)
+ : state_holder(std::move(state_holder)),
+ task_completion(std::move(task_completion)) {}
+ void operator()(const Result<T>& maybe_item) {
+ if (!maybe_item.ok()) {
+ task_completion.MarkFinished(maybe_item.status());
+ return;
+ }
+ const auto& item = *maybe_item;
if (IsIterationEnd(item)) {
- return Status::OK();
+ task_completion.MarkFinished();
+ return;
+ }
+ Status visit_st = state_holder->visitor(item);
+ if (!visit_st.ok()) {
+ task_completion.MarkFinished(std::move(visit_st));
+ return;
}
- ARROW_RETURN_NOT_OK(state_holder->visitor(item));
state_holder->scheduler->AddTask(
std::make_unique<SubmitTask>(std::move(state_holder)));
- return Status::OK();
+ task_completion.MarkFinished();
}
std::unique_ptr<State> state_holder;
+ Future<> task_completion;
};
+
Result<Future<>> operator()(AsyncTaskScheduler* scheduler) {
- Future<T> next = state_holder->generator();
- return next.Then(SubmitTaskCallback(std::move(state_holder)));
+ Future<> task = Future<>::Make();
+ // Consume as many items as we can (those that are already finished)
+ // synchronously to avoid recursion / stack overflow.
+ while (true) {
+ Future<T> next = state_holder->generator();
+ if (next.TryAddCallback(
+ [&] { return SubmitTaskCallback(std::move(state_holder), task); })) {
+ return task;
+ }
+ ARROW_ASSIGN_OR_RAISE(T item, next.result());
+ if (IsIterationEnd(item)) {
+ task.MarkFinished();
+ return task;
+ }
+ ARROW_RETURN_NOT_OK(state_holder->visitor(item));
+ }
}
std::unique_ptr<State> state_holder;
};