Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/05/06 09:34:39 UTC

[GitHub] [arrow] westonpace opened a new pull request #10258: ARROW-12560: [C++] Investigate utilizing aggressive thread task creation when adding callback to finished future

westonpace opened a new pull request #10258:
URL: https://github.com/apache/arrow/pull/10258


   Previously, a future's callbacks always ran synchronously, either as part of `Future::MarkFinished` or as part of `Future::AddCallback`. `Executor::Transfer` made it possible to schedule continuations on a new thread, but this only took effect if the transferred future's callbacks were added before the source future finished. There are cases where the desired behavior is to spawn a new thread task even if the source future has already finished.
   
   This PR adds four scheduling options:
   * Never - The default (and existing) behavior: never spawn a new task
   * IfUnfinished - The default behavior for transfers: spawn a new task only if the future isn't already finished
   * IfIdle - A new behavior: spawn a new task as long as there is a core in the destination thread pool that can process it
   * Always - Always spawn a new task, on both finished and unfinished futures, regardless of whether the destination thread pool is idle
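A minimal sketch of how the four options might map onto a spawn-or-run-inline decision. The enum spelling, the function `ShouldSpawnTask`, and its two flags are hypothetical, not the PR's code. The sketch also assumes that `IfIdle` ignores whether the future had already finished, which the description above does not state.

```cpp
#include <cassert>

// Hypothetical names mirroring the four options in the PR description.
enum class ShouldSchedule { kNever, kIfUnfinished, kIfIdle, kAlways };

// Decide whether a continuation should become a new executor task.
//   added_before_finish: the callback was registered while the future was
//                        still pending, so it runs from MarkFinished
//   pool_has_idle_core:  best-effort snapshot of the destination pool
bool ShouldSpawnTask(ShouldSchedule policy, bool added_before_finish,
                     bool pool_has_idle_core) {
  switch (policy) {
    case ShouldSchedule::kNever:
      return false;  // always run synchronously on the calling thread
    case ShouldSchedule::kIfUnfinished:
      return added_before_finish;  // added to a finished future: run inline
    case ShouldSchedule::kIfIdle:
      return pool_has_idle_core;  // assumption: finished state is ignored
    case ShouldSchedule::kAlways:
      return true;
  }
  return false;
}
```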
   
   This PR also adds a best-effort "IsIdle" mechanism to the thread pool. It is only best-effort because logic like...
   
   ```
   if (pool->IsIdle()) {
     pool->Submit(...);
   }
   ```
   
   ...could still end up over-subscribing the pool if tasks were added after the `IsIdle` check and before the `Submit` call. However, this should be acceptable. An alternative would be something like `TrySpawn`, but that seemed like more complexity than needed at the moment.
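The check-then-act race can be avoided by reserving a worker atomically, which is roughly what a `TrySpawn` would need to do. This is a hypothetical sketch (`IdleTracker`, `TryReserve` and `Release` are invented names) that models only the idle-worker counter, not real threads.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical idle-worker bookkeeping for a pool; no real threads here.
class IdleTracker {
 public:
  explicit IdleTracker(int workers) : idle_(workers) {}

  // Best-effort: the answer may be stale by the time the caller acts on it.
  bool IsIdle() const { return idle_.load() > 0; }

  // Atomically claim an idle worker; fails instead of over-subscribing.
  bool TryReserve() {
    int current = idle_.load();
    while (current > 0) {
      if (idle_.compare_exchange_weak(current, current - 1)) return true;
      // On failure compare_exchange_weak reloads `current`; retry.
    }
    return false;
  }

  // Called when a worker finishes its task and becomes idle again.
  void Release() { idle_.fetch_add(1); }

 private:
  std::atomic<int> idle_;
};
```

A caller would spawn only when `TryReserve()` succeeds, and the worker would call `Release()` when done, so two racing callers can never both claim the last idle core.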
   
   Leaving this in draft until ARROW-12004 is merged, since this PR is built on top of it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on pull request #10258: ARROW-12560: [C++] Add scheduling option for Future callbacks

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #10258:
URL: https://github.com/apache/arrow/pull/10258#issuecomment-855931334


   Thanks for the update @westonpace. I'll merge once CI passes.





[GitHub] [arrow] pitrou commented on a change in pull request #10258: ARROW-12560: [C++] Investigate utilizing aggressive thread task creation when adding callback to finished future

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10258:
URL: https://github.com/apache/arrow/pull/10258#discussion_r642890540



##########
File path: cpp/src/arrow/util/future.cc
##########
@@ -231,26 +232,68 @@ class ConcreteFutureImpl : public FutureImpl {
 
   void DoMarkFailed() { DoMarkFinishedOrFailed(FutureState::FAILURE); }
 
-  void AddCallback(Callback callback) {
+  void CheckOptions(const CallbackOptions& opts) {
+    if (opts.should_schedule != ShouldSchedule::NEVER) {
+      DCHECK_NE(opts.executor, NULL)

Review comment:
       `nullptr`

##########
File path: cpp/src/arrow/util/future.cc
##########
@@ -231,26 +232,68 @@ class ConcreteFutureImpl : public FutureImpl {
 
   void DoMarkFailed() { DoMarkFinishedOrFailed(FutureState::FAILURE); }
 
-  void AddCallback(Callback callback) {
+  void CheckOptions(const CallbackOptions& opts) {
+    if (opts.should_schedule != ShouldSchedule::NEVER) {
+      DCHECK_NE(opts.executor, NULL)
+          << "An executor must be specified when adding a callback that might schedule";
+    }
+  }
+
+  void AddCallback(Callback callback, CallbackOptions opts) {
+    CheckOptions(opts);
     std::unique_lock<std::mutex> lock(mutex_);
+    CallbackRecord callback_record{std::move(callback), opts};
     if (IsFutureFinished(state_)) {
       lock.unlock();
-      std::move(callback)();
+      RunOrScheduleCallback(callback_record, /*from_unfinished=*/false);
     } else {
-      callbacks_.push_back(std::move(callback));
+      callbacks_.push_back(std::move(callback_record));
     }
   }
 
-  bool TryAddCallback(const std::function<Callback()>& callback_factory) {
+  bool TryAddCallback(const std::function<Callback()>& callback_factory,
+                      CallbackOptions opts) {
+    CheckOptions(opts);
     std::unique_lock<std::mutex> lock(mutex_);
     if (IsFutureFinished(state_)) {
       return false;
     } else {
-      callbacks_.push_back(callback_factory());
+      callbacks_.push_back({callback_factory(), opts});
       return true;
     }
   }
 
+  bool ShouldSchedule(const CallbackRecord& callback_record, bool from_unfinished) {
+    switch (callback_record.options.should_schedule) {
+      case ShouldSchedule::NEVER:
+        return false;
+      case ShouldSchedule::ALWAYS:
+        return true;
+      case ShouldSchedule::IF_UNFINISHED:
+        return from_unfinished;
+      default:
+        DCHECK(false) << "Unrecognized ShouldSchedule option";
+        return false;
+    }
+  }
+
+  void RunOrScheduleCallback(CallbackRecord& callback_record, bool from_unfinished) {
+    if (ShouldSchedule(callback_record, from_unfinished)) {
+      // Need to make a copy of this to keep it alive until the callback has a chance

Review comment:
       Why "a copy"? It's not clear to me where a copy is being made.

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -202,8 +202,30 @@ enum class FutureState : int8_t { PENDING, SUCCESS, FAILURE };
 
 inline bool IsFutureFinished(FutureState state) { return state != FutureState::PENDING; }
 
+/// \brief Describes whether the callback should be scheduled or run synchronously

Review comment:
       "Describe"

##########
File path: cpp/src/arrow/util/test_common.h
##########
@@ -85,4 +88,18 @@ inline void AssertIteratorExhausted(Iterator<T>& it) {
 
 Transformer<TestInt, TestStr> MakeFilter(std::function<bool(TestInt&)> filter);
 
+class MockExecutor : public internal::Executor {

Review comment:
       Ideally, we should start categorizing our test utilities better instead of dumping them all in generic header files.

##########
File path: cpp/src/arrow/util/future.cc
##########
@@ -231,26 +232,68 @@ class ConcreteFutureImpl : public FutureImpl {
 
   void DoMarkFailed() { DoMarkFinishedOrFailed(FutureState::FAILURE); }
 
-  void AddCallback(Callback callback) {
+  void CheckOptions(const CallbackOptions& opts) {
+    if (opts.should_schedule != ShouldSchedule::NEVER) {
+      DCHECK_NE(opts.executor, NULL)
+          << "An executor must be specified when adding a callback that might schedule";
+    }
+  }
+
+  void AddCallback(Callback callback, CallbackOptions opts) {
+    CheckOptions(opts);
     std::unique_lock<std::mutex> lock(mutex_);
+    CallbackRecord callback_record{std::move(callback), opts};
     if (IsFutureFinished(state_)) {
       lock.unlock();
-      std::move(callback)();
+      RunOrScheduleCallback(callback_record, /*from_unfinished=*/false);
     } else {
-      callbacks_.push_back(std::move(callback));
+      callbacks_.push_back(std::move(callback_record));
     }
   }
 
-  bool TryAddCallback(const std::function<Callback()>& callback_factory) {
+  bool TryAddCallback(const std::function<Callback()>& callback_factory,
+                      CallbackOptions opts) {
+    CheckOptions(opts);
     std::unique_lock<std::mutex> lock(mutex_);
     if (IsFutureFinished(state_)) {
       return false;
     } else {
-      callbacks_.push_back(callback_factory());
+      callbacks_.push_back({callback_factory(), opts});
       return true;
     }
   }
 
+  bool ShouldSchedule(const CallbackRecord& callback_record, bool from_unfinished) {
+    switch (callback_record.options.should_schedule) {
+      case ShouldSchedule::NEVER:
+        return false;
+      case ShouldSchedule::ALWAYS:
+        return true;
+      case ShouldSchedule::IF_UNFINISHED:
+        return from_unfinished;
+      default:
+        DCHECK(false) << "Unrecognized ShouldSchedule option";
+        return false;
+    }
+  }
+
+  void RunOrScheduleCallback(CallbackRecord& callback_record, bool from_unfinished) {
+    if (ShouldSchedule(callback_record, from_unfinished)) {
+      // Need to make a copy of this to keep it alive until the callback has a chance
+      // to be scheduled.
+      struct CallbackTask {
+        void operator()() { std::move(callback)(*self); }
+
+        Callback callback;
+        std::shared_ptr<FutureImpl> self;

Review comment:
       The code used to be capturing a weak pointer. Is it desirable to make this a strong pointer instead?
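To make the trade-off concrete, here is a standalone sketch (with a stand-in `State` type instead of `FutureImpl`; `StrongTask` and `WeakTask` are hypothetical) contrasting a task that holds a `std::shared_ptr` with one that holds a `std::weak_ptr`. The strong pointer keeps the state alive until the executor gets around to running the task; the weak pointer may find the state gone and skip the callback. The comment in `future.h` argued that a weak reference is safe for synchronous callbacks because some thread is still inside `MarkFinished`; presumably that guarantee no longer holds once the callback is handed to an executor.

```cpp
#include <cassert>
#include <functional>
#include <memory>

// Stand-in for FutureImpl: just holds a result value.
struct State {
  int result = 0;
};

// Owns a strong reference: State stays alive until this task has run,
// even if every Future handle was dropped in the meantime.
struct StrongTask {
  void operator()() { callback(*self); }
  std::function<void(const State&)> callback;
  std::shared_ptr<State> self;
};

// Weak variant: must check whether the state still exists.
struct WeakTask {
  bool operator()() {
    if (auto locked = self.lock()) {
      callback(*locked);
      return true;
    }
    return false;  // state already destroyed; callback silently skipped
  }
  std::function<void(const State&)> callback;
  std::weak_ptr<State> self;
};
```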

##########
File path: cpp/src/arrow/util/future.cc
##########
@@ -231,26 +232,68 @@ class ConcreteFutureImpl : public FutureImpl {
 
   void DoMarkFailed() { DoMarkFinishedOrFailed(FutureState::FAILURE); }
 
-  void AddCallback(Callback callback) {
+  void CheckOptions(const CallbackOptions& opts) {
+    if (opts.should_schedule != ShouldSchedule::NEVER) {
+      DCHECK_NE(opts.executor, NULL)
+          << "An executor must be specified when adding a callback that might schedule";
+    }
+  }
+
+  void AddCallback(Callback callback, CallbackOptions opts) {
+    CheckOptions(opts);
     std::unique_lock<std::mutex> lock(mutex_);
+    CallbackRecord callback_record{std::move(callback), opts};
     if (IsFutureFinished(state_)) {
       lock.unlock();
-      std::move(callback)();
+      RunOrScheduleCallback(callback_record, /*from_unfinished=*/false);
     } else {
-      callbacks_.push_back(std::move(callback));
+      callbacks_.push_back(std::move(callback_record));
     }
   }
 
-  bool TryAddCallback(const std::function<Callback()>& callback_factory) {
+  bool TryAddCallback(const std::function<Callback()>& callback_factory,
+                      CallbackOptions opts) {
+    CheckOptions(opts);
     std::unique_lock<std::mutex> lock(mutex_);
     if (IsFutureFinished(state_)) {
       return false;
     } else {
-      callbacks_.push_back(callback_factory());
+      callbacks_.push_back({callback_factory(), opts});
       return true;
     }
   }
 
+  bool ShouldSchedule(const CallbackRecord& callback_record, bool from_unfinished) {
+    switch (callback_record.options.should_schedule) {
+      case ShouldSchedule::NEVER:
+        return false;
+      case ShouldSchedule::ALWAYS:
+        return true;
+      case ShouldSchedule::IF_UNFINISHED:
+        return from_unfinished;
+      default:
+        DCHECK(false) << "Unrecognized ShouldSchedule option";
+        return false;
+    }
+  }
+
+  void RunOrScheduleCallback(CallbackRecord& callback_record, bool from_unfinished) {

Review comment:
       The coding conventions prohibit passing mutable lvalue references. You could make this a `CallbackRecord&&`, for example.
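One way to follow that suggestion, sketched with a simplified stand-in `CallbackRecord` and a plain vector playing the executor's queue (both hypothetical): take the record by rvalue reference so the caller has to write `std::move`, which makes the transfer of ownership visible at the call site.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Simplified stand-in for the PR's CallbackRecord.
struct CallbackRecord {
  std::function<void()> callback;
  bool schedule = false;
};

// The && parameter documents that the callee consumes the record.
// The queue is an output parameter, passed by pointer.
void RunOrScheduleCallback(CallbackRecord&& record,
                           std::vector<std::function<void()>>* task_queue) {
  if (record.schedule) {
    task_queue->push_back(std::move(record.callback));  // queue now owns it
  } else {
    std::move(record.callback)();  // run synchronously
  }
}
```

With this signature, both `AddCallback` and the loop in `MarkFinished` would pass `std::move(callback_record)`.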

##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -952,6 +951,85 @@ TEST(FutureCompletionTest, FutureVoid) {
   }
 }
 
+class FutureSchedulingTest : public testing::Test {
+ public:
+  internal::Executor* executor() { return mock_executor.get(); }
+  int spawn_count() { return mock_executor->spawn_count; }
+
+  std::function<void(const Status&)> callback = [](const Status&) {};
+  std::shared_ptr<MockExecutor> mock_executor = std::make_shared<MockExecutor>();
+};
+
+TEST_F(FutureSchedulingTest, ScheduleAlways) {
+  CallbackOptions options;
+  options.should_schedule = ShouldSchedule::ALWAYS;
+  options.executor = executor();
+  // Successful future
+  {
+    auto fut = Future<>::Make();
+    fut.AddCallback(callback, options);
+    fut.MarkFinished();
+    fut.AddCallback(callback, options);
+    ASSERT_EQ(2, spawn_count());
+  }
+  // Failing future
+  {
+    auto fut = Future<>::Make();
+    fut.AddCallback(callback, options);
+    fut.MarkFinished(Status::Invalid("XYZ"));
+    fut.AddCallback(callback, options);
+    ASSERT_EQ(4, spawn_count());
+  }
+}
+
+TEST_F(FutureSchedulingTest, ScheduleIfUnfinished) {
+  CallbackOptions options;
+  options.should_schedule = ShouldSchedule::IF_UNFINISHED;
+  options.executor = executor();
+  // Successful future
+  {
+    auto fut = Future<>::Make();
+    fut.AddCallback(callback, options);
+    fut.MarkFinished();
+    fut.AddCallback(callback, options);
+    ASSERT_EQ(1, spawn_count());

Review comment:
       Unfortunately, this doesn't check which of the two callbacks was spawned.
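A test can tell the two callbacks apart if the mock executor records an identifier for each spawned task instead of only a count. The sketch below uses a hypothetical single-threaded `ToyFuture` and `RecordingExecutor`, not Arrow's classes, and the `in_add_callback` flag naming that was adopted later in this thread.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

enum class ShouldSchedule { kNever, kIfUnfinished, kAlways };

// Mock executor that records *which* task was spawned.
struct RecordingExecutor {
  void Spawn(int id, std::function<void()> task) {
    spawned_ids.push_back(id);
    task();  // run inline; a real executor would hand it to a thread pool
  }
  std::vector<int> spawned_ids;
};

// Single-threaded toy future; a real one also needs a mutex.
class ToyFuture {
 public:
  explicit ToyFuture(RecordingExecutor* executor) : executor_(executor) {}

  void AddCallback(int id, std::function<void()> cb, ShouldSchedule policy) {
    Record record{id, std::move(cb), policy};
    if (finished_) {
      Dispatch(std::move(record), /*in_add_callback=*/true);
    } else {
      pending_.push_back(std::move(record));
    }
  }

  void MarkFinished() {
    finished_ = true;
    for (auto& record : pending_) {
      Dispatch(std::move(record), /*in_add_callback=*/false);
    }
    pending_.clear();
  }

 private:
  struct Record {
    int id;
    std::function<void()> cb;
    ShouldSchedule policy;
  };

  void Dispatch(Record&& record, bool in_add_callback) {
    const bool spawn =
        record.policy == ShouldSchedule::kAlways ||
        (record.policy == ShouldSchedule::kIfUnfinished && !in_add_callback);
    if (spawn) {
      executor_->Spawn(record.id, std::move(record.cb));
    } else {
      record.cb();
    }
  }

  RecordingExecutor* executor_;
  bool finished_ = false;
  std::vector<Record> pending_;
};
```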

##########
File path: cpp/src/arrow/util/future.cc
##########
@@ -231,26 +232,68 @@ class ConcreteFutureImpl : public FutureImpl {
 
   void DoMarkFailed() { DoMarkFinishedOrFailed(FutureState::FAILURE); }
 
-  void AddCallback(Callback callback) {
+  void CheckOptions(const CallbackOptions& opts) {
+    if (opts.should_schedule != ShouldSchedule::NEVER) {
+      DCHECK_NE(opts.executor, NULL)
+          << "An executor must be specified when adding a callback that might schedule";
+    }
+  }
+
+  void AddCallback(Callback callback, CallbackOptions opts) {
+    CheckOptions(opts);
     std::unique_lock<std::mutex> lock(mutex_);
+    CallbackRecord callback_record{std::move(callback), opts};

Review comment:
       Why doesn't `AddCallback` directly take a `CallbackRecord`?
   

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -202,8 +202,30 @@ enum class FutureState : int8_t { PENDING, SUCCESS, FAILURE };
 
 inline bool IsFutureFinished(FutureState state) { return state != FutureState::PENDING; }
 
+/// \brief Describes whether the callback should be scheduled or run synchronously
+enum ShouldSchedule {
+  /// Always run the callback synchronously (the default)
+  NEVER = 0,

Review comment:
       We should avoid using ALL_CAPS names, because of potential clashes with macros (this is a common issue with Windows headers, unfortunately).
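A sketch of the scoped alternative. The `ALWAYS` macro below is a stand-in for whatever a system header might define, not a real Windows macro. While it is in scope, an unscoped enumerator spelled `ALWAYS` would be rewritten by the preprocessor into `2 = 2` and fail to compile, but the CamelCase enumerators are unaffected. The enumerator names are one possible choice, not necessarily what was merged.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical: pretend some included header defined this macro.
#define ALWAYS 2

// Scoped enum with CamelCase enumerators: immune to the macro above, and the
// names neither leak into the enclosing namespace nor convert to int.
enum class ShouldSchedule : std::int8_t {
  Never = 0,         // run the callback synchronously (the default)
  IfUnfinished = 1,  // schedule only if the future is still pending
  Always = 2,        // always schedule a new task
};

#undef ALWAYS
```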

##########
File path: cpp/src/arrow/util/future.cc
##########
@@ -272,8 +315,8 @@ class ConcreteFutureImpl : public FutureImpl {
     //
     // In fact, it is important not to hold the locks because the callback
     // may be slow or do its own locking on other resources
-    for (auto&& callback : callbacks_) {
-      std::move(callback)();
+    for (auto& callback_record : callbacks_) {
+      RunOrScheduleCallback(callback_record, /*from_unfinished=*/true);

Review comment:
       Shouldn't `from_unfinished` be false?

##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -952,6 +951,85 @@ TEST(FutureCompletionTest, FutureVoid) {
   }
 }
 
+class FutureSchedulingTest : public testing::Test {
+ public:
+  internal::Executor* executor() { return mock_executor.get(); }
+  int spawn_count() { return mock_executor->spawn_count; }
+
+  std::function<void(const Status&)> callback = [](const Status&) {};
+  std::shared_ptr<MockExecutor> mock_executor = std::make_shared<MockExecutor>();
+};
+
+TEST_F(FutureSchedulingTest, ScheduleAlways) {
+  CallbackOptions options;
+  options.should_schedule = ShouldSchedule::ALWAYS;
+  options.executor = executor();
+  // Successful future
+  {
+    auto fut = Future<>::Make();
+    fut.AddCallback(callback, options);
+    fut.MarkFinished();
+    fut.AddCallback(callback, options);
+    ASSERT_EQ(2, spawn_count());
+  }
+  // Failing future
+  {
+    auto fut = Future<>::Make();
+    fut.AddCallback(callback, options);
+    fut.MarkFinished(Status::Invalid("XYZ"));
+    fut.AddCallback(callback, options);
+    ASSERT_EQ(4, spawn_count());
+  }
+}
+
+TEST_F(FutureSchedulingTest, ScheduleIfUnfinished) {
+  CallbackOptions options;
+  options.should_schedule = ShouldSchedule::IF_UNFINISHED;

Review comment:
       NEVER is never tested?

##########
File path: cpp/src/arrow/util/thread_pool_test.cc
##########
@@ -256,6 +258,42 @@ TEST_P(TestRunSynchronously, PropagatedError) {
 INSTANTIATE_TEST_SUITE_P(TestRunSynchronously, TestRunSynchronously,
                          ::testing::Values(false, true));
 
+class TransferTest : public testing::Test {
+ public:
+  internal::Executor* executor() { return mock_executor.get(); }
+  int spawn_count() { return mock_executor->spawn_count; }
+
+  std::function<void(const Status&)> callback = [](const Status&) {};
+  std::shared_ptr<MockExecutor> mock_executor = std::make_shared<MockExecutor>();
+};
+
+TEST_F(TransferTest, DefaultTransferIfNotFinished) {
+  {
+    Future<> fut = Future<>::Make();
+    auto transferred = executor()->Transfer(fut);
+    fut.MarkFinished();
+    ASSERT_FINISHES_OK(transferred);
+    ASSERT_EQ(1, spawn_count());
+  }
+  {
+    Future<> fut = Future<>::Make();
+    fut.MarkFinished();
+    auto transferred = executor()->Transfer(fut);
+    ASSERT_FINISHES_OK(transferred);
+    ASSERT_EQ(1, spawn_count());
+  }
+}
+
+TEST_F(TransferTest, TransferAlways) {
+  {
+    Future<> fut = Future<>::Make();
+    fut.MarkFinished();
+    auto transferred = executor()->Transfer(fut, /*always_transfer=*/true);

Review comment:
       Nit, but it would probably be nicer to be able to spell this as `TransferAlways(fut)`.
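A minimal sketch of the suggested spelling, using hypothetical `ToyFuture`/`ToyExecutor` types rather than Arrow's: the boolean flag becomes a private detail behind two named entry points, so call sites no longer need a `/*always_transfer=*/` comment to be readable.

```cpp
#include <cassert>

// Toy stand-in for a future: only tracks whether it has finished.
struct ToyFuture {
  bool finished;
};

class ToyExecutor {
 public:
  // Default transfer: hop to this executor only if `fut` is still pending.
  ToyFuture Transfer(const ToyFuture& fut) {
    return DoTransfer(fut, /*always_transfer=*/false);
  }

  // Named variant replacing `Transfer(fut, /*always_transfer=*/true)`.
  ToyFuture TransferAlways(const ToyFuture& fut) {
    return DoTransfer(fut, /*always_transfer=*/true);
  }

  int spawn_count() const { return spawn_count_; }

 private:
  ToyFuture DoTransfer(const ToyFuture& fut, bool always_transfer) {
    // Simplification: a real implementation attaches a callback that spawns
    // when `fut` completes; here we only count the spawn that would happen.
    if (always_transfer || !fut.finished) ++spawn_count_;
    return fut;
  }

  int spawn_count_ = 0;
};
```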

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -453,30 +480,35 @@ class Future {
   /// cyclic reference to itself through the callback.
   template <typename OnComplete>
   typename std::enable_if<!detail::first_arg_is_status<OnComplete>::value>::type
-  AddCallback(OnComplete on_complete) const {
+  AddCallback(OnComplete on_complete,
+              CallbackOptions opts = CallbackOptions::Defaults()) const {
     // We know impl_ will not be dangling when invoking callbacks because at least one
     // thread will be waiting for MarkFinished to return. Thus it's safe to keep a
     // weak reference to impl_ here
     struct Callback {
-      void operator()() && { std::move(on_complete)(weak_self.get().result()); }
-      WeakFuture<T> weak_self;
+      void operator()(const FutureImpl& impl) && {
+        std::move(on_complete)(*static_cast<Result<ValueType>*>(impl.result_.get()));
+      }
       OnComplete on_complete;
     };
-    impl_->AddCallback(Callback{WeakFuture<T>(*this), std::move(on_complete)});
+    impl_->AddCallback(Callback{std::move(on_complete)}, opts);
   }
 
   /// Overload for callbacks accepting a Status
   template <typename OnComplete>
   typename std::enable_if<detail::first_arg_is_status<OnComplete>::value>::type
-  AddCallback(OnComplete on_complete) const {
+  AddCallback(OnComplete on_complete,
+              CallbackOptions opts = CallbackOptions::Defaults()) const {
     static_assert(std::is_same<internal::Empty, ValueType>::value,
                   "Callbacks for Future<> should accept Status and not Result");
     struct Callback {
-      void operator()() && { std::move(on_complete)(weak_self.get().status()); }
-      WeakFuture<T> weak_self;
+      void operator()(const FutureImpl& impl) && {
+        std::move(on_complete)(
+            static_cast<Result<ValueType>*>(impl.result_.get())->status());

Review comment:
       Can you make the `static_cast<>` dance a method on `FutureImpl`?
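A sketch of what such a helper could look like, on a hypothetical `FutureImplSketch` that stores its result as `std::shared_ptr<void>` (the PR's actual type for `result_` is not shown in this hunk). `CastResult` is an invented name.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>

// Type-erased future state: `result_` is untyped, and only the typed
// Future<T> front end knows what it really points to.
class FutureImplSketch {
 public:
  template <typename T>
  void SetResult(T value) {
    // A shared_ptr<void> built from shared_ptr<T> keeps T's deleter.
    result_ = std::make_shared<T>(std::move(value));
  }

  // The single place that performs the cast. Callers must request the type
  // that was stored; a mismatch is undefined behavior, as with the inline
  // static_casts this replaces.
  template <typename T>
  T* CastResult() const {
    return static_cast<T*>(result_.get());
  }

 private:
  std::shared_ptr<void> result_;
};
```

Each callback body could then shrink to something like `std::move(on_complete)(*impl.CastResult<Result<ValueType>>());`.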

##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -952,6 +951,85 @@ TEST(FutureCompletionTest, FutureVoid) {
   }
 }
 
+class FutureSchedulingTest : public testing::Test {
+ public:
+  internal::Executor* executor() { return mock_executor.get(); }
+  int spawn_count() { return mock_executor->spawn_count; }
+
+  std::function<void(const Status&)> callback = [](const Status&) {};
+  std::shared_ptr<MockExecutor> mock_executor = std::make_shared<MockExecutor>();
+};
+
+TEST_F(FutureSchedulingTest, ScheduleAlways) {
+  CallbackOptions options;
+  options.should_schedule = ShouldSchedule::ALWAYS;
+  options.executor = executor();
+  // Successful future
+  {
+    auto fut = Future<>::Make();
+    fut.AddCallback(callback, options);
+    fut.MarkFinished();
+    fut.AddCallback(callback, options);
+    ASSERT_EQ(2, spawn_count());
+  }
+  // Failing future
+  {
+    auto fut = Future<>::Make();
+    fut.AddCallback(callback, options);
+    fut.MarkFinished(Status::Invalid("XYZ"));
+    fut.AddCallback(callback, options);
+    ASSERT_EQ(4, spawn_count());
+  }
+}
+
+TEST_F(FutureSchedulingTest, ScheduleIfUnfinished) {
+  CallbackOptions options;
+  options.should_schedule = ShouldSchedule::IF_UNFINISHED;
+  options.executor = executor();
+  // Successful future
+  {
+    auto fut = Future<>::Make();
+    fut.AddCallback(callback, options);
+    fut.MarkFinished();
+    fut.AddCallback(callback, options);
+    ASSERT_EQ(1, spawn_count());
+  }
+  // Failing future
+  {
+    auto fut = Future<>::Make();
+    fut.AddCallback(callback, options);
+    fut.MarkFinished(Status::Invalid("XYZ"));
+    fut.AddCallback(callback, options);
+    ASSERT_EQ(2, spawn_count());
+  }
+}
+
+class DelayedExecutor : public internal::Executor {

Review comment:
       It's a bit weird to have this in a private test file, and the mock executor in a `.h`.

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -202,8 +202,30 @@ enum class FutureState : int8_t { PENDING, SUCCESS, FAILURE };
 
 inline bool IsFutureFinished(FutureState state) { return state != FutureState::PENDING; }
 
+/// \brief Describes whether the callback should be scheduled or run synchronously
+enum ShouldSchedule {
+  /// Always run the callback synchronously (the default)
+  NEVER = 0,
+  /// Schedule a new task only if the future is not finished when the
+  /// callback is added
+  IF_UNFINISHED = 1,
+  /// Always schedule the callback as a new task
+  ALWAYS = 2
+};
+
+/// \brief Options that control how a continuation is run
+struct CallbackOptions {
+  /// Describes whether the callback should be run synchronously or scheduled

Review comment:
       "Describe"







[GitHub] [arrow] westonpace commented on a change in pull request #10258: ARROW-12560: [C++] Investigate utilizing aggressive thread task creation when adding callback to finished future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10258:
URL: https://github.com/apache/arrow/pull/10258#discussion_r645783289



##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -952,6 +951,85 @@ TEST(FutureCompletionTest, FutureVoid) {
   }
 }
 
+class FutureSchedulingTest : public testing::Test {
+ public:
+  internal::Executor* executor() { return mock_executor.get(); }
+  int spawn_count() { return mock_executor->spawn_count; }
+
+  std::function<void(const Status&)> callback = [](const Status&) {};
+  std::shared_ptr<MockExecutor> mock_executor = std::make_shared<MockExecutor>();
+};
+
+TEST_F(FutureSchedulingTest, ScheduleAlways) {
+  CallbackOptions options;
+  options.should_schedule = ShouldSchedule::ALWAYS;
+  options.executor = executor();
+  // Successful future
+  {
+    auto fut = Future<>::Make();
+    fut.AddCallback(callback, options);
+    fut.MarkFinished();
+    fut.AddCallback(callback, options);
+    ASSERT_EQ(2, spawn_count());
+  }
+  // Failing future
+  {
+    auto fut = Future<>::Make();
+    fut.AddCallback(callback, options);
+    fut.MarkFinished(Status::Invalid("XYZ"));
+    fut.AddCallback(callback, options);
+    ASSERT_EQ(4, spawn_count());
+  }
+}
+
+TEST_F(FutureSchedulingTest, ScheduleIfUnfinished) {
+  CallbackOptions options;
+  options.should_schedule = ShouldSchedule::IF_UNFINISHED;

Review comment:
       Added a test.

##########
File path: cpp/src/arrow/util/thread_pool_test.cc
##########
@@ -256,6 +258,42 @@ TEST_P(TestRunSynchronously, PropagatedError) {
 INSTANTIATE_TEST_SUITE_P(TestRunSynchronously, TestRunSynchronously,
                          ::testing::Values(false, true));
 
+class TransferTest : public testing::Test {
+ public:
+  internal::Executor* executor() { return mock_executor.get(); }
+  int spawn_count() { return mock_executor->spawn_count; }
+
+  std::function<void(const Status&)> callback = [](const Status&) {};
+  std::shared_ptr<MockExecutor> mock_executor = std::make_shared<MockExecutor>();
+};
+
+TEST_F(TransferTest, DefaultTransferIfNotFinished) {
+  {
+    Future<> fut = Future<>::Make();
+    auto transferred = executor()->Transfer(fut);
+    fut.MarkFinished();
+    ASSERT_FINISHES_OK(transferred);
+    ASSERT_EQ(1, spawn_count());
+  }
+  {
+    Future<> fut = Future<>::Make();
+    fut.MarkFinished();
+    auto transferred = executor()->Transfer(fut);
+    ASSERT_FINISHES_OK(transferred);
+    ASSERT_EQ(1, spawn_count());
+  }
+}
+
+TEST_F(TransferTest, TransferAlways) {
+  {
+    Future<> fut = Future<>::Make();
+    fut.MarkFinished();
+    auto transferred = executor()->Transfer(fut, /*always_transfer=*/true);

Review comment:
       Done.







[GitHub] [arrow] westonpace commented on a change in pull request #10258: ARROW-12560: [C++] Investigate utilizing aggressive thread task creation when adding callback to finished future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10258:
URL: https://github.com/apache/arrow/pull/10258#discussion_r643636029



##########
File path: cpp/src/arrow/util/test_common.h
##########
@@ -85,4 +88,18 @@ inline void AssertIteratorExhausted(Iterator<T>& it) {
 
 Transformer<TestInt, TestStr> MakeFilter(std::function<bool(TestInt&)> filter);
 
+class MockExecutor : public internal::Executor {

Review comment:
       There is a bit of categorization based on folder (e.g. `util/test_common.h` vs `io/test_common.h`).  This file isn't really all that large.  However, it could probably be split into `test_iterator_common.h` and `test_thread_pool_common.h`.  Or am I missing the point?







[GitHub] [arrow] pitrou commented on a change in pull request #10258: ARROW-12560: [C++] Investigate utilizing aggressive thread task creation when adding callback to finished future

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10258:
URL: https://github.com/apache/arrow/pull/10258#discussion_r643961399



##########
File path: cpp/src/arrow/util/future.cc
##########
@@ -231,26 +232,68 @@ class ConcreteFutureImpl : public FutureImpl {
 
   void DoMarkFailed() { DoMarkFinishedOrFailed(FutureState::FAILURE); }
 
-  void AddCallback(Callback callback) {
+  void CheckOptions(const CallbackOptions& opts) {
+    if (opts.should_schedule != ShouldSchedule::NEVER) {
+      DCHECK_NE(opts.executor, NULL)
+          << "An executor must be specified when adding a callback that might schedule";
+    }
+  }
+
+  void AddCallback(Callback callback, CallbackOptions opts) {
+    CheckOptions(opts);
     std::unique_lock<std::mutex> lock(mutex_);
+    CallbackRecord callback_record{std::move(callback), opts};
     if (IsFutureFinished(state_)) {
       lock.unlock();
-      std::move(callback)();
+      RunOrScheduleCallback(callback_record, /*from_unfinished=*/false);
     } else {
-      callbacks_.push_back(std::move(callback));
+      callbacks_.push_back(std::move(callback_record));
     }
   }
 
-  bool TryAddCallback(const std::function<Callback()>& callback_factory) {
+  bool TryAddCallback(const std::function<Callback()>& callback_factory,
+                      CallbackOptions opts) {
+    CheckOptions(opts);
     std::unique_lock<std::mutex> lock(mutex_);
     if (IsFutureFinished(state_)) {
       return false;
     } else {
-      callbacks_.push_back(callback_factory());
+      callbacks_.push_back({callback_factory(), opts});
       return true;
     }
   }
 
+  bool ShouldSchedule(const CallbackRecord& callback_record, bool from_unfinished) {
+    switch (callback_record.options.should_schedule) {
+      case ShouldSchedule::NEVER:
+        return false;
+      case ShouldSchedule::ALWAYS:
+        return true;
+      case ShouldSchedule::IF_UNFINISHED:
+        return from_unfinished;
+      default:
+        DCHECK(false) << "Unrecognized ShouldSchedule option";
+        return false;
+    }
+  }
+
+  void RunOrScheduleCallback(CallbackRecord& callback_record, bool from_unfinished) {
+    if (ShouldSchedule(callback_record, from_unfinished)) {
+      // Need to make a copy of this to keep it alive until the callback has a chance

Review comment:
       If it's only the `shared_ptr` copy, then I'm not sure it's worth mentioning.







[GitHub] [arrow] westonpace commented on a change in pull request #10258: ARROW-12560: [C++] Investigate utilizing aggressive thread task creation when adding callback to finished future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10258:
URL: https://github.com/apache/arrow/pull/10258#discussion_r645780307



##########
File path: cpp/src/arrow/util/future.cc
##########
@@ -231,26 +232,68 @@ class ConcreteFutureImpl : public FutureImpl {
 
   void DoMarkFailed() { DoMarkFinishedOrFailed(FutureState::FAILURE); }
 
-  void AddCallback(Callback callback) {
+  void CheckOptions(const CallbackOptions& opts) {
+    if (opts.should_schedule != ShouldSchedule::NEVER) {
+      DCHECK_NE(opts.executor, NULL)

Review comment:
       Fixed.







[GitHub] [arrow] westonpace commented on pull request #10258: ARROW-12560: [C++] Investigate utilizing aggressive thread task creation when adding callback to finished future

Posted by GitBox <gi...@apache.org>.
westonpace commented on pull request #10258:
URL: https://github.com/apache/arrow/pull/10258#issuecomment-847444398


   Since I'm working on work stealing at the thread pool level, I agree that idle-based scheduling is no longer needed. I've cleaned this up and rebased. It's much simpler than it was before.





[GitHub] [arrow] westonpace commented on pull request #10258: ARROW-12560: [C++] Investigate utilizing aggressive thread task creation when adding callback to finished future

Posted by GitBox <gi...@apache.org>.
westonpace commented on pull request #10258:
URL: https://github.com/apache/arrow/pull/10258#issuecomment-855170428


   Ok, I've addressed the comments and this is ready for review again.





[GitHub] [arrow] westonpace commented on a change in pull request #10258: ARROW-12560: [C++] Investigate utilizing aggressive thread task creation when adding callback to finished future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10258:
URL: https://github.com/apache/arrow/pull/10258#discussion_r645781401



##########
File path: cpp/src/arrow/util/future.cc
##########
@@ -272,8 +315,8 @@ class ConcreteFutureImpl : public FutureImpl {
     //
     // In fact, it is important not to hold the locks because the callback
     // may be slow or do its own locking on other resources
-    for (auto&& callback : callbacks_) {
-      std::move(callback)();
+    for (auto& callback_record : callbacks_) {
+      RunOrScheduleCallback(callback_record, /*from_unfinished=*/true);

Review comment:
       Ok, I renamed it to `in_add_callback` (and inverted the logic).  Let me know if that is more clear.
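   For illustration, here is a minimal sketch of the scheduling decision after the rename. It assumes a scoped enum that reuses the enumerator names from the diff, and `ShouldScheduleCallback` is a hypothetical free function, not the PR's code. `in_add_callback` is true only when `AddCallback` runs the callback directly because the future had already finished:

   ```cpp
   #include <cassert>

   // Hypothetical scoped enum reusing the enumerator names from the diff.
   enum class ShouldSchedule { NEVER, IF_UNFINISHED, ALWAYS };

   // Hypothetical helper: returns true if the callback should be submitted to
   // an executor instead of being run inline.
   //   in_add_callback == true  -> AddCallback found the future already finished.
   //   in_add_callback == false -> the callback was stored while the future was
   //                               pending and is now run from MarkFinished.
   bool ShouldScheduleCallback(ShouldSchedule policy, bool in_add_callback) {
     switch (policy) {
       case ShouldSchedule::NEVER:
         return false;
       case ShouldSchedule::ALWAYS:
         return true;
       case ShouldSchedule::IF_UNFINISHED:
         // Only callbacks registered while the future was pending get a new task.
         return !in_add_callback;
     }
     return false;  // not reached for valid enum values
   }
   ```

   With the inverted flag, `IF_UNFINISHED` reads as "schedule unless we are already inside `AddCallback`", which matches the case the old `from_unfinished=false` argument described.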





[GitHub] [arrow] pitrou commented on a change in pull request #10258: ARROW-12560: [C++] Investigate utilizing aggressive thread task creation when adding callback to finished future

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10258:
URL: https://github.com/apache/arrow/pull/10258#discussion_r643959976



##########
File path: cpp/src/arrow/util/test_common.h
##########
@@ -85,4 +88,18 @@ inline void AssertIteratorExhausted(Iterator<T>& it) {
 
 Transformer<TestInt, TestStr> MakeFilter(std::function<bool(TestInt&)> filter);
 
+class MockExecutor : public internal::Executor {

Review comment:
       I think we have `arrow/testing/future_util.h`. We could move the mock executor into that file, or into a separate `arrow/testing/executor_util.h`.





[GitHub] [arrow] pitrou commented on pull request #10258: ARROW-12560: [C++] Investigate utilizing aggressive thread task creation when adding callback to finished future

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #10258:
URL: https://github.com/apache/arrow/pull/10258#issuecomment-836713973


   I'm not sure why you're suggesting adding so much sophistication. To me there are only two interesting options: "always" and "if unfinished". So we could have `Transfer` (always transfer) vs. `TransferUnfinished`.



[GitHub] [arrow] pitrou closed pull request #10258: ARROW-12560: [C++] Add scheduling option for Future callbacks

Posted by GitBox <gi...@apache.org>.
pitrou closed pull request #10258:
URL: https://github.com/apache/arrow/pull/10258


   



[GitHub] [arrow] westonpace commented on a change in pull request #10258: ARROW-12560: [C++] Investigate utilizing aggressive thread task creation when adding callback to finished future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10258:
URL: https://github.com/apache/arrow/pull/10258#discussion_r643628821



##########
File path: cpp/src/arrow/util/future.cc
##########
@@ -231,26 +232,68 @@ class ConcreteFutureImpl : public FutureImpl {
 
   void DoMarkFailed() { DoMarkFinishedOrFailed(FutureState::FAILURE); }
 
-  void AddCallback(Callback callback) {
+  void CheckOptions(const CallbackOptions& opts) {
+    if (opts.should_schedule != ShouldSchedule::NEVER) {
+      DCHECK_NE(opts.executor, NULL)
+          << "An executor must be specified when adding a callback that might schedule";
+    }
+  }
+
+  void AddCallback(Callback callback, CallbackOptions opts) {
+    CheckOptions(opts);
     std::unique_lock<std::mutex> lock(mutex_);
+    CallbackRecord callback_record{std::move(callback), opts};
     if (IsFutureFinished(state_)) {
       lock.unlock();
-      std::move(callback)();
+      RunOrScheduleCallback(callback_record, /*from_unfinished=*/false);
     } else {
-      callbacks_.push_back(std::move(callback));
+      callbacks_.push_back(std::move(callback_record));
     }
   }
 
-  bool TryAddCallback(const std::function<Callback()>& callback_factory) {
+  bool TryAddCallback(const std::function<Callback()>& callback_factory,
+                      CallbackOptions opts) {
+    CheckOptions(opts);
     std::unique_lock<std::mutex> lock(mutex_);
     if (IsFutureFinished(state_)) {
       return false;
     } else {
-      callbacks_.push_back(callback_factory());
+      callbacks_.push_back({callback_factory(), opts});
       return true;
     }
   }
 
+  bool ShouldSchedule(const CallbackRecord& callback_record, bool from_unfinished) {
+    switch (callback_record.options.should_schedule) {
+      case ShouldSchedule::NEVER:
+        return false;
+      case ShouldSchedule::ALWAYS:
+        return true;
+      case ShouldSchedule::IF_UNFINISHED:
+        return from_unfinished;
+      default:
+        DCHECK(false) << "Unrecognized ShouldSchedule option";
+        return false;
+    }
+  }
+
+  void RunOrScheduleCallback(CallbackRecord& callback_record, bool from_unfinished) {
+    if (ShouldSchedule(callback_record, from_unfinished)) {
+      // Need to make a copy of this to keep it alive until the callback has a chance
+      // to be scheduled.
+      struct CallbackTask {
+        void operator()() { std::move(callback)(*self); }
+
+        Callback callback;
+        std::shared_ptr<FutureImpl> self;

Review comment:
       The weak pointer in the old implementation was used to prevent futures from creating a circular reference on themselves (callback references future which references callback).
   
   Unfortunately, the weak pointer relied on the future remaining valid until all callbacks had run.  If all callbacks run synchronously, this is easy (whoever is calling `MarkFinished` must hold a valid reference until all callbacks finish).  Once we start scheduling callbacks, we run into the problem where `MarkFinished` can return before some callbacks have run, and by the time those scheduled callbacks actually run the future may already have been deleted.
   
   This fix isn't just a change to a strong pointer, though (that would reintroduce the circular reference problem).  Instead of the callback itself holding a reference to the future, I changed it so that the callback takes the `FutureImpl` as an argument (note, this is the internal `FutureImpl` callback and not the publicly exposed `Future` callback).  This avoids the circular reference because the strong pointer is created when the callback is triggered, not when the callback is added.
   
   Also, the strong pointer is only created for scheduled callbacks.  Performance of existing (synchronous) callbacks should remain the same, since no strong pointer or `shared_from_this` call is made for them.
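   A minimal sketch of the ownership difference described above, using a hypothetical `ToyImpl` rather than Arrow's `FutureImpl`: a callback that captures a strong pointer to its own future forms a cycle, while a callback that merely receives the impl as an argument does not. The `std::weak_ptr` is only used to observe whether the impl was freed.

   ```cpp
   #include <cassert>
   #include <functional>
   #include <memory>
   #include <vector>

   // Hypothetical stand-in for a future's shared state (not Arrow's FutureImpl).
   struct ToyImpl {
     int result = 0;
     // Old style: the callback has to reach the future on its own (here, by capture).
     std::vector<std::function<void()>> capturing_callbacks;
     // New style: the future passes itself to the callback when invoking it.
     std::vector<std::function<void(const ToyImpl&)>> arg_callbacks;
   };

   // Returns whether the impl was freed after its only external owner let go.
   bool FreedWithCapturingCallback() {
     auto impl = std::make_shared<ToyImpl>();
     std::weak_ptr<ToyImpl> watch = impl;
     // impl -> callback -> impl: a reference cycle.
     impl->capturing_callbacks.push_back([impl]() { (void)impl->result; });
     impl.reset();
     bool freed = watch.expired();  // false: the cycle keeps the impl alive
     // Break the cycle by hand so this demo does not actually leak.
     if (auto strong = watch.lock()) strong->capturing_callbacks.clear();
     return freed;
   }

   bool FreedWithArgumentCallback() {
     auto impl = std::make_shared<ToyImpl>();
     std::weak_ptr<ToyImpl> watch = impl;
     // The callback stores no pointer to the impl; it receives one when run.
     impl->arg_callbacks.push_back([](const ToyImpl& self) { (void)self.result; });
     impl.reset();
     return watch.expired();  // true: nothing else owns the impl
   }
   ```

   The second pattern only needs a strong reference at the moment a callback is actually handed to an executor, which is where the sketch's cycle problem cannot arise.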





[GitHub] [arrow] westonpace commented on a change in pull request #10258: ARROW-12560: [C++] Investigate utilizing aggressive thread task creation when adding callback to finished future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10258:
URL: https://github.com/apache/arrow/pull/10258#discussion_r643630671



##########
File path: cpp/src/arrow/util/future.cc
##########
@@ -272,8 +315,8 @@ class ConcreteFutureImpl : public FutureImpl {
     //
     // In fact, it is important not to hold the locks because the callback
     // may be slow or do its own locking on other resources
-    for (auto&& callback : callbacks_) {
-      std::move(callback)();
+    for (auto& callback_record : callbacks_) {
+      RunOrScheduleCallback(callback_record, /*from_unfinished=*/true);

Review comment:
       Hmm, `from_unfinished` is supposed to mean "was the callback added when the future was unfinished" but I can see how that is vague.  I could just remove this option entirely.  The `TryAddCallback` function already gives you this capability (it is used in `Transfer`) and so it isn't used anywhere.  Any strong opinion?
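   To show how a `TryAddCallback`-style API already lets the caller get "schedule only if unfinished" behavior, here is a sketch built on a hypothetical `ToyFuture`. `TransferIfUnfinished` and its `spawn` parameter are illustrative stand-ins, not Arrow's `Transfer`:

   ```cpp
   #include <cassert>
   #include <functional>
   #include <mutex>
   #include <vector>

   // Hypothetical toy future with just enough machinery for the demo.
   class ToyFuture {
    public:
     using Callback = std::function<void()>;

     // Registers the callback only if the future is still pending.
     // Returns false (without calling the factory) if it already finished.
     bool TryAddCallback(const std::function<Callback()>& factory) {
       std::lock_guard<std::mutex> lock(mutex_);
       if (finished_) return false;
       callbacks_.push_back(factory());
       return true;
     }

     void MarkFinished() {
       std::vector<Callback> to_run;
       {
         std::lock_guard<std::mutex> lock(mutex_);
         finished_ = true;
         to_run.swap(callbacks_);
       }
       // Run callbacks without holding the lock.
       for (auto& cb : to_run) cb();
     }

    private:
     std::mutex mutex_;
     bool finished_ = false;
     std::vector<Callback> callbacks_;
   };

   // Hypothetical helper: defer the continuation to `spawn` only if we had to
   // wait; if the future already finished, run it inline.
   // Returns true if the continuation was deferred.
   bool TransferIfUnfinished(ToyFuture& fut, const std::function<void()>& continuation,
                             const std::function<void(std::function<void()>)>& spawn) {
     bool deferred = fut.TryAddCallback([&]() -> ToyFuture::Callback {
       return [continuation, spawn]() { spawn(continuation); };
     });
     if (!deferred) continuation();  // already finished: run synchronously
     return deferred;
   }
   ```

   Because the finished/unfinished distinction is made by the return value of `TryAddCallback`, the future itself does not need to track where the callback came from.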





[GitHub] [arrow] westonpace commented on a change in pull request #10258: ARROW-12560: [C++] Investigate utilizing aggressive thread task creation when adding callback to finished future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10258:
URL: https://github.com/apache/arrow/pull/10258#discussion_r645759027



##########
File path: cpp/src/arrow/util/future.cc
##########
@@ -231,26 +232,68 @@ class ConcreteFutureImpl : public FutureImpl {
 
   void DoMarkFailed() { DoMarkFinishedOrFailed(FutureState::FAILURE); }
 
-  void AddCallback(Callback callback) {
+  void CheckOptions(const CallbackOptions& opts) {
+    if (opts.should_schedule != ShouldSchedule::NEVER) {
+      DCHECK_NE(opts.executor, NULL)
+          << "An executor must be specified when adding a callback that might schedule";
+    }
+  }
+
+  void AddCallback(Callback callback, CallbackOptions opts) {
+    CheckOptions(opts);
     std::unique_lock<std::mutex> lock(mutex_);
+    CallbackRecord callback_record{std::move(callback), opts};

Review comment:
       I'm only combining the two out of convenience for storage.  It seems like an implementation detail that doesn't need to be exposed.  Also, if I take a `CallbackRecord` directly here it will be asymmetric with `TryAddCallback` which can't take a `CallbackRecord` (unless we want the factory to return a callback record which exposes all of this outside the file as well).





[GitHub] [arrow] westonpace commented on pull request #10258: ARROW-12560: [C++] Investigate utilizing aggressive thread task creation when adding callback to finished future

Posted by GitBox <gi...@apache.org>.
westonpace commented on pull request #10258:
URL: https://github.com/apache/arrow/pull/10258#issuecomment-833384571


   CC @pitrou I think you referenced this in your latest execution engine PR.



[GitHub] [arrow] westonpace commented on a change in pull request #10258: ARROW-12560: [C++] Investigate utilizing aggressive thread task creation when adding callback to finished future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10258:
URL: https://github.com/apache/arrow/pull/10258#discussion_r645780796



##########
File path: cpp/src/arrow/util/future.cc
##########
@@ -231,26 +232,68 @@ class ConcreteFutureImpl : public FutureImpl {
 
   void DoMarkFailed() { DoMarkFinishedOrFailed(FutureState::FAILURE); }
 
-  void AddCallback(Callback callback) {
+  void CheckOptions(const CallbackOptions& opts) {
+    if (opts.should_schedule != ShouldSchedule::NEVER) {
+      DCHECK_NE(opts.executor, NULL)
+          << "An executor must be specified when adding a callback that might schedule";
+    }
+  }
+
+  void AddCallback(Callback callback, CallbackOptions opts) {
+    CheckOptions(opts);
     std::unique_lock<std::mutex> lock(mutex_);
+    CallbackRecord callback_record{std::move(callback), opts};
     if (IsFutureFinished(state_)) {
       lock.unlock();
-      std::move(callback)();
+      RunOrScheduleCallback(callback_record, /*from_unfinished=*/false);
     } else {
-      callbacks_.push_back(std::move(callback));
+      callbacks_.push_back(std::move(callback_record));
     }
   }
 
-  bool TryAddCallback(const std::function<Callback()>& callback_factory) {
+  bool TryAddCallback(const std::function<Callback()>& callback_factory,
+                      CallbackOptions opts) {
+    CheckOptions(opts);
     std::unique_lock<std::mutex> lock(mutex_);
     if (IsFutureFinished(state_)) {
       return false;
     } else {
-      callbacks_.push_back(callback_factory());
+      callbacks_.push_back({callback_factory(), opts});
       return true;
     }
   }
 
+  bool ShouldSchedule(const CallbackRecord& callback_record, bool from_unfinished) {
+    switch (callback_record.options.should_schedule) {
+      case ShouldSchedule::NEVER:
+        return false;
+      case ShouldSchedule::ALWAYS:
+        return true;
+      case ShouldSchedule::IF_UNFINISHED:
+        return from_unfinished;
+      default:
+        DCHECK(false) << "Unrecognized ShouldSchedule option";
+        return false;
+    }
+  }
+
+  void RunOrScheduleCallback(CallbackRecord& callback_record, bool from_unfinished) {
+    if (ShouldSchedule(callback_record, from_unfinished)) {
+      // Need to make a copy of this to keep it alive until the callback has a chance

Review comment:
       I think the more important thing is that we are intentionally extending the lifetime of the future.  I reworded the comment a bit and dropped the "copy".  I can always remove it if we want.





[GitHub] [arrow] westonpace commented on a change in pull request #10258: ARROW-12560: [C++] Investigate utilizing aggressive thread task creation when adding callback to finished future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10258:
URL: https://github.com/apache/arrow/pull/10258#discussion_r645781862



##########
File path: cpp/src/arrow/util/test_common.h
##########
@@ -85,4 +88,18 @@ inline void AssertIteratorExhausted(Iterator<T>& it) {
 
 Transformer<TestInt, TestStr> MakeFilter(std::function<bool(TestInt&)> filter);
 
+class MockExecutor : public internal::Executor {

Review comment:
       I went ahead and added `executor_util.h`.

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -202,8 +202,30 @@ enum class FutureState : int8_t { PENDING, SUCCESS, FAILURE };
 
 inline bool IsFutureFinished(FutureState state) { return state != FutureState::PENDING; }
 
+/// \brief Describes whether the callback should be scheduled or run synchronously

Review comment:
       Fixed.

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -202,8 +202,30 @@ enum class FutureState : int8_t { PENDING, SUCCESS, FAILURE };
 
 inline bool IsFutureFinished(FutureState state) { return state != FutureState::PENDING; }
 
+/// \brief Describes whether the callback should be scheduled or run synchronously
+enum ShouldSchedule {
+  /// Always run the callback synchronously (the default)
+  NEVER = 0,
+  /// Schedule a new task only if the future is not finished when the
+  /// callback is added
+  IF_UNFINISHED = 1,
+  /// Always schedule the callback as a new task
+  ALWAYS = 2
+};
+
+/// \brief Options that control how a continuation is run
+struct CallbackOptions {
+  /// Describes whether the callback should be run synchronously or scheduled

Review comment:
       Fixed.





[GitHub] [arrow] westonpace commented on pull request #10258: ARROW-12560: [C++] Investigate utilizing aggressive thread task creation when adding callback to finished future

Posted by GitBox <gi...@apache.org>.
westonpace commented on pull request #10258:
URL: https://github.com/apache/arrow/pull/10258#issuecomment-847445822


   Also, I ran into a bit of trouble with the future callback's weak reference to the future.  Before, we could just assume it was valid, since all callbacks were completed before `MarkFinished` returned.  Now, a future can schedule a callback that far outlives the call to `MarkFinished`.  So when a callback is scheduled (run on an executor), we make a copy of the `FutureImpl`'s shared_ptr to keep it alive until that callback has a chance to run.
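   A minimal sketch of that lifetime extension, assuming a hypothetical queue-only `DelayedExecutor` and a toy `ToyState` in place of `FutureImpl`: the scheduled task captures the result of `shared_from_this()`, so the state survives after the last external reference is dropped and is released once the task has run.

   ```cpp
   #include <cassert>
   #include <functional>
   #include <memory>
   #include <utility>
   #include <vector>

   // Hypothetical executor that only queues tasks, so a task can run long
   // after the code that submitted it has returned.
   struct DelayedExecutor {
     std::vector<std::function<void()>> queued;
     void Spawn(std::function<void()> task) { queued.push_back(std::move(task)); }
     void RunAll() {
       std::vector<std::function<void()>> tasks;
       tasks.swap(queued);
       for (auto& task : tasks) task();
     }  // tasks (and the references they hold) are destroyed here
   };

   // Hypothetical stand-in for FutureImpl; callbacks receive the state as an argument.
   struct ToyState : std::enable_shared_from_this<ToyState> {
     int value = 0;
     std::vector<std::function<void(const ToyState&)>> callbacks;

     void MarkFinished(int v, DelayedExecutor* executor) {
       value = v;
       for (auto& cb : callbacks) {
         // The strong reference is taken here, when the callback is scheduled,
         // and lives inside the task until the task has run.
         executor->Spawn([self = shared_from_this(), cb = std::move(cb)]() { cb(*self); });
       }
       callbacks.clear();
     }
   };
   ```

   In this sketch the caller can drop its last reference right after `MarkFinished` returns; the queued task still owns the state, and ownership ends when the executor discards the finished task.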



[GitHub] [arrow] westonpace commented on a change in pull request #10258: ARROW-12560: [C++] Investigate utilizing aggressive thread task creation when adding callback to finished future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10258:
URL: https://github.com/apache/arrow/pull/10258#discussion_r643625949



##########
File path: cpp/src/arrow/util/future.cc
##########
@@ -231,26 +232,68 @@ class ConcreteFutureImpl : public FutureImpl {
 
   void DoMarkFailed() { DoMarkFinishedOrFailed(FutureState::FAILURE); }
 
-  void AddCallback(Callback callback) {
+  void CheckOptions(const CallbackOptions& opts) {
+    if (opts.should_schedule != ShouldSchedule::NEVER) {
+      DCHECK_NE(opts.executor, NULL)
+          << "An executor must be specified when adding a callback that might schedule";
+    }
+  }
+
+  void AddCallback(Callback callback, CallbackOptions opts) {
+    CheckOptions(opts);
     std::unique_lock<std::mutex> lock(mutex_);
+    CallbackRecord callback_record{std::move(callback), opts};
     if (IsFutureFinished(state_)) {
       lock.unlock();
-      std::move(callback)();
+      RunOrScheduleCallback(callback_record, /*from_unfinished=*/false);
     } else {
-      callbacks_.push_back(std::move(callback));
+      callbacks_.push_back(std::move(callback_record));
     }
   }
 
-  bool TryAddCallback(const std::function<Callback()>& callback_factory) {
+  bool TryAddCallback(const std::function<Callback()>& callback_factory,
+                      CallbackOptions opts) {
+    CheckOptions(opts);
     std::unique_lock<std::mutex> lock(mutex_);
     if (IsFutureFinished(state_)) {
       return false;
     } else {
-      callbacks_.push_back(callback_factory());
+      callbacks_.push_back({callback_factory(), opts});
       return true;
     }
   }
 
+  bool ShouldSchedule(const CallbackRecord& callback_record, bool from_unfinished) {
+    switch (callback_record.options.should_schedule) {
+      case ShouldSchedule::NEVER:
+        return false;
+      case ShouldSchedule::ALWAYS:
+        return true;
+      case ShouldSchedule::IF_UNFINISHED:
+        return from_unfinished;
+      default:
+        DCHECK(false) << "Unrecognized ShouldSchedule option";
+        return false;
+    }
+  }
+
+  void RunOrScheduleCallback(CallbackRecord& callback_record, bool from_unfinished) {
+    if (ShouldSchedule(callback_record, from_unfinished)) {
+      // Need to make a copy of this to keep it alive until the callback has a chance

Review comment:
       The copy is a few lines down when we call `shared_from_this`.  I'll move the comment and make it more explicit.





[GitHub] [arrow] westonpace commented on a change in pull request #10258: ARROW-12560: [C++] Investigate utilizing aggressive thread task creation when adding callback to finished future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10258:
URL: https://github.com/apache/arrow/pull/10258#discussion_r643636361



##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -952,6 +951,85 @@ TEST(FutureCompletionTest, FutureVoid) {
   }
 }
 
+class FutureSchedulingTest : public testing::Test {
+ public:
+  internal::Executor* executor() { return mock_executor.get(); }
+  int spawn_count() { return mock_executor->spawn_count; }
+
+  std::function<void(const Status&)> callback = [](const Status&) {};
+  std::shared_ptr<MockExecutor> mock_executor = std::make_shared<MockExecutor>();
+};
+
+TEST_F(FutureSchedulingTest, ScheduleAlways) {
+  CallbackOptions options;
+  options.should_schedule = ShouldSchedule::ALWAYS;
+  options.executor = executor();
+  // Successful future
+  {
+    auto fut = Future<>::Make();
+    fut.AddCallback(callback, options);
+    fut.MarkFinished();
+    fut.AddCallback(callback, options);
+    ASSERT_EQ(2, spawn_count());
+  }
+  // Failing future
+  {
+    auto fut = Future<>::Make();
+    fut.AddCallback(callback, options);
+    fut.MarkFinished(Status::Invalid("XYZ"));
+    fut.AddCallback(callback, options);
+    ASSERT_EQ(4, spawn_count());
+  }
+}
+
+TEST_F(FutureSchedulingTest, ScheduleIfUnfinished) {
+  CallbackOptions options;
+  options.should_schedule = ShouldSchedule::IF_UNFINISHED;
+  options.executor = executor();
+  // Successful future
+  {
+    auto fut = Future<>::Make();
+    fut.AddCallback(callback, options);
+    fut.MarkFinished();
+    fut.AddCallback(callback, options);
+    ASSERT_EQ(1, spawn_count());
+  }
+  // Failing future
+  {
+    auto fut = Future<>::Make();
+    fut.AddCallback(callback, options);
+    fut.MarkFinished(Status::Invalid("XYZ"));
+    fut.AddCallback(callback, options);
+    ASSERT_EQ(2, spawn_count());
+  }
+}
+
+class DelayedExecutor : public internal::Executor {

Review comment:
   My only rationale was that `DelayedExecutor` is used only in `future_test.cc`, while `MockExecutor` is used in both `future_test.cc` and `thread_pool_test.cc`, but I see your point.  I'll move this into `test_common.h`.





[GitHub] [arrow] westonpace commented on a change in pull request #10258: ARROW-12560: [C++] Investigate utilizing aggressive thread task creation when adding callback to finished future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10258:
URL: https://github.com/apache/arrow/pull/10258#discussion_r645780928



##########
File path: cpp/src/arrow/util/future.cc
##########
@@ -231,26 +232,68 @@ class ConcreteFutureImpl : public FutureImpl {
 
   void DoMarkFailed() { DoMarkFinishedOrFailed(FutureState::FAILURE); }
 
-  void AddCallback(Callback callback) {
+  void CheckOptions(const CallbackOptions& opts) {
+    if (opts.should_schedule != ShouldSchedule::NEVER) {
+      DCHECK_NE(opts.executor, NULL)
+          << "An executor must be specified when adding a callback that might schedule";
+    }
+  }
+
+  void AddCallback(Callback callback, CallbackOptions opts) {
+    CheckOptions(opts);
     std::unique_lock<std::mutex> lock(mutex_);
+    CallbackRecord callback_record{std::move(callback), opts};
     if (IsFutureFinished(state_)) {
       lock.unlock();
-      std::move(callback)();
+      RunOrScheduleCallback(callback_record, /*from_unfinished=*/false);
     } else {
-      callbacks_.push_back(std::move(callback));
+      callbacks_.push_back(std::move(callback_record));
     }
   }
 
-  bool TryAddCallback(const std::function<Callback()>& callback_factory) {
+  bool TryAddCallback(const std::function<Callback()>& callback_factory,
+                      CallbackOptions opts) {
+    CheckOptions(opts);
     std::unique_lock<std::mutex> lock(mutex_);
     if (IsFutureFinished(state_)) {
       return false;
     } else {
-      callbacks_.push_back(callback_factory());
+      callbacks_.push_back({callback_factory(), opts});
       return true;
     }
   }
 
+  bool ShouldSchedule(const CallbackRecord& callback_record, bool from_unfinished) {
+    switch (callback_record.options.should_schedule) {
+      case ShouldSchedule::NEVER:
+        return false;
+      case ShouldSchedule::ALWAYS:
+        return true;
+      case ShouldSchedule::IF_UNFINISHED:
+        return from_unfinished;
+      default:
+        DCHECK(false) << "Unrecognized ShouldSchedule option";
+        return false;
+    }
+  }
+
+  void RunOrScheduleCallback(CallbackRecord& callback_record, bool from_unfinished) {

Review comment:
       Done.





[GitHub] [arrow] westonpace commented on a change in pull request #10258: ARROW-12560: [C++] Investigate utilizing aggressive thread task creation when adding callback to finished future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10258:
URL: https://github.com/apache/arrow/pull/10258#discussion_r645782645



##########
File path: cpp/src/arrow/util/future.h
##########
@@ -202,8 +202,30 @@ enum class FutureState : int8_t { PENDING, SUCCESS, FAILURE };
 
 inline bool IsFutureFinished(FutureState state) { return state != FutureState::PENDING; }
 
+/// \brief Describes whether the callback should be scheduled or run synchronously
+enum ShouldSchedule {
+  /// Always run the callback synchronously (the default)
+  NEVER = 0,

Review comment:
       Ok.  I'll make a PR to add this to the style guide docs as well.





[GitHub] [arrow] pitrou commented on a change in pull request #10258: ARROW-12560: [C++] Investigate utilizing aggressive thread task creation when adding callback to finished future

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10258:
URL: https://github.com/apache/arrow/pull/10258#discussion_r643963486



##########
File path: cpp/src/arrow/util/future.cc
##########
@@ -272,8 +315,8 @@ class ConcreteFutureImpl : public FutureImpl {
     //
     // In fact, it is important not to hold the locks because the callback
     // may be slow or do its own locking on other resources
-    for (auto&& callback : callbacks_) {
-      std::move(callback)();
+    for (auto& callback_record : callbacks_) {
+      RunOrScheduleCallback(callback_record, /*from_unfinished=*/true);

Review comment:
       I think `IfUnfinished` is ok, perhaps the `from_unfinished` variable needs better naming or an explanatory comment though :-)





[GitHub] [arrow] westonpace commented on a change in pull request #10258: ARROW-12560: [C++] Investigate utilizing aggressive thread task creation when adding callback to finished future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10258:
URL: https://github.com/apache/arrow/pull/10258#discussion_r643634671



##########
File path: cpp/src/arrow/util/future.h
##########
@@ -453,30 +480,35 @@ class Future {
   /// cyclic reference to itself through the callback.
   template <typename OnComplete>
   typename std::enable_if<!detail::first_arg_is_status<OnComplete>::value>::type
-  AddCallback(OnComplete on_complete) const {
+  AddCallback(OnComplete on_complete,
+              CallbackOptions opts = CallbackOptions::Defaults()) const {
     // We know impl_ will not be dangling when invoking callbacks because at least one
     // thread will be waiting for MarkFinished to return. Thus it's safe to keep a
     // weak reference to impl_ here
     struct Callback {
-      void operator()() && { std::move(on_complete)(weak_self.get().result()); }
-      WeakFuture<T> weak_self;
+      void operator()(const FutureImpl& impl) && {
+        std::move(on_complete)(*static_cast<Result<ValueType>*>(impl.result_.get()));
+      }
       OnComplete on_complete;
     };
-    impl_->AddCallback(Callback{WeakFuture<T>(*this), std::move(on_complete)});
+    impl_->AddCallback(Callback{std::move(on_complete)}, opts);
   }
 
   /// Overload for callbacks accepting a Status
   template <typename OnComplete>
   typename std::enable_if<detail::first_arg_is_status<OnComplete>::value>::type
-  AddCallback(OnComplete on_complete) const {
+  AddCallback(OnComplete on_complete,
+              CallbackOptions opts = CallbackOptions::Defaults()) const {
     static_assert(std::is_same<internal::Empty, ValueType>::value,
                   "Callbacks for Future<> should accept Status and not Result");
     struct Callback {
-      void operator()() && { std::move(on_complete)(weak_self.get().status()); }
-      WeakFuture<T> weak_self;
+      void operator()(const FutureImpl& impl) && {
+        std::move(on_complete)(
+            static_cast<Result<ValueType>*>(impl.result_.get())->status());

Review comment:
   It is a method on `Future` (named `GetResult`), but `FutureImpl` is type-erased and so it has no reference to `ValueType`.  If this were a `.cc` file I could extract it into a function in an anonymous namespace, but the templates rule that out here.  I think I can create a `future_internal.h`.  Would it be acceptable to create a method `GetResultFromFutureImpl` inside of `arrow::detail`?  Or is there some other trick I can use?
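   One way to centralize the cast is sketched below with a hypothetical `ErasedImpl` and hypothetical templated helpers in a `detail` namespace. For brevity this sketch stores the result in a `std::shared_ptr<void>`, which may differ from how `FutureImpl` actually stores `result_`:

   ```cpp
   #include <cassert>
   #include <memory>
   #include <string>
   #include <utility>

   // Hypothetical non-template impl: the result's type is erased.
   // (A shared_ptr<void> remembers the correct deleter on its own.)
   struct ErasedImpl {
     std::shared_ptr<void> result_;
   };

   namespace detail {

   // Hypothetical helper: the one place that casts the erased result back,
   // given the ValueType supplied by the templated caller.
   template <typename ValueType>
   const ValueType& GetResultFromImpl(const ErasedImpl& impl) {
     return *static_cast<const ValueType*>(impl.result_.get());
   }

   }  // namespace detail

   // Hypothetical setter used only to populate the demo.
   template <typename ValueType>
   void SetResult(ErasedImpl& impl, ValueType value) {
     impl.result_ = std::make_shared<ValueType>(std::move(value));
   }
   ```

   Each `Callback::operator()` could then call the single helper instead of repeating the `static_cast`, keeping the unchecked cast in one header-only location.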





[GitHub] [arrow] pitrou commented on a change in pull request #10258: ARROW-12560: [C++] Investigate utilizing aggressive thread task creation when adding callback to finished future

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10258:
URL: https://github.com/apache/arrow/pull/10258#discussion_r643960575



##########
File path: cpp/src/arrow/util/future.h
##########
@@ -202,8 +202,30 @@ enum class FutureState : int8_t { PENDING, SUCCESS, FAILURE };
 
 inline bool IsFutureFinished(FutureState state) { return state != FutureState::PENDING; }
 
+/// \brief Describes whether the callback should be scheduled or run synchronously
+enum ShouldSchedule {
+  /// Always run the callback synchronously (the default)
+  NEVER = 0,

Review comment:
       `Always` sounds fine to me.
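For illustration only, here is how the enum and its options struct might look with CamelCase enumerators. The diff above still spells them `NEVER`, `IF_UNFINISHED` and `ALWAYS`. The use of `enum class`, the empty `Executor` stand-in and the field defaults are assumptions made for this sketch. Only the names `ShouldSchedule`, `CallbackOptions`, `should_schedule`, `executor` and `CallbackOptions::Defaults()` come from the diff.

```cpp
#include <cassert>

// Hypothetical stand-in for arrow::internal::Executor.
class Executor {};

/// Describes whether the callback should be scheduled or run synchronously
enum class ShouldSchedule {
  /// Always run the callback synchronously (the default)
  Never = 0,
  /// Schedule a new task only if the future was unfinished when the callback was added
  IfUnfinished,
  /// Always schedule a new task, even if the future is already finished
  Always,
};

struct CallbackOptions {
  ShouldSchedule should_schedule = ShouldSchedule::Never;
  // Must be non-null whenever should_schedule != ShouldSchedule::Never.
  Executor* executor = nullptr;

  static CallbackOptions Defaults() { return CallbackOptions(); }
};
```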




[GitHub] [arrow] westonpace commented on a change in pull request #10258: ARROW-12560: [C++] Investigate utilizing aggressive thread task creation when adding callback to finished future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10258:
URL: https://github.com/apache/arrow/pull/10258#discussion_r643691040



##########
File path: cpp/src/arrow/util/future.h
##########
@@ -453,30 +480,35 @@ class Future {
   /// cyclic reference to itself through the callback.
   template <typename OnComplete>
   typename std::enable_if<!detail::first_arg_is_status<OnComplete>::value>::type
-  AddCallback(OnComplete on_complete) const {
+  AddCallback(OnComplete on_complete,
+              CallbackOptions opts = CallbackOptions::Defaults()) const {
     // We know impl_ will not be dangling when invoking callbacks because at least one
     // thread will be waiting for MarkFinished to return. Thus it's safe to keep a
     // weak reference to impl_ here
     struct Callback {
-      void operator()() && { std::move(on_complete)(weak_self.get().result()); }
-      WeakFuture<T> weak_self;
+      void operator()(const FutureImpl& impl) && {
+        std::move(on_complete)(*static_cast<Result<ValueType>*>(impl.result_.get()));
+      }
       OnComplete on_complete;
     };
-    impl_->AddCallback(Callback{WeakFuture<T>(*this), std::move(on_complete)});
+    impl_->AddCallback(Callback{std::move(on_complete)}, opts);
   }
 
   /// Overload for callbacks accepting a Status
   template <typename OnComplete>
   typename std::enable_if<detail::first_arg_is_status<OnComplete>::value>::type
-  AddCallback(OnComplete on_complete) const {
+  AddCallback(OnComplete on_complete,
+              CallbackOptions opts = CallbackOptions::Defaults()) const {
     static_assert(std::is_same<internal::Empty, ValueType>::value,
                   "Callbacks for Future<> should accept Status and not Result");
     struct Callback {
-      void operator()() && { std::move(on_complete)(weak_self.get().status()); }
-      WeakFuture<T> weak_self;
+      void operator()(const FutureImpl& impl) && {
+        std::move(on_complete)(
+            static_cast<Result<ValueType>*>(impl.result_.get())->status());

Review comment:
       Ignore this.  I forgot type erased classes can still have templated functions.




[GitHub] [arrow] westonpace commented on a change in pull request #10258: ARROW-12560: [C++] Investigate utilizing aggressive thread task creation when adding callback to finished future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10258:
URL: https://github.com/apache/arrow/pull/10258#discussion_r645782882



##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -952,6 +951,85 @@ TEST(FutureCompletionTest, FutureVoid) {
   }
 }
 
+class FutureSchedulingTest : public testing::Test {
+ public:
+  internal::Executor* executor() { return mock_executor.get(); }
+  int spawn_count() { return mock_executor->spawn_count; }
+
+  std::function<void(const Status&)> callback = [](const Status&) {};
+  std::shared_ptr<MockExecutor> mock_executor = std::make_shared<MockExecutor>();
+};
+
+TEST_F(FutureSchedulingTest, ScheduleAlways) {
+  CallbackOptions options;
+  options.should_schedule = ShouldSchedule::ALWAYS;
+  options.executor = executor();
+  // Successful future
+  {
+    auto fut = Future<>::Make();
+    fut.AddCallback(callback, options);
+    fut.MarkFinished();
+    fut.AddCallback(callback, options);
+    ASSERT_EQ(2, spawn_count());
+  }
+  // Failing future
+  {
+    auto fut = Future<>::Make();
+    fut.AddCallback(callback, options);
+    fut.MarkFinished(Status::Invalid("XYZ"));
+    fut.AddCallback(callback, options);
+    ASSERT_EQ(4, spawn_count());
+  }
+}
+
+TEST_F(FutureSchedulingTest, ScheduleIfUnfinished) {
+  CallbackOptions options;
+  options.should_schedule = ShouldSchedule::IF_UNFINISHED;
+  options.executor = executor();
+  // Successful future
+  {
+    auto fut = Future<>::Make();
+    fut.AddCallback(callback, options);
+    fut.MarkFinished();
+    fut.AddCallback(callback, options);
+    ASSERT_EQ(1, spawn_count());

Review comment:
       I've made these tests a little more explicit so now you can tell.
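The spawn counts asserted in these tests follow directly from the scheduling rule. With `ALWAYS`, the executor spawns both the callback added before `MarkFinished` and the one added after it. With `IF_UNFINISHED`, it spawns only the callback that was still pending when the future finished. The self-contained sketch below reproduces that arithmetic. `MiniFuture`, `MockExecutor` and `CountSpawns` are hypothetical and much simpler than Arrow's classes. In particular, this `MockExecutor` runs each task inline right after counting it.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

enum class ShouldSchedule { Never, IfUnfinished, Always };

// Counts spawns and runs each task inline so the result is deterministic.
struct MockExecutor {
  int spawn_count = 0;
  void Spawn(std::function<void()> task) {
    ++spawn_count;
    task();
  }
};

struct CallbackOptions {
  ShouldSchedule should_schedule = ShouldSchedule::Never;
  MockExecutor* executor = nullptr;
};

class MiniFuture {
 public:
  void AddCallback(std::function<void()> cb, CallbackOptions opts) {
    std::unique_lock<std::mutex> lock(mutex_);
    if (finished_) {
      lock.unlock();
      RunOrSchedule(std::move(cb), opts, /*from_unfinished=*/false);
    } else {
      callbacks_.emplace_back(std::move(cb), opts);
    }
  }

  void MarkFinished() {
    std::vector<std::pair<std::function<void()>, CallbackOptions>> callbacks;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      finished_ = true;
      callbacks.swap(callbacks_);
    }
    // Callbacks run (or are scheduled) outside the lock.
    for (auto& entry : callbacks) {
      RunOrSchedule(std::move(entry.first), entry.second, /*from_unfinished=*/true);
    }
  }

 private:
  static bool ShouldScheduleNow(ShouldSchedule s, bool from_unfinished) {
    switch (s) {
      case ShouldSchedule::Never:
        return false;
      case ShouldSchedule::Always:
        return true;
      case ShouldSchedule::IfUnfinished:
        return from_unfinished;
    }
    return false;
  }

  static void RunOrSchedule(std::function<void()> cb, const CallbackOptions& opts,
                            bool from_unfinished) {
    if (ShouldScheduleNow(opts.should_schedule, from_unfinished)) {
      opts.executor->Spawn(std::move(cb));
    } else {
      cb();
    }
  }

  std::mutex mutex_;
  bool finished_ = false;
  std::vector<std::pair<std::function<void()>, CallbackOptions>> callbacks_;
};

// Adds one callback before and one after MarkFinished; returns the spawn count.
int CountSpawns(ShouldSchedule policy) {
  MockExecutor executor;
  CallbackOptions opts;
  opts.should_schedule = policy;
  opts.executor = &executor;
  MiniFuture fut;
  fut.AddCallback([] {}, opts);
  fut.MarkFinished();
  fut.AddCallback([] {}, opts);
  return executor.spawn_count;
}
```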




[GitHub] [arrow] westonpace commented on a change in pull request #10258: ARROW-12560: [C++] Investigate utilizing aggressive thread task creation when adding callback to finished future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10258:
URL: https://github.com/apache/arrow/pull/10258#discussion_r645781660



##########
File path: cpp/src/arrow/util/future.h
##########
@@ -453,30 +480,35 @@ class Future {
   /// cyclic reference to itself through the callback.
   template <typename OnComplete>
   typename std::enable_if<!detail::first_arg_is_status<OnComplete>::value>::type
-  AddCallback(OnComplete on_complete) const {
+  AddCallback(OnComplete on_complete,
+              CallbackOptions opts = CallbackOptions::Defaults()) const {
     // We know impl_ will not be dangling when invoking callbacks because at least one
     // thread will be waiting for MarkFinished to return. Thus it's safe to keep a
     // weak reference to impl_ here
     struct Callback {
-      void operator()() && { std::move(on_complete)(weak_self.get().result()); }
-      WeakFuture<T> weak_self;
+      void operator()(const FutureImpl& impl) && {
+        std::move(on_complete)(*static_cast<Result<ValueType>*>(impl.result_.get()));
+      }
       OnComplete on_complete;
     };
-    impl_->AddCallback(Callback{WeakFuture<T>(*this), std::move(on_complete)});
+    impl_->AddCallback(Callback{std::move(on_complete)}, opts);
   }
 
   /// Overload for callbacks accepting a Status
   template <typename OnComplete>
   typename std::enable_if<detail::first_arg_is_status<OnComplete>::value>::type
-  AddCallback(OnComplete on_complete) const {
+  AddCallback(OnComplete on_complete,
+              CallbackOptions opts = CallbackOptions::Defaults()) const {
     static_assert(std::is_same<internal::Empty, ValueType>::value,
                   "Callbacks for Future<> should accept Status and not Result");
     struct Callback {
-      void operator()() && { std::move(on_complete)(weak_self.get().status()); }
-      WeakFuture<T> weak_self;
+      void operator()(const FutureImpl& impl) && {
+        std::move(on_complete)(
+            static_cast<Result<ValueType>*>(impl.result_.get())->status());

Review comment:
       I added `FutureImpl::CastResult`
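The following sketches the member-template approach. `FutureImpl` itself stays non-templated, but individual member functions can still be templates, so a `CastResult<T>()` accessor lets the typed wrapper recover its result. The `CastResult` signature, the `SetResult` helper and the simplified `Result` type are assumptions, and the real `FutureImpl::CastResult` may differ. `Callback` mirrors the callback struct from the diff.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for arrow::Result<T>.
template <typename T>
struct Result {
  T value;
  std::string status;  // empty means OK in this sketch
};

class FutureImpl {
 public:
  // The class is type-erased, but its member functions can be templates.
  template <typename T>
  void SetResult(Result<T> res) {
    result_ = std::unique_ptr<void, void (*)(void*)>(
        new Result<T>(std::move(res)),
        [](void* p) { delete static_cast<Result<T>*>(p); });
  }

  // Sketch of a CastResult-style accessor: the caller supplies the type.
  template <typename T>
  Result<T>* CastResult() const {
    return static_cast<Result<T>*>(result_.get());
  }

 private:
  std::unique_ptr<void, void (*)(void*)> result_{nullptr, nullptr};
};

// Callback wrapper in the style of the diff, now using CastResult.
template <typename T, typename OnComplete>
struct Callback {
  void operator()(const FutureImpl& impl) && {
    std::move(on_complete)(*impl.CastResult<T>());
  }
  OnComplete on_complete;
};
```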




[GitHub] [arrow] pitrou commented on a change in pull request #10258: ARROW-12560: [C++] Investigate utilizing aggressive thread task creation when adding callback to finished future

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10258:
URL: https://github.com/apache/arrow/pull/10258#discussion_r643962243



##########
File path: cpp/src/arrow/util/future.cc
##########
@@ -231,26 +232,68 @@ class ConcreteFutureImpl : public FutureImpl {
 
   void DoMarkFailed() { DoMarkFinishedOrFailed(FutureState::FAILURE); }
 
-  void AddCallback(Callback callback) {
+  void CheckOptions(const CallbackOptions& opts) {
+    if (opts.should_schedule != ShouldSchedule::NEVER) {
+      DCHECK_NE(opts.executor, NULL)
+          << "An executor must be specified when adding a callback that might schedule";
+    }
+  }
+
+  void AddCallback(Callback callback, CallbackOptions opts) {
+    CheckOptions(opts);
     std::unique_lock<std::mutex> lock(mutex_);
+    CallbackRecord callback_record{std::move(callback), opts};
     if (IsFutureFinished(state_)) {
       lock.unlock();
-      std::move(callback)();
+      RunOrScheduleCallback(callback_record, /*from_unfinished=*/false);
     } else {
-      callbacks_.push_back(std::move(callback));
+      callbacks_.push_back(std::move(callback_record));
     }
   }
 
-  bool TryAddCallback(const std::function<Callback()>& callback_factory) {
+  bool TryAddCallback(const std::function<Callback()>& callback_factory,
+                      CallbackOptions opts) {
+    CheckOptions(opts);
     std::unique_lock<std::mutex> lock(mutex_);
     if (IsFutureFinished(state_)) {
       return false;
     } else {
-      callbacks_.push_back(callback_factory());
+      callbacks_.push_back({callback_factory(), opts});
       return true;
     }
   }
 
+  bool ShouldSchedule(const CallbackRecord& callback_record, bool from_unfinished) {
+    switch (callback_record.options.should_schedule) {
+      case ShouldSchedule::NEVER:
+        return false;
+      case ShouldSchedule::ALWAYS:
+        return true;
+      case ShouldSchedule::IF_UNFINISHED:
+        return from_unfinished;
+      default:
+        DCHECK(false) << "Unrecognized ShouldSchedule option";
+        return false;
+    }
+  }
+
+  void RunOrScheduleCallback(CallbackRecord& callback_record, bool from_unfinished) {
+    if (ShouldSchedule(callback_record, from_unfinished)) {
+      // Need to make a copy of this to keep it alive until the callback has a chance
+      // to be scheduled.
+      struct CallbackTask {
+        void operator()() { std::move(callback)(*self); }
+
+        Callback callback;
+        std::shared_ptr<FutureImpl> self;

Review comment:
       Ah, ok. Thanks for the clarification.
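This sketch shows why `CallbackTask` holds a `std::shared_ptr<FutureImpl>`. Once a callback has been handed to an executor, the future's owner may drop its last reference before the task runs. The strong reference inside the queued task keeps the impl alive until the callback has actually run. `DeferredExecutor` is hypothetical: it queues tasks until `RunAll()`. The sketch also uses `std::function` in place of Arrow's move-only callback type.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Minimal stand-in for a future's shared state.
struct FutureImpl : std::enable_shared_from_this<FutureImpl> {
  int result = 0;
};

using Callback = std::function<void(const FutureImpl&)>;

// Queues tasks and runs them later, like a pool that has not picked them up yet.
struct DeferredExecutor {
  std::vector<std::function<void()>> queue;
  void Spawn(std::function<void()> task) { queue.push_back(std::move(task)); }
  void RunAll() {
    for (auto& task : queue) task();
    queue.clear();  // destroying the tasks releases their references
  }
};

// Mirrors CallbackTask from the diff: `self` keeps the impl alive until the
// scheduled callback runs.
struct CallbackTask {
  void operator()() { std::move(callback)(*self); }
  Callback callback;
  std::shared_ptr<FutureImpl> self;
};

void ScheduleCallback(FutureImpl& impl, Callback cb, DeferredExecutor* executor) {
  executor->Spawn(CallbackTask{std::move(cb), impl.shared_from_this()});
}
```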




[GitHub] [arrow] westonpace commented on a change in pull request #10258: ARROW-12560: [C++] Investigate utilizing aggressive thread task creation when adding callback to finished future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10258:
URL: https://github.com/apache/arrow/pull/10258#discussion_r643632152



##########
File path: cpp/src/arrow/util/future.h
##########
@@ -202,8 +202,30 @@ enum class FutureState : int8_t { PENDING, SUCCESS, FAILURE };
 
 inline bool IsFutureFinished(FutureState state) { return state != FutureState::PENDING; }
 
+/// \brief Describes whether the callback should be scheduled or run synchronously
+enum ShouldSchedule {
+  /// Always run the callback synchronously (the default)
+  NEVER = 0,

Review comment:
      Hmm, technically the Google C++ style guide (https://google.github.io/styleguide/cppguide.html#Enumerator_Names) prefers `kAlways`, but I see `Always` used more often in Arrow, although some of the Gandiva code uses `kAlways`.  Any preference?




[GitHub] [arrow] westonpace commented on pull request #10258: ARROW-12560: [C++] Investigate utilizing aggressive thread task creation when adding callback to finished future

Posted by GitBox <gi...@apache.org>.
westonpace commented on pull request #10258:
URL: https://github.com/apache/arrow/pull/10258#issuecomment-852702836


   @pitrou Don't worry about the delay, I've been plenty busy elsewhere.  I have just a few follow-up questions and then I'll make the changes.


[GitHub] [arrow] github-actions[bot] commented on pull request #10258: ARROW-12560: [C++] Investigate utilizing aggressive thread task creation when adding callback to finished future

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #10258:
URL: https://github.com/apache/arrow/pull/10258#issuecomment-833384472


   https://issues.apache.org/jira/browse/ARROW-12560


[GitHub] [arrow] westonpace commented on a change in pull request #10258: ARROW-12560: [C++] Investigate utilizing aggressive thread task creation when adding callback to finished future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10258:
URL: https://github.com/apache/arrow/pull/10258#discussion_r643632996



##########
File path: cpp/src/arrow/util/future.cc
##########
@@ -272,8 +315,8 @@ class ConcreteFutureImpl : public FutureImpl {
     //
     // In fact, it is important not to hold the locks because the callback
     // may be slow or do its own locking on other resources
-    for (auto&& callback : callbacks_) {
-      std::move(callback)();
+    for (auto& callback_record : callbacks_) {
+      RunOrScheduleCallback(callback_record, /*from_unfinished=*/true);

Review comment:
       On the bright side, if we remove `IF_UNFINISHED` then we can change `ShouldSchedule` to a `bool`.
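This sketch assumes only `NEVER` and `ALWAYS` remain, the other two cases of the `ShouldSchedule` switch in `future.cc`. Under that assumption the option reduces to a flag, and `RunOrScheduleCallback` no longer needs the `from_unfinished` argument. `CountingExecutor` and this `CallbackOptions` layout are hypothetical.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Counts spawns and runs each task inline.
struct CountingExecutor {
  int spawn_count = 0;
  void Spawn(std::function<void()> task) {
    ++spawn_count;
    task();
  }
};

// With IF_UNFINISHED gone, "schedule or not" is a plain bool.
struct CallbackOptions {
  bool should_schedule = false;
  CountingExecutor* executor = nullptr;  // required when should_schedule is true
};

// No from_unfinished parameter: the decision no longer depends on whether
// the future was already finished when the callback was added.
void RunOrScheduleCallback(std::function<void()> callback, const CallbackOptions& opts) {
  if (opts.should_schedule) {
    opts.executor->Spawn(std::move(callback));
  } else {
    callback();
  }
}
```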




[GitHub] [arrow] westonpace commented on a change in pull request #10258: ARROW-12560: [C++] Investigate utilizing aggressive thread task creation when adding callback to finished future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10258:
URL: https://github.com/apache/arrow/pull/10258#discussion_r643634671



##########
File path: cpp/src/arrow/util/future.h
##########
@@ -453,30 +480,35 @@ class Future {
   /// cyclic reference to itself through the callback.
   template <typename OnComplete>
   typename std::enable_if<!detail::first_arg_is_status<OnComplete>::value>::type
-  AddCallback(OnComplete on_complete) const {
+  AddCallback(OnComplete on_complete,
+              CallbackOptions opts = CallbackOptions::Defaults()) const {
     // We know impl_ will not be dangling when invoking callbacks because at least one
     // thread will be waiting for MarkFinished to return. Thus it's safe to keep a
     // weak reference to impl_ here
     struct Callback {
-      void operator()() && { std::move(on_complete)(weak_self.get().result()); }
-      WeakFuture<T> weak_self;
+      void operator()(const FutureImpl& impl) && {
+        std::move(on_complete)(*static_cast<Result<ValueType>*>(impl.result_.get()));
+      }
       OnComplete on_complete;
     };
-    impl_->AddCallback(Callback{WeakFuture<T>(*this), std::move(on_complete)});
+    impl_->AddCallback(Callback{std::move(on_complete)}, opts);
   }
 
   /// Overload for callbacks accepting a Status
   template <typename OnComplete>
   typename std::enable_if<detail::first_arg_is_status<OnComplete>::value>::type
-  AddCallback(OnComplete on_complete) const {
+  AddCallback(OnComplete on_complete,
+              CallbackOptions opts = CallbackOptions::Defaults()) const {
     static_assert(std::is_same<internal::Empty, ValueType>::value,
                   "Callbacks for Future<> should accept Status and not Result");
     struct Callback {
-      void operator()() && { std::move(on_complete)(weak_self.get().status()); }
-      WeakFuture<T> weak_self;
+      void operator()(const FutureImpl& impl) && {
+        std::move(on_complete)(
+            static_cast<Result<ValueType>*>(impl.result_.get())->status());

Review comment:
       It is a method on `Future` (named `GetResult`) but `FutureImpl` is type-erased and so it has no reference to `ValueType`.  If it were a `.cc` file I could extract it into an anonymous function but no luck there because of templates.  Would it be acceptable to create a method `GetResultFromFutureImpl` inside of `arrow::detail`?  Or is there some other trick I can use?



