You are viewing a plain-text version of this content. The canonical version is available at its original link, which was a hyperlink and is not preserved in this plain-text copy.
Posted to commits@arrow.apache.org by we...@apache.org on 2022/11/02 21:41:59 UTC

[arrow] branch master updated: ARROW-18183: [C++] cpp-micro benchmarks are failing on mac arm machine (#14562)

This is an automated email from the ASF dual-hosted git repository.

westonpace pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e162a5499 ARROW-18183: [C++] cpp-micro benchmarks are failing on mac arm machine (#14562)
0e162a5499 is described below

commit 0e162a54996962b0fce3cf2bfa2d7d5fe323062c
Author: Weston Pace <we...@gmail.com>
AuthorDate: Wed Nov 2 14:41:54 2022 -0700

    ARROW-18183: [C++] cpp-micro benchmarks are failing on mac arm machine (#14562)
    
    Authored-by: Weston Pace <we...@gmail.com>
    Signed-off-by: Weston Pace <we...@gmail.com>
---
 cpp/src/arrow/util/async_util.h | 45 +++++++++++++++++++++++++++++++++--------
 1 file changed, 37 insertions(+), 8 deletions(-)

diff --git a/cpp/src/arrow/util/async_util.h b/cpp/src/arrow/util/async_util.h
index e0b6cfc699..fd04d89f15 100644
--- a/cpp/src/arrow/util/async_util.h
+++ b/cpp/src/arrow/util/async_util.h
@@ -25,6 +25,7 @@
 #include "arrow/util/future.h"
 #include "arrow/util/iterator.h"
 #include "arrow/util/mutex.h"
+#include "arrow/util/thread_pool.h"
 
 #include <memory>
 
@@ -222,23 +223,51 @@ class ARROW_EXPORT AsyncTaskScheduler {
     struct SubmitTask : public Task {
       explicit SubmitTask(std::unique_ptr<State> state_holder)
           : state_holder(std::move(state_holder)) {}
+
       struct SubmitTaskCallback {
-        explicit SubmitTaskCallback(std::unique_ptr<State> state_holder)
-            : state_holder(std::move(state_holder)) {}
-        Status operator()(const T& item) {
+        SubmitTaskCallback(std::unique_ptr<State> state_holder, Future<> task_completion)
+            : state_holder(std::move(state_holder)),
+              task_completion(std::move(task_completion)) {}
+        void operator()(const Result<T>& maybe_item) {
+          if (!maybe_item.ok()) {
+            task_completion.MarkFinished(maybe_item.status());
+            return;
+          }
+          const auto& item = *maybe_item;
           if (IsIterationEnd(item)) {
-            return Status::OK();
+            task_completion.MarkFinished();
+            return;
+          }
+          Status visit_st = state_holder->visitor(item);
+          if (!visit_st.ok()) {
+            task_completion.MarkFinished(std::move(visit_st));
+            return;
           }
-          ARROW_RETURN_NOT_OK(state_holder->visitor(item));
           state_holder->scheduler->AddTask(
               std::make_unique<SubmitTask>(std::move(state_holder)));
-          return Status::OK();
+          task_completion.MarkFinished();
         }
         std::unique_ptr<State> state_holder;
+        Future<> task_completion;
       };
+
       Result<Future<>> operator()(AsyncTaskScheduler* scheduler) {
-        Future<T> next = state_holder->generator();
-        return next.Then(SubmitTaskCallback(std::move(state_holder)));
+        Future<> task = Future<>::Make();
+        // Consume as many items as we can (those that are already finished)
+        // synchronously to avoid recursion / stack overflow.
+        while (true) {
+          Future<T> next = state_holder->generator();
+          if (next.TryAddCallback(
+                  [&] { return SubmitTaskCallback(std::move(state_holder), task); })) {
+            return task;
+          }
+          ARROW_ASSIGN_OR_RAISE(T item, next.result());
+          if (IsIterationEnd(item)) {
+            task.MarkFinished();
+            return task;
+          }
+          ARROW_RETURN_NOT_OK(state_holder->visitor(item));
+        }
       }
       std::unique_ptr<State> state_holder;
     };