You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/11/16 17:13:25 UTC

[GitHub] [arrow] bkietz opened a new pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

bkietz opened a new pull request #8680:
URL: https://github.com/apache/arrow/pull/8680


   Adds `Future<T>::Then(OnSuccess, OnFailure)` which registers callbacks to be executed on completion of the future and yields a future which wraps the result of those callbacks; if a callback returns:
   
   - `void`, Then() returns a `Future<>` which completes successully as soon as the callback runs.
   - `Status`, Then() returns a `Future<>` which completes with the returned `Status` as soon as the callback runs.
   - `V` or `Result<V>`, Then() returns a `Future<V>` which completes with whatever the callback returns.
   - `Future<V>`, Then() returns a `Future<V>` which will be marked complete when the future returned by the callback completes (and will complete with the same result).


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r546031132



##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -122,15 +122,21 @@ void AssertFinished(const Future<T>& fut) {
 // Assert the future is successful *now*
 template <typename T>
 void AssertSuccessful(const Future<T>& fut) {
-  ASSERT_EQ(fut.state(), FutureState::SUCCESS);
-  ASSERT_OK(fut.status());
+  ASSERT_TRUE(fut.Wait(0.1));

Review comment:
       Yes, that is write, `status` was the blocking call.  You are also right that it was not succeeding.  However, it would hang indefinitely.  This made it more difficult to debug.
   
   So the old implementation was correct, but inconvenient.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r545195108



##########
File path: cpp/src/arrow/util/thread_pool_benchmark.cc
##########
@@ -136,21 +168,24 @@ static void ThreadedTaskGroup(benchmark::State& state) {
 
   for (auto _ : state) {
     auto task_group = TaskGroup::MakeThreaded(pool.get());
-    for (int32_t i = 0; i < nspawns; ++i) {
-      // Pass the task by reference to avoid copying it around
-      task_group->Append(std::ref(task));
-    }
+    task_group->Append([&task, nspawns, task_group] {

Review comment:
       It's not exactly a holdover.  `SerialTaskGroup::AppendReal` has `DCHECK(!finished_);` which makes sense (you don't want to add a task to a finished task group).  However, if you add the same `DCHECK(!finished_);` to `ThreadedTaskGroup` then the old non-nested approach in `TestTaskGroupErrors` (in task_group_test.cc) fails.
   
   What happens is that the first one or two tasks gets added to the task group and, if they are fast, they finish before the task adder gets a chance to add the rest of the tasks and the task group gets marked finished.  This doesn't happen with the serial task group since that task group cannot be marked finished until the caller waits on this.
   
   Ben and I had some discussion about possibly adding a `Start` method to task group but I don't remember where that ended up.  So as it is right now you need to do this nested approach to ensure that the task group does not finish until you have scheduled all the tasks you want to run.
   
   Also, I put a commented out `DCHECK(!finished_);` in `ThreadedTaskGroup`.  I don't want to uncomment it because there are still some places using threaded task group that don't use it in the nested fashion and I don't want to break them.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r545721559



##########
File path: cpp/src/arrow/util/future.h
##########
@@ -273,11 +349,134 @@ class Future {
   }
 
   /// \brief Make a finished Future<> with the provided Status.
-  template <typename E = ValueType, typename = detail::Empty::EnableIfSame<E>>
+  template <typename E = ValueType, typename = typename std::enable_if<
+                                        std::is_same<E, detail::Empty>::value>::type>
   static Future<> MakeFinished(Status s = Status::OK()) {
     return MakeFinished(E::ToResult(std::move(s)));
   }
 
+  /// \brief Consumer API: Register a callback to run when this future completes
+  ///
+  /// The callback should receive the result of the future (const Result<T>&)
+  /// For a void or statusy future this should be
+  /// (const Result<Future<detail::Empty>::ValueType>& result)

Review comment:
       Yes.  They receive `const Result<T>&` or `const Result<detail::Empty>&`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r545719929



##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -276,6 +282,593 @@ TEST(FutureSyncTest, Int) {
   }
 }
 
+TEST(FutureRefTest, ChainRemoved) {
+  // Creating a future chain should not prevent the futures from being deleted if the
+  // entire chain is deleted
+  std::weak_ptr<FutureImpl> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& status) { return Status::OK(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) { return Future<>::Make(); });

Review comment:
       Added...
   ```
       // run callbacks, lock not needed since the future is finsihed by this
       // point so nothing else can modify the callbacks list and it is safe
       // to iterate.
       //
       // In fact, it is important not to hold the locks because the callback
       // may be slow or do its own locking on other resources
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r546009382



##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -276,6 +282,593 @@ TEST(FutureSyncTest, Int) {
   }
 }
 
+TEST(FutureRefTest, ChainRemoved) {
+  // Creating a future chain should not prevent the futures from being deleted if the
+  // entire chain is deleted
+  std::weak_ptr<FutureImpl> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& status) { return Status::OK(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) { return Future<>::Make(); });

Review comment:
       I'll add a comment illustrating the potential memory leak




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r545368032



##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -276,6 +283,636 @@ TEST(FutureSyncTest, Int) {
   }
 }
 
+TEST(FutureRefTest, ChainRemoved) {
+  // Creating a future chain should not prevent the futures from being deleted if the
+  // entire chain is deleted
+  std::weak_ptr<FutureImpl> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& status) { return Status::OK(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) { return Future<>::Make(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, TailRemoved) {
+  // Keeping the head of the future chain should keep the entire chain alive
+  std::shared_ptr<Future<>> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  bool side_effect_run = false;
+  {
+    ref = std::make_shared<Future<>>(Future<>::Make());
+    auto fut2 = ref->Then(StatusOnly, [&side_effect_run](const Status& status) {
+      side_effect_run = true;
+      return Status::OK();
+    });
+    ref2 = fut2.impl_;
+  }
+  ASSERT_FALSE(ref2.expired());
+
+  ref->MarkFinished();
+  ASSERT_TRUE(side_effect_run);
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, HeadRemoved) {
+  // Keeping the tail of the future chain should not keep the entire chain alive.  If no
+  // one has a reference to the head then there is no need to keep it, nothing will finish
+  // it.  In theory the intermediate futures could be finished by some external process
+  // but that would be highly unusual and bad practice so in reality this would just be a
+  // reference to a future that will never complete which is ok.
+  std::weak_ptr<FutureImpl> ref;
+  std::shared_ptr<Future<>> ref2;
+  {
+    auto fut = std::make_shared<Future<>>(Future<>::Make());
+    ref = fut->impl_;
+    ref2 = std::make_shared<Future<>>(fut->Then([](...) {}));
+  }
+  ASSERT_TRUE(ref.expired());
+
+  {
+    auto fut = Future<>::Make();
+    ref2 = std::make_shared<Future<>>(fut.Then([&](...) {
+      auto intermediate = Future<>::Make();
+      ref = intermediate.impl_;
+      return intermediate;
+    }));
+    fut.MarkFinished();
+  }
+  ASSERT_TRUE(ref.expired());
+}
+
+TEST(FutureTest, StressCallback) {
+  for (unsigned int n = 0; n < 1000; n++) {
+    auto fut = Future<>::Make();
+    std::atomic<unsigned int> count_finished_immediately(0);
+    std::atomic<unsigned int> count_finished_deferred(0);
+    std::atomic<unsigned int> callbacks_added(0);
+    std::atomic<bool> finished(false);
+
+    std::thread callback_adder([&] {
+      auto test_thread = std::this_thread::get_id();
+      while (!finished.load()) {
+        fut.AddCallback(
+            [&test_thread, &count_finished_immediately, &count_finished_deferred](
+                const Result<arrow::Future<arrow::detail::Empty>::ValueType>& result) {
+              if (std::this_thread::get_id() == test_thread) {
+                count_finished_immediately++;
+              } else {
+                count_finished_deferred++;
+              }
+            });
+        callbacks_added++;
+      }
+    });
+
+    while (callbacks_added.load() == 0) {
+      // Spin until the callback_adder has started running
+    }
+
+    fut.MarkFinished();
+
+    while (count_finished_deferred.load() == 0) {

Review comment:
       Shouldn't you wait on `count_finished_immediately` instead? It seems `count_finished_deferred` (callback called from main thread) is for pre-`MarkFinished` callbacks.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r534351861



##########
File path: cpp/src/arrow/util/future.h
##########
@@ -264,11 +335,125 @@ class Future {
   }
 
   /// \brief Make a finished Future<> with the provided Status.
-  template <typename E = ValueType, typename = detail::Empty::EnableIfSame<E>>
+  template <typename E = ValueType, typename = typename std::enable_if<
+                                        std::is_same<E, detail::Empty>::value>::type>
   static Future<> MakeFinished(Status s = Status::OK()) {
     return MakeFinished(E::ToResult(std::move(s)));
   }
 
+  /// \brief Consumer API: Register a callback to run when this future completes
+  template <typename OnComplete>
+  void AddCallback(OnComplete&& on_complete) const {
+    struct Callback {
+      void operator()() && {
+        auto self = weak_self.get();
+        std::move(on_complete)(*self.GetResult());
+      }
+
+      WeakFuture<T> weak_self;
+      OnComplete on_complete;
+    };
+
+    // We know impl_ will not be dangling when invoking callbacks because at least one
+    // thread will be waiting for MarkFinished to return. Thus it's safe to keep a
+    // weak reference to impl_ here
+    impl_->AddCallback(
+        Callback{WeakFuture<T>(*this), std::forward<OnComplete>(on_complete)});
+  }
+
+  /// \brief Consumer API: Register a continuation to run when this future completes
+  ///
+  /// The continuation will run in the same thread that called MarkFinished (whatever
+  /// callback is registered with this function will run before MarkFinished returns).
+  /// Avoid long-running callbacks in favor of submitting a task to an Executor and
+  /// returning the future.
+  ///
+  /// Two callbacks are supported:
+  /// - OnSuccess, called against the result (const ValueType&) on successul completion.
+  /// - OnFailure, called against the error (const Status&) on failed completion.
+  ///
+  /// Then() returns a Future whose ValueType is derived from the return type of the
+  /// callbacks. If a callback returns:
+  /// - void, a Future<> will be produced which will completes successully as soon
+  ///   as the callback runs.
+  /// - Status, a Future<> will be produced which will complete with the returned Status
+  ///   as soon as the callback runs.
+  /// - V or Result<V>, a Future<V> will be produced which will complete with the result
+  ///   of invoking the callback as soon as the callback runs.
+  /// - Future<V>, a Future<V> will be produced which will be marked complete when the
+  ///   future returned by the callback completes (and will complete with the same
+  ///   result).
+  ///
+  /// The continued Future type must be the same for both callbacks.
+  ///
+  /// Note that OnFailure can swallow errors, allowing continued Futures to successully
+  /// complete even if this Future fails.
+  ///
+  /// If this future is already completed then the callback will be run immediately
+  /// (before this method returns) and the returned future may already be marked complete
+  /// (it will definitely be marked complete if the callback returns a non-future or a
+  /// completed future).
+  template <typename OnSuccess, typename OnFailure,
+            typename ContinuedFuture =
+                detail::ContinueFuture::ForSignature<OnSuccess && (const T&)>>
+  ContinuedFuture Then(OnSuccess&& on_success, OnFailure&& on_failure) const {
+    static_assert(
+        std::is_same<detail::ContinueFuture::ForSignature<OnFailure && (const Status&)>,
+                     ContinuedFuture>::value,
+        "OnSuccess and OnFailure must continue with the same future type");
+
+    auto next = ContinuedFuture::Make();
+
+    struct Callback {
+      void operator()(const Result<T>& result) && {
+        if (ARROW_PREDICT_TRUE(result.ok())) {
+          detail::Continue(std::move(next), std::move(on_success), result.ValueOrDie());
+        } else {
+          detail::Continue(std::move(next), std::move(on_failure), result.status());

Review comment:
       no, since this is in a FnOnce all members will be dropped as soon as it's invoked




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r545214296



##########
File path: cpp/src/arrow/util/task_group.cc
##########
@@ -116,25 +109,15 @@ class ThreadedTaskGroup : public TaskGroup {
       cv_.wait(lock, [&]() { return nremaining_.load() == 0; });
       // Current tasks may start other tasks, so only set this when done
       finished_ = true;
-      if (parent_) {
-        parent_->OneTaskDone();
-      }
+      completion_future_.MarkFinished(status_);
     }
     return status_;
   }
 
   int parallelism() override { return executor_->GetCapacity(); }
 
-  std::shared_ptr<TaskGroup> MakeSubGroup() override {
-    std::lock_guard<std::mutex> lock(mutex_);
-    auto child = new ThreadedTaskGroup(executor_);
-    child->parent_ = this;
-    nremaining_.fetch_add(1, std::memory_order_acquire);
-    return std::shared_ptr<TaskGroup>(child);
-  }
-
  protected:
-  void UpdateStatus(Status&& st) {
+  void UpdateStatus(const Status& st) {

Review comment:
       Changed back to `Status&& st`.  Not sure what happened here.

##########
File path: cpp/src/arrow/util/task_group.cc
##########
@@ -159,13 +142,13 @@ class ThreadedTaskGroup : public TaskGroup {
   Executor* executor_;
   std::atomic<int32_t> nremaining_;
   std::atomic<bool> ok_;
+  Future<> completion_future_ = Future<>::Make();

Review comment:
       Cleaned up




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r545725266



##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -276,6 +282,593 @@ TEST(FutureSyncTest, Int) {
   }
 }
 
+TEST(FutureRefTest, ChainRemoved) {
+  // Creating a future chain should not prevent the futures from being deleted if the
+  // entire chain is deleted
+  std::weak_ptr<FutureImpl> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& status) { return Status::OK(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) { return Future<>::Make(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, TailRemoved) {
+  // Keeping the head of the future chain should keep the entire chain alive
+  std::shared_ptr<Future<>> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  bool side_effect_run = false;
+  {
+    ref = std::make_shared<Future<>>(Future<>::Make());
+    auto fut2 = ref->Then(StatusOnly, [&side_effect_run](const Status& status) {
+      side_effect_run = true;
+      return Status::OK();
+    });
+    ref2 = fut2.impl_;
+  }
+  ASSERT_FALSE(ref2.expired());
+
+  ref->MarkFinished();
+  ASSERT_TRUE(side_effect_run);
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, HeadRemoved) {
+  // Keeping the tail of the future chain should not keep the entire chain alive.  If no
+  // one has a reference to the head then there is no need to keep it, nothing will finish
+  // it.  In theory the intermediate futures could be finished by some external process
+  // but that would be highly unusual and bad practice so in reality this would just be a
+  // reference to a future that will never complete which is ok.
+  std::weak_ptr<FutureImpl> ref;
+  std::shared_ptr<Future<>> ref2;
+  {
+    auto fut = std::make_shared<Future<>>(Future<>::Make());
+    ref = fut->impl_;
+    ref2 = std::make_shared<Future<>>(fut->Then([](...) {}));
+  }
+  ASSERT_TRUE(ref.expired());
+
+  {
+    auto fut = Future<>::Make();
+    ref2 = std::make_shared<Future<>>(fut.Then([&](...) {
+      auto intermediate = Future<>::Make();
+      ref = intermediate.impl_;
+      return intermediate;
+    }));
+    fut.MarkFinished();
+  }
+  ASSERT_TRUE(ref.expired());
+}
+
+TEST(FutureCompletionTest, Void) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    int passed_in_result = 0;
+    auto fut2 =
+        fut.Then([&passed_in_result](const int& result) { passed_in_result = result; });
+    fut.MarkFinished(42);
+    AssertSuccessful(fut2);
+    ASSERT_EQ(passed_in_result, 42);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const int& result) {});
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) {});
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertSuccessful(fut2);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) {});
+    fut.MarkFinished();
+    AssertSuccessful(fut2);
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& s) { return s; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly,
+                         [&status_seen](const Status& result) { status_seen = result; });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertSuccessful(fut2);
+  }
+}
+
+TEST(FutureCompletionTest, NonVoid) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const Result<int>& result) {
+      auto passed_in_result = *result;
+      return passed_in_result * passed_in_result;
+    });
+    fut.MarkFinished(42);
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42 * 42);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const Result<int>& result) {
+      auto passed_in_result = *result;
+      return passed_in_result * passed_in_result;
+    });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    bool was_io_error = false;
+    auto fut2 = fut.Then([](int) { return 99; },
+                         [&was_io_error](const Status& s) {
+                           was_io_error = s.IsIOError();
+                           return 100;
+                         });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 100);
+    ASSERT_TRUE(was_io_error);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then([](...) { return 42; });
+    fut.MarkFinished();
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42);
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then([](...) { return 42; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly, [&status_seen](const Status& s) {
+      status_seen = s;
+      return 42;
+    });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42);
+  }
+}
+
+TEST(FutureCompletionTest, FutureNonVoid) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    int passed_in_result = 0;
+    auto fut2 = fut.Then([&passed_in_result, innerFut](const Result<int>& result) {
+      passed_in_result = *result;
+      return innerFut;
+    });
+    fut.MarkFinished(42);
+    ASSERT_EQ(passed_in_result, 42);
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([innerFut](const Result<int>& result) { return innerFut; });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    bool was_io_error = false;
+    auto fut2 = fut.Then([innerFut](int) { return innerFut; },
+                         [&was_io_error, innerFut](const Status& s) {
+                           was_io_error = s.IsIOError();
+                           return innerFut;
+                         });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+    ASSERT_TRUE(was_io_error);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([&innerFut](...) { return innerFut; });
+    fut.MarkFinished();
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([&innerFut](...) { return innerFut; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    auto innerFut = Future<std::string>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly, [&status_seen, &innerFut](const Status& s) {
+      status_seen = s;
+      return innerFut;
+    });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+  }
+}
+
+TEST(FutureCompletionTest, Status) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    int passed_in_result = 0;
+    Future<> fut2 = fut.Then([&passed_in_result](const Result<int>& result) {
+      passed_in_result = *result;
+      return Status::OK();
+    });
+    fut.MarkFinished(42);
+    ASSERT_EQ(passed_in_result, 42);
+    AssertSuccessful(fut2);
+  }
+  {
+    // Propagate failure

Review comment:
       Many of these tests look very similar.  Each test in `FutureCompletionTest` tests a callback that returns something different from (void, value, status, future<value>, future<>, result).  For each callback we check the same couple of things.
   
   Some of this may result in redundant coverage depending on the actual implementation but some of these tests that look identical are testing different code paths.  For example, callbacks that return futures are going to go through a different path than those that do not.  Callbacks that return void/status have a different path as well.
   
   Rather than try to pare the list down to the precise set of tests needed to cover all of the branches (something that is rather tricky) I think it is easier to have the entire set of possibilities in place.
   
   That being said, there were some copy/paste errors I fixed (many of the `FutureCompletionTest, Status` tests were incorrect) and I pared down the list of test cases a little bit.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r546020284



##########
File path: cpp/src/arrow/util/future.h
##########
@@ -264,11 +335,125 @@ class Future {
   }
 
   /// \brief Make a finished Future<> with the provided Status.
-  template <typename E = ValueType, typename = detail::Empty::EnableIfSame<E>>
+  template <typename E = ValueType, typename = typename std::enable_if<
+                                        std::is_same<E, detail::Empty>::value>::type>
   static Future<> MakeFinished(Status s = Status::OK()) {
     return MakeFinished(E::ToResult(std::move(s)));
   }
 
+  /// \brief Consumer API: Register a callback to run when this future completes
+  template <typename OnComplete>
+  void AddCallback(OnComplete&& on_complete) const {
+    struct Callback {
+      void operator()() && {
+        auto self = weak_self.get();
+        std::move(on_complete)(*self.GetResult());
+      }
+
+      WeakFuture<T> weak_self;
+      OnComplete on_complete;
+    };
+
+    // We know impl_ will not be dangling when invoking callbacks because at least one
+    // thread will be waiting for MarkFinished to return. Thus it's safe to keep a
+    // weak reference to impl_ here
+    impl_->AddCallback(
+        Callback{WeakFuture<T>(*this), std::forward<OnComplete>(on_complete)});
+  }
+
+  /// \brief Consumer API: Register a continuation to run when this future completes
+  ///
+  /// The continuation will run in the same thread that called MarkFinished (whatever
+  /// callback is registered with this function will run before MarkFinished returns).
+  /// Avoid long-running callbacks in favor of submitting a task to an Executor and
+  /// returning the future.
+  ///
+  /// Two callbacks are supported:
+  /// - OnSuccess, called against the result (const ValueType&) on successul completion.
+  /// - OnFailure, called against the error (const Status&) on failed completion.
+  ///
+  /// Then() returns a Future whose ValueType is derived from the return type of the
+  /// callbacks. If a callback returns:
+  /// - void, a Future<> will be produced which will completes successully as soon
+  ///   as the callback runs.
+  /// - Status, a Future<> will be produced which will complete with the returned Status
+  ///   as soon as the callback runs.
+  /// - V or Result<V>, a Future<V> will be produced which will complete with the result
+  ///   of invoking the callback as soon as the callback runs.
+  /// - Future<V>, a Future<V> will be produced which will be marked complete when the
+  ///   future returned by the callback completes (and will complete with the same
+  ///   result).
+  ///
+  /// The continued Future type must be the same for both callbacks.
+  ///
+  /// Note that OnFailure can swallow errors, allowing continued Futures to successully
+  /// complete even if this Future fails.
+  ///
+  /// If this future is already completed then the callback will be run immediately
+  /// (before this method returns) and the returned future may already be marked complete
+  /// (it will definitely be marked complete if the callback returns a non-future or a
+  /// completed future).
+  template <typename OnSuccess, typename OnFailure,
+            typename ContinuedFuture =
+                detail::ContinueFuture::ForSignature<OnSuccess && (const T&)>>
+  ContinuedFuture Then(OnSuccess&& on_success, OnFailure&& on_failure) const {
+    static_assert(
+        std::is_same<detail::ContinueFuture::ForSignature<OnFailure && (const Status&)>,
+                     ContinuedFuture>::value,
+        "OnSuccess and OnFailure must continue with the same future type");
+
+    auto next = ContinuedFuture::Make();
+
+    struct Callback {
+      void operator()(const Result<T>& result) && {
+        if (ARROW_PREDICT_TRUE(result.ok())) {
+          detail::Continue(std::move(next), std::move(on_success), result.ValueOrDie());
+        } else {
+          detail::Continue(std::move(next), std::move(on_failure), result.status());
+        }
+      }
+
+      OnSuccess on_success;
+      OnFailure on_failure;
+      ContinuedFuture next;
+    };
+
+    AddCallback(Callback{std::forward<OnSuccess>(on_success),
+                         std::forward<OnFailure>(on_failure), next});
+
+    return next;
+  }
+
+  /// \brief Overload without OnFailure. Failures will be passed through unchanged.
+  template <typename OnSuccess,
+            typename ContinuedFuture =
+                detail::ContinueFuture::ForSignature<OnSuccess && (const T&)>>
+  ContinuedFuture Then(OnSuccess&& on_success) const {
+    return Then(std::forward<OnSuccess>(on_success), [](const Status& s) {
+      return Result<typename ContinuedFuture::ValueType>(s);
+    });
+  }
+
+  template <typename OnComplete,
+            typename ContinuedFuture = typename detail::ContinueFuture::ForSignature<
+                OnComplete && (const Status&)>>
+  ContinuedFuture Then(decltype(StatusOnly), OnComplete&& on_complete) const {

Review comment:
       StatusOnly has been removed for now since it isn't used outside the tests. When a consumer is interested in receiving only completion statuses we can revisit this method




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r545368422



##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -276,6 +283,636 @@ TEST(FutureSyncTest, Int) {
   }
 }
 
+TEST(FutureRefTest, ChainRemoved) {
+  // Creating a future chain should not prevent the futures from being deleted if the
+  // entire chain is deleted
+  std::weak_ptr<FutureImpl> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& status) { return Status::OK(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) { return Future<>::Make(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, TailRemoved) {
+  // Keeping the head of the future chain should keep the entire chain alive
+  std::shared_ptr<Future<>> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  bool side_effect_run = false;
+  {
+    ref = std::make_shared<Future<>>(Future<>::Make());
+    auto fut2 = ref->Then(StatusOnly, [&side_effect_run](const Status& status) {
+      side_effect_run = true;
+      return Status::OK();
+    });
+    ref2 = fut2.impl_;
+  }
+  ASSERT_FALSE(ref2.expired());
+
+  ref->MarkFinished();
+  ASSERT_TRUE(side_effect_run);
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, HeadRemoved) {
+  // Keeping the tail of the future chain should not keep the entire chain alive.  If no
+  // one has a reference to the head then there is no need to keep it, nothing will finish
+  // it.  In theory the intermediate futures could be finished by some external process
+  // but that would be highly unusual and bad practice so in reality this would just be a
+  // reference to a future that will never complete which is ok.
+  std::weak_ptr<FutureImpl> ref;
+  std::shared_ptr<Future<>> ref2;
+  {
+    auto fut = std::make_shared<Future<>>(Future<>::Make());
+    ref = fut->impl_;
+    ref2 = std::make_shared<Future<>>(fut->Then([](...) {}));
+  }
+  ASSERT_TRUE(ref.expired());
+
+  {
+    auto fut = Future<>::Make();
+    ref2 = std::make_shared<Future<>>(fut.Then([&](...) {
+      auto intermediate = Future<>::Make();
+      ref = intermediate.impl_;
+      return intermediate;
+    }));
+    fut.MarkFinished();
+  }
+  ASSERT_TRUE(ref.expired());
+}
+
+TEST(FutureTest, StressCallback) {
+  for (unsigned int n = 0; n < 1000; n++) {
+    auto fut = Future<>::Make();
+    std::atomic<unsigned int> count_finished_immediately(0);
+    std::atomic<unsigned int> count_finished_deferred(0);
+    std::atomic<unsigned int> callbacks_added(0);
+    std::atomic<bool> finished(false);
+
+    std::thread callback_adder([&] {
+      auto test_thread = std::this_thread::get_id();
+      while (!finished.load()) {
+        fut.AddCallback(
+            [&test_thread, &count_finished_immediately, &count_finished_deferred](
+                const Result<arrow::Future<arrow::detail::Empty>::ValueType>& result) {
+              if (std::this_thread::get_id() == test_thread) {
+                count_finished_immediately++;
+              } else {
+                count_finished_deferred++;
+              }
+            });
+        callbacks_added++;
+      }
+    });
+
+    while (callbacks_added.load() == 0) {
+      // Spin until the callback_adder has started running
+    }
+

Review comment:
       Assert that both `count_finished_immediately` and `count_finished_deferred` are 0 here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
bkietz commented on pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#issuecomment-748293175


   CI failures are unrelated. Will merge


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r544781160



##########
File path: cpp/src/arrow/util/functional.h
##########
@@ -79,5 +84,47 @@ struct call_traits {
       typename std::enable_if<std::is_same<return_type<F>, T>::value, RT>;
 };
 
+/// A type erased callable object which may only be invoked once.
+/// It can be constructed from any lambda which matches the provided call signature.
+/// Invoking it results in destruction of the lambda, freeing any state/references
+/// immediately. Invoking a default constructed FnOnce or one which has already been
+/// invoked will segfault.
+template <typename Signature>
+class FnOnce;
+
+template <typename R, typename... A>
+class FnOnce<R(A...)> {
+ public:
+  FnOnce() = default;
+
+  template <typename Fn,
+            typename = typename std::enable_if<std::is_convertible<
+                typename std::result_of<Fn && (A...)>::type, R>::value>::type>
+  FnOnce(Fn fn) : impl_(new FnImpl<Fn>(std::move(fn))) {  // NOLINT runtime/explicit
+  }
+
+  explicit operator bool() const { return impl_ != NULLPTR; }
+
+  R operator()(A... a) && {
+    auto bye = std::move(impl_);
+    return bye->invoke(static_cast<A&&>(a)...);

Review comment:
       This is mostly true.  It can't be perfect forwarding because we need to capture the values when the FnOnce is constructed (similar to the capture done by a lambda or std::function).  We also then need to move the values once more to make the actual call to the callback.
   
   However, the static_cast was causing one extra move to happen inside the FnOnce (from FnOnce to FnOnce::Impl) and that could be avoided with std::forward.  I modified FnOnceTest::MoveOnlyDataType to verify this change moved us from 3 moves to 2 moves.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r545201217



##########
File path: cpp/src/arrow/util/future.cc
##########
@@ -229,6 +229,16 @@ class ConcreteFutureImpl : public FutureImpl {
 
   void DoMarkFailed() { DoMarkFinishedOrFailed(FutureState::FAILURE); }
 
+  void AddCallback(Callback callback) {
+    std::unique_lock<std::mutex> lock(mutex_);
+    if (IsFutureFinished(state_)) {
+      lock.unlock();
+      std::move(callback)();

Review comment:
       I've added a clarification in the method comments.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r545362036



##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -122,15 +125,19 @@ void AssertFinished(const Future<T>& fut) {
 // Assert the future is successful *now*
 template <typename T>
 void AssertSuccessful(const Future<T>& fut) {
-  ASSERT_EQ(fut.state(), FutureState::SUCCESS);
-  ASSERT_OK(fut.status());
+  if (IsFutureFinished(fut.state())) {

Review comment:
       This is changing the meaning of the tests, as now a pending future will pass this silently.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r545368032



##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -276,6 +283,636 @@ TEST(FutureSyncTest, Int) {
   }
 }
 
+TEST(FutureRefTest, ChainRemoved) {
+  // Creating a future chain should not prevent the futures from being deleted if the
+  // entire chain is deleted
+  std::weak_ptr<FutureImpl> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& status) { return Status::OK(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) { return Future<>::Make(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, TailRemoved) {
+  // Keeping the head of the future chain should keep the entire chain alive
+  std::shared_ptr<Future<>> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  bool side_effect_run = false;
+  {
+    ref = std::make_shared<Future<>>(Future<>::Make());
+    auto fut2 = ref->Then(StatusOnly, [&side_effect_run](const Status& status) {
+      side_effect_run = true;
+      return Status::OK();
+    });
+    ref2 = fut2.impl_;
+  }
+  ASSERT_FALSE(ref2.expired());
+
+  ref->MarkFinished();
+  ASSERT_TRUE(side_effect_run);
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, HeadRemoved) {
+  // Keeping the tail of the future chain should not keep the entire chain alive.  If no
+  // one has a reference to the head then there is no need to keep it, nothing will finish
+  // it.  In theory the intermediate futures could be finished by some external process
+  // but that would be highly unusual and bad practice so in reality this would just be a
+  // reference to a future that will never complete which is ok.
+  std::weak_ptr<FutureImpl> ref;
+  std::shared_ptr<Future<>> ref2;
+  {
+    auto fut = std::make_shared<Future<>>(Future<>::Make());
+    ref = fut->impl_;
+    ref2 = std::make_shared<Future<>>(fut->Then([](...) {}));
+  }
+  ASSERT_TRUE(ref.expired());
+
+  {
+    auto fut = Future<>::Make();
+    ref2 = std::make_shared<Future<>>(fut.Then([&](...) {
+      auto intermediate = Future<>::Make();
+      ref = intermediate.impl_;
+      return intermediate;
+    }));
+    fut.MarkFinished();
+  }
+  ASSERT_TRUE(ref.expired());
+}
+
+TEST(FutureTest, StressCallback) {
+  for (unsigned int n = 0; n < 1000; n++) {
+    auto fut = Future<>::Make();
+    std::atomic<unsigned int> count_finished_immediately(0);
+    std::atomic<unsigned int> count_finished_deferred(0);
+    std::atomic<unsigned int> callbacks_added(0);
+    std::atomic<bool> finished(false);
+
+    std::thread callback_adder([&] {
+      auto test_thread = std::this_thread::get_id();
+      while (!finished.load()) {
+        fut.AddCallback(
+            [&test_thread, &count_finished_immediately, &count_finished_deferred](
+                const Result<arrow::Future<arrow::detail::Empty>::ValueType>& result) {
+              if (std::this_thread::get_id() == test_thread) {
+                count_finished_immediately++;
+              } else {
+                count_finished_deferred++;
+              }
+            });
+        callbacks_added++;
+      }
+    });
+
+    while (callbacks_added.load() == 0) {
+      // Spin until the callback_adder has started running
+    }
+
+    fut.MarkFinished();
+
+    while (count_finished_deferred.load() == 0) {

Review comment:
       Shouldn't you also wait on `count_finished_immediately` instead? The "deferred" callbacks should have been called from `MarkFinished` above.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r534357174



##########
File path: cpp/src/arrow/util/future.h
##########
@@ -264,11 +335,125 @@ class Future {
   }
 
   /// \brief Make a finished Future<> with the provided Status.
-  template <typename E = ValueType, typename = detail::Empty::EnableIfSame<E>>
+  template <typename E = ValueType, typename = typename std::enable_if<
+                                        std::is_same<E, detail::Empty>::value>::type>
   static Future<> MakeFinished(Status s = Status::OK()) {
     return MakeFinished(E::ToResult(std::move(s)));
   }
 
+  /// \brief Consumer API: Register a callback to run when this future completes
+  template <typename OnComplete>
+  void AddCallback(OnComplete&& on_complete) const {
+    struct Callback {
+      void operator()() && {
+        auto self = weak_self.get();
+        std::move(on_complete)(*self.GetResult());
+      }
+
+      WeakFuture<T> weak_self;
+      OnComplete on_complete;
+    };
+
+    // We know impl_ will not be dangling when invoking callbacks because at least one
+    // thread will be waiting for MarkFinished to return. Thus it's safe to keep a
+    // weak reference to impl_ here
+    impl_->AddCallback(
+        Callback{WeakFuture<T>(*this), std::forward<OnComplete>(on_complete)});
+  }
+
+  /// \brief Consumer API: Register a continuation to run when this future completes
+  ///
+  /// The continuation will run in the same thread that called MarkFinished (whatever
+  /// callback is registered with this function will run before MarkFinished returns).
+  /// Avoid long-running callbacks in favor of submitting a task to an Executor and
+  /// returning the future.
+  ///
+  /// Two callbacks are supported:
+  /// - OnSuccess, called against the result (const ValueType&) on successul completion.
+  /// - OnFailure, called against the error (const Status&) on failed completion.
+  ///
+  /// Then() returns a Future whose ValueType is derived from the return type of the
+  /// callbacks. If a callback returns:
+  /// - void, a Future<> will be produced which will completes successully as soon

Review comment:
       yes, I'll s/produced/returned




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r534335689



##########
File path: cpp/src/arrow/util/functional.h
##########
@@ -79,5 +84,47 @@ struct call_traits {
       typename std::enable_if<std::is_same<return_type<F>, T>::value, RT>;
 };
 
+/// A type erased callable object which may only be invoked once.

Review comment:
       yes, with the added guarantee that when it is called it will drop all references which it has in closure




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r534291430



##########
File path: cpp/src/arrow/util/functional.h
##########
@@ -79,5 +84,47 @@ struct call_traits {
       typename std::enable_if<std::is_same<return_type<F>, T>::value, RT>;
 };
 
+/// A type erased callable object which may only be invoked once.

Review comment:
       Is it a kind of lighter-weight `std::function`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r545363890



##########
File path: cpp/src/arrow/util/future.h
##########
@@ -264,11 +335,125 @@ class Future {
   }
 
   /// \brief Make a finished Future<> with the provided Status.
-  template <typename E = ValueType, typename = detail::Empty::EnableIfSame<E>>
+  template <typename E = ValueType, typename = typename std::enable_if<
+                                        std::is_same<E, detail::Empty>::value>::type>
   static Future<> MakeFinished(Status s = Status::OK()) {
     return MakeFinished(E::ToResult(std::move(s)));
   }
 
+  /// \brief Consumer API: Register a callback to run when this future completes
+  template <typename OnComplete>
+  void AddCallback(OnComplete&& on_complete) const {
+    struct Callback {
+      void operator()() && {
+        auto self = weak_self.get();
+        std::move(on_complete)(*self.GetResult());
+      }
+
+      WeakFuture<T> weak_self;
+      OnComplete on_complete;
+    };
+
+    // We know impl_ will not be dangling when invoking callbacks because at least one
+    // thread will be waiting for MarkFinished to return. Thus it's safe to keep a
+    // weak reference to impl_ here
+    impl_->AddCallback(
+        Callback{WeakFuture<T>(*this), std::forward<OnComplete>(on_complete)});
+  }
+
+  /// \brief Consumer API: Register a continuation to run when this future completes
+  ///
+  /// The continuation will run in the same thread that called MarkFinished (whatever
+  /// callback is registered with this function will run before MarkFinished returns).
+  /// Avoid long-running callbacks in favor of submitting a task to an Executor and
+  /// returning the future.
+  ///
+  /// Two callbacks are supported:
+  /// - OnSuccess, called against the result (const ValueType&) on successul completion.
+  /// - OnFailure, called against the error (const Status&) on failed completion.
+  ///
+  /// Then() returns a Future whose ValueType is derived from the return type of the
+  /// callbacks. If a callback returns:
+  /// - void, a Future<> will be produced which will completes successully as soon
+  ///   as the callback runs.
+  /// - Status, a Future<> will be produced which will complete with the returned Status
+  ///   as soon as the callback runs.
+  /// - V or Result<V>, a Future<V> will be produced which will complete with the result
+  ///   of invoking the callback as soon as the callback runs.
+  /// - Future<V>, a Future<V> will be produced which will be marked complete when the
+  ///   future returned by the callback completes (and will complete with the same
+  ///   result).
+  ///
+  /// The continued Future type must be the same for both callbacks.
+  ///
+  /// Note that OnFailure can swallow errors, allowing continued Futures to successully
+  /// complete even if this Future fails.
+  ///
+  /// If this future is already completed then the callback will be run immediately
+  /// (before this method returns) and the returned future may already be marked complete
+  /// (it will definitely be marked complete if the callback returns a non-future or a
+  /// completed future).
+  template <typename OnSuccess, typename OnFailure,
+            typename ContinuedFuture =
+                detail::ContinueFuture::ForSignature<OnSuccess && (const T&)>>
+  ContinuedFuture Then(OnSuccess&& on_success, OnFailure&& on_failure) const {
+    static_assert(
+        std::is_same<detail::ContinueFuture::ForSignature<OnFailure && (const Status&)>,
+                     ContinuedFuture>::value,
+        "OnSuccess and OnFailure must continue with the same future type");
+
+    auto next = ContinuedFuture::Make();
+
+    struct Callback {
+      void operator()(const Result<T>& result) && {
+        if (ARROW_PREDICT_TRUE(result.ok())) {
+          detail::Continue(std::move(next), std::move(on_success), result.ValueOrDie());
+        } else {
+          detail::Continue(std::move(next), std::move(on_failure), result.status());
+        }
+      }
+
+      OnSuccess on_success;
+      OnFailure on_failure;
+      ContinuedFuture next;
+    };
+
+    AddCallback(Callback{std::forward<OnSuccess>(on_success),
+                         std::forward<OnFailure>(on_failure), next});
+
+    return next;
+  }
+
+  /// \brief Overload without OnFailure. Failures will be passed through unchanged.
+  template <typename OnSuccess,
+            typename ContinuedFuture =
+                detail::ContinueFuture::ForSignature<OnSuccess && (const T&)>>
+  ContinuedFuture Then(OnSuccess&& on_success) const {
+    return Then(std::forward<OnSuccess>(on_success), [](const Status& s) {
+      return Result<typename ContinuedFuture::ValueType>(s);
+    });
+  }
+
+  template <typename OnComplete,
+            typename ContinuedFuture = typename detail::ContinueFuture::ForSignature<
+                OnComplete && (const Status&)>>
+  ContinuedFuture Then(decltype(StatusOnly), OnComplete&& on_complete) const {

Review comment:
       After talking with Ben we are just going to drop StatusOnly until we have more compelling use cases.  In my mind it would be used for cleanup steps like you described (I'm not sure what you mean by out of order).
   
   For example...
   
   ```
   char * buffer = new char[1024*1024];
   auto fut = LoadBuffer(buffer).Then([buffer] { return ProcessBuffer(buffer); }).ThenFinally([buffer] {delete buffer;});
   ```
   
   So `ThenFinally` would be called after `LoadBuffer` and `ProcessBuffer` and it would be called regardless of whether they succeeded or failed and it would not affect the final result.  It would be equivalent to...
   
   ```
   .Then((const Result<ProcessedBuffer>& result) [buffer] { delete buffer; return result; }, (const Status& error_status) [buffer] { delete buffer; return error_status; })
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r534354000



##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -768,4 +1361,27 @@ TYPED_TEST(FutureIteratorTest, ErrorsAsCompleted) { this->TestErrorsAsCompleted(
 
 TYPED_TEST(FutureIteratorTest, StressAsCompleted) { this->TestStressAsCompleted(); }
 
+namespace internal {
+TEST(FnOnceTest, MoveOnlyDataType) {
+  // ensuring this is valid guarantees we are making no unnecessary copies

Review comment:
       Yes, I think it should replace `std::function` in `thread_pool.h` at least




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r534358943



##########
File path: cpp/src/arrow/util/future.h
##########
@@ -264,11 +335,125 @@ class Future {
   }
 
   /// \brief Make a finished Future<> with the provided Status.
-  template <typename E = ValueType, typename = detail::Empty::EnableIfSame<E>>
+  template <typename E = ValueType, typename = typename std::enable_if<
+                                        std::is_same<E, detail::Empty>::value>::type>
   static Future<> MakeFinished(Status s = Status::OK()) {
     return MakeFinished(E::ToResult(std::move(s)));
   }
 
+  /// \brief Consumer API: Register a callback to run when this future completes
+  template <typename OnComplete>
+  void AddCallback(OnComplete&& on_complete) const {
+    struct Callback {
+      void operator()() && {
+        auto self = weak_self.get();
+        std::move(on_complete)(*self.GetResult());
+      }
+
+      WeakFuture<T> weak_self;
+      OnComplete on_complete;
+    };
+
+    // We know impl_ will not be dangling when invoking callbacks because at least one
+    // thread will be waiting for MarkFinished to return. Thus it's safe to keep a
+    // weak reference to impl_ here
+    impl_->AddCallback(
+        Callback{WeakFuture<T>(*this), std::forward<OnComplete>(on_complete)});
+  }
+
+  /// \brief Consumer API: Register a continuation to run when this future completes
+  ///
+  /// The continuation will run in the same thread that called MarkFinished (whatever
+  /// callback is registered with this function will run before MarkFinished returns).
+  /// Avoid long-running callbacks in favor of submitting a task to an Executor and
+  /// returning the future.
+  ///
+  /// Two callbacks are supported:
+  /// - OnSuccess, called against the result (const ValueType&) on successul completion.
+  /// - OnFailure, called against the error (const Status&) on failed completion.
+  ///
+  /// Then() returns a Future whose ValueType is derived from the return type of the
+  /// callbacks. If a callback returns:
+  /// - void, a Future<> will be produced which will completes successully as soon
+  ///   as the callback runs.
+  /// - Status, a Future<> will be produced which will complete with the returned Status
+  ///   as soon as the callback runs.
+  /// - V or Result<V>, a Future<V> will be produced which will complete with the result
+  ///   of invoking the callback as soon as the callback runs.
+  /// - Future<V>, a Future<V> will be produced which will be marked complete when the
+  ///   future returned by the callback completes (and will complete with the same
+  ///   result).
+  ///
+  /// The continued Future type must be the same for both callbacks.
+  ///
+  /// Note that OnFailure can swallow errors, allowing continued Futures to successully
+  /// complete even if this Future fails.
+  ///
+  /// If this future is already completed then the callback will be run immediately
+  /// (before this method returns) and the returned future may already be marked complete
+  /// (it will definitely be marked complete if the callback returns a non-future or a
+  /// completed future).
+  template <typename OnSuccess, typename OnFailure,
+            typename ContinuedFuture =
+                detail::ContinueFuture::ForSignature<OnSuccess && (const T&)>>
+  ContinuedFuture Then(OnSuccess&& on_success, OnFailure&& on_failure) const {
+    static_assert(
+        std::is_same<detail::ContinueFuture::ForSignature<OnFailure && (const Status&)>,
+                     ContinuedFuture>::value,
+        "OnSuccess and OnFailure must continue with the same future type");
+
+    auto next = ContinuedFuture::Make();
+
+    struct Callback {
+      void operator()(const Result<T>& result) && {
+        if (ARROW_PREDICT_TRUE(result.ok())) {
+          detail::Continue(std::move(next), std::move(on_success), result.ValueOrDie());
+        } else {
+          detail::Continue(std::move(next), std::move(on_failure), result.status());

Review comment:
       Hmm, ok. I thought calling `std::move` on the unused callback before calling `detail::Continue`, but that may not matter much (though it's always nice to release resources earlier).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r545367277



##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -122,15 +125,19 @@ void AssertFinished(const Future<T>& fut) {
 // Assert the future is successful *now*
 template <typename T>
 void AssertSuccessful(const Future<T>& fut) {
-  ASSERT_EQ(fut.state(), FutureState::SUCCESS);
-  ASSERT_OK(fut.status());
+  if (IsFutureFinished(fut.state())) {

Review comment:
       Good catch.  There should be an else block calling FAIL.  I'll add that.  The intent of this "if" wrapper is to prevent a pending future from deadlocking the test (which makes it difficult to debug).  It absolutely should fail if the future is pending.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r545721129



##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -276,6 +282,593 @@ TEST(FutureSyncTest, Int) {
   }
 }
 
+TEST(FutureRefTest, ChainRemoved) {
+  // Creating a future chain should not prevent the futures from being deleted if the
+  // entire chain is deleted
+  std::weak_ptr<FutureImpl> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& status) { return Status::OK(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) { return Future<>::Make(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, TailRemoved) {
+  // Keeping the head of the future chain should keep the entire chain alive
+  std::shared_ptr<Future<>> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  bool side_effect_run = false;
+  {
+    ref = std::make_shared<Future<>>(Future<>::Make());
+    auto fut2 = ref->Then(StatusOnly, [&side_effect_run](const Status& status) {
+      side_effect_run = true;
+      return Status::OK();
+    });
+    ref2 = fut2.impl_;
+  }
+  ASSERT_FALSE(ref2.expired());
+
+  ref->MarkFinished();
+  ASSERT_TRUE(side_effect_run);
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, HeadRemoved) {
+  // Keeping the tail of the future chain should not keep the entire chain alive.  If no
+  // one has a reference to the head then there is no need to keep it, nothing will finish
+  // it.  In theory the intermediate futures could be finished by some external process
+  // but that would be highly unusual and bad practice so in reality this would just be a
+  // reference to a future that will never complete which is ok.
+  std::weak_ptr<FutureImpl> ref;
+  std::shared_ptr<Future<>> ref2;
+  {
+    auto fut = std::make_shared<Future<>>(Future<>::Make());
+    ref = fut->impl_;
+    ref2 = std::make_shared<Future<>>(fut->Then([](...) {}));
+  }
+  ASSERT_TRUE(ref.expired());
+
+  {
+    auto fut = Future<>::Make();
+    ref2 = std::make_shared<Future<>>(fut.Then([&](...) {
+      auto intermediate = Future<>::Make();
+      ref = intermediate.impl_;
+      return intermediate;
+    }));
+    fut.MarkFinished();
+  }
+  ASSERT_TRUE(ref.expired());
+}
+
+TEST(FutureCompletionTest, Void) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    int passed_in_result = 0;
+    auto fut2 =
+        fut.Then([&passed_in_result](const int& result) { passed_in_result = result; });
+    fut.MarkFinished(42);
+    AssertSuccessful(fut2);
+    ASSERT_EQ(passed_in_result, 42);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const int& result) {});
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) {});
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertSuccessful(fut2);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) {});
+    fut.MarkFinished();
+    AssertSuccessful(fut2);
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& s) { return s; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly,
+                         [&status_seen](const Status& result) { status_seen = result; });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertSuccessful(fut2);
+  }
+}
+
+TEST(FutureCompletionTest, NonVoid) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const Result<int>& result) {
+      auto passed_in_result = *result;
+      return passed_in_result * passed_in_result;
+    });
+    fut.MarkFinished(42);
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42 * 42);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const Result<int>& result) {
+      auto passed_in_result = *result;
+      return passed_in_result * passed_in_result;
+    });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    bool was_io_error = false;
+    auto fut2 = fut.Then([](int) { return 99; },
+                         [&was_io_error](const Status& s) {
+                           was_io_error = s.IsIOError();
+                           return 100;
+                         });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 100);
+    ASSERT_TRUE(was_io_error);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then([](...) { return 42; });
+    fut.MarkFinished();
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42);
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then([](...) { return 42; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly, [&status_seen](const Status& s) {
+      status_seen = s;
+      return 42;
+    });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42);
+  }
+}
+
+TEST(FutureCompletionTest, FutureNonVoid) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    int passed_in_result = 0;
+    auto fut2 = fut.Then([&passed_in_result, innerFut](const Result<int>& result) {
+      passed_in_result = *result;
+      return innerFut;
+    });
+    fut.MarkFinished(42);
+    ASSERT_EQ(passed_in_result, 42);
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([innerFut](const Result<int>& result) { return innerFut; });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    bool was_io_error = false;
+    auto fut2 = fut.Then([innerFut](int) { return innerFut; },
+                         [&was_io_error, innerFut](const Status& s) {
+                           was_io_error = s.IsIOError();
+                           return innerFut;
+                         });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+    ASSERT_TRUE(was_io_error);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([&innerFut](...) { return innerFut; });
+    fut.MarkFinished();
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([&innerFut](...) { return innerFut; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    auto innerFut = Future<std::string>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly, [&status_seen, &innerFut](const Status& s) {
+      status_seen = s;
+      return innerFut;
+    });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+  }
+}
+
+TEST(FutureCompletionTest, Status) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    int passed_in_result = 0;
+    Future<> fut2 = fut.Then([&passed_in_result](const Result<int>& result) {
+      passed_in_result = *result;
+      return Status::OK();
+    });
+    fut.MarkFinished(42);
+    ASSERT_EQ(passed_in_result, 42);
+    AssertSuccessful(fut2);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([innerFut](const Result<int>& result) { return innerFut; });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    bool was_io_error = false;
+    auto fut2 = fut.Then([](int i) { return std::to_string(i); },
+                         [&was_io_error, innerFut](const Status& s) {
+                           was_io_error = s.IsIOError();
+                           return innerFut;
+                         });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+    ASSERT_TRUE(was_io_error);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) {});
+    fut.MarkFinished();
+    AssertSuccessful(fut2);
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& s) { return s; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 =
+        fut.Then(StatusOnly, [&status_seen](const Status& s) { status_seen = s; });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertSuccessful(fut2);
+  }
+}
+
+TEST(FutureCompletionTest, FutureStatus) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<>::Make();
+    int passed_in_result = 0;
+    Future<> fut2 = fut.Then([&passed_in_result, innerFut](const int& i) {
+      passed_in_result = i;
+      return innerFut;
+    });
+    fut.MarkFinished(42);
+    ASSERT_EQ(passed_in_result, 42);
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished(Status::OK());
+    AssertSuccessful(fut2);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<>::Make();
+    auto fut2 = fut.Then([innerFut](const int& i) { return innerFut; });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<>::Make();
+    bool was_io_error = false;
+    auto fut2 = fut.Then([innerFut](const int& i) { return innerFut; },
+                         [&was_io_error, innerFut](const Status& s) {
+                           was_io_error = s.IsIOError();
+                           return innerFut;
+                         });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished(Status::OK());
+    AssertSuccessful(fut2);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto innerFut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [&innerFut](const Status&) { return innerFut; });
+    fut.MarkFinished();
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished(Status::OK());
+    AssertSuccessful(fut2);
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto innerFut = Future<>::Make();
+    auto fut2 = fut.Then([&innerFut](...) { return innerFut; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    auto innerFut = Future<>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly, [&status_seen, &innerFut](const Status& s) {
+      status_seen = s;
+      return innerFut;
+    });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished(Status::OK());
+    AssertSuccessful(fut2);
+  }
+}
+
+TEST(FutureCompletionTest, Result) {
+  {

Review comment:
       This is tested now in at least one place `FutureCompletionTest, Result` in the section `// Propagate failure by returning failure`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r534295329



##########
File path: cpp/src/arrow/util/future.cc
##########
@@ -243,6 +253,12 @@ class ConcreteFutureImpl : public FutureImpl {
       }
     }
     cv_.notify_all();
+
+    // run callbacks
+    for (auto&& callback : callbacks_) {

Review comment:
       This isn't safe without the mutex held. But of course we don't want to call the callback while locked. The solution is probably to move the callbacks to another vector while locked.

##########
File path: cpp/src/arrow/util/task_group.h
##########
@@ -56,21 +67,13 @@ class ARROW_EXPORT TaskGroup : public std::enable_shared_from_this<TaskGroup> {
   /// The current aggregate error Status.  Non-blocking, useful for stopping early.
   virtual Status current_status() = 0;
 
-  /// Whether some tasks have already failed.  Non-blocking , useful for stopping early.
+  /// Whether some tasks have already failed.  Non-blocking, useful for stopping early.
   virtual bool ok() = 0;
 
   /// How many tasks can typically be executed in parallel.
   /// This is only a hint, useful for testing or debugging.
   virtual int parallelism() = 0;
 
-  /// Create a subgroup of this group.  This group can only finish
-  /// when all subgroups have finished (this means you must be
-  /// be careful to call Finish() on subgroups before calling it
-  /// on the main group).
-  // XXX if a subgroup errors out, should it propagate immediately to the parent
-  // and to children?
-  virtual std::shared_ptr<TaskGroup> MakeSubGroup() = 0;

Review comment:
       Thanks for removing this :-)

##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -276,6 +282,593 @@ TEST(FutureSyncTest, Int) {
   }
 }
 
+TEST(FutureRefTest, ChainRemoved) {
+  // Creating a future chain should not prevent the futures from being deleted if the
+  // entire chain is deleted
+  std::weak_ptr<FutureImpl> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& status) { return Status::OK(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) { return Future<>::Make(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, TailRemoved) {
+  // Keeping the head of the future chain should keep the entire chain alive
+  std::shared_ptr<Future<>> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  bool side_effect_run = false;
+  {
+    ref = std::make_shared<Future<>>(Future<>::Make());
+    auto fut2 = ref->Then(StatusOnly, [&side_effect_run](const Status& status) {
+      side_effect_run = true;
+      return Status::OK();
+    });
+    ref2 = fut2.impl_;
+  }
+  ASSERT_FALSE(ref2.expired());
+
+  ref->MarkFinished();
+  ASSERT_TRUE(side_effect_run);
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, HeadRemoved) {
+  // Keeping the tail of the future chain should not keep the entire chain alive.  If no
+  // one has a reference to the head then there is no need to keep it, nothing will finish
+  // it.  In theory the intermediate futures could be finished by some external process
+  // but that would be highly unusual and bad practice so in reality this would just be a
+  // reference to a future that will never complete which is ok.
+  std::weak_ptr<FutureImpl> ref;
+  std::shared_ptr<Future<>> ref2;
+  {
+    auto fut = std::make_shared<Future<>>(Future<>::Make());
+    ref = fut->impl_;
+    ref2 = std::make_shared<Future<>>(fut->Then([](...) {}));
+  }
+  ASSERT_TRUE(ref.expired());
+
+  {
+    auto fut = Future<>::Make();
+    ref2 = std::make_shared<Future<>>(fut.Then([&](...) {
+      auto intermediate = Future<>::Make();
+      ref = intermediate.impl_;
+      return intermediate;
+    }));
+    fut.MarkFinished();
+  }
+  ASSERT_TRUE(ref.expired());
+}
+
+TEST(FutureCompletionTest, Void) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    int passed_in_result = 0;
+    auto fut2 =
+        fut.Then([&passed_in_result](const int& result) { passed_in_result = result; });
+    fut.MarkFinished(42);
+    AssertSuccessful(fut2);
+    ASSERT_EQ(passed_in_result, 42);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const int& result) {});
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) {});
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertSuccessful(fut2);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) {});
+    fut.MarkFinished();
+    AssertSuccessful(fut2);
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& s) { return s; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly,
+                         [&status_seen](const Status& result) { status_seen = result; });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertSuccessful(fut2);
+  }
+}
+
+TEST(FutureCompletionTest, NonVoid) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const Result<int>& result) {
+      auto passed_in_result = *result;
+      return passed_in_result * passed_in_result;
+    });
+    fut.MarkFinished(42);
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42 * 42);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const Result<int>& result) {

Review comment:
       Hmm... what sense does it make to take a `Result<int>` if the callback is not executed on failure?
   (same comment for the above test snippet)

##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -276,6 +282,593 @@ TEST(FutureSyncTest, Int) {
   }
 }
 
+TEST(FutureRefTest, ChainRemoved) {
+  // Creating a future chain should not prevent the futures from being deleted if the
+  // entire chain is deleted
+  std::weak_ptr<FutureImpl> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& status) { return Status::OK(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) { return Future<>::Make(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, TailRemoved) {
+  // Keeping the head of the future chain should keep the entire chain alive
+  std::shared_ptr<Future<>> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  bool side_effect_run = false;
+  {
+    ref = std::make_shared<Future<>>(Future<>::Make());
+    auto fut2 = ref->Then(StatusOnly, [&side_effect_run](const Status& status) {
+      side_effect_run = true;
+      return Status::OK();
+    });
+    ref2 = fut2.impl_;
+  }
+  ASSERT_FALSE(ref2.expired());
+
+  ref->MarkFinished();
+  ASSERT_TRUE(side_effect_run);
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, HeadRemoved) {
+  // Keeping the tail of the future chain should not keep the entire chain alive.  If no
+  // one has a reference to the head then there is no need to keep it, nothing will finish
+  // it.  In theory the intermediate futures could be finished by some external process
+  // but that would be highly unusual and bad practice so in reality this would just be a
+  // reference to a future that will never complete which is ok.
+  std::weak_ptr<FutureImpl> ref;
+  std::shared_ptr<Future<>> ref2;
+  {
+    auto fut = std::make_shared<Future<>>(Future<>::Make());
+    ref = fut->impl_;
+    ref2 = std::make_shared<Future<>>(fut->Then([](...) {}));
+  }
+  ASSERT_TRUE(ref.expired());
+
+  {
+    auto fut = Future<>::Make();
+    ref2 = std::make_shared<Future<>>(fut.Then([&](...) {
+      auto intermediate = Future<>::Make();
+      ref = intermediate.impl_;
+      return intermediate;
+    }));
+    fut.MarkFinished();
+  }
+  ASSERT_TRUE(ref.expired());
+}
+
+TEST(FutureCompletionTest, Void) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    int passed_in_result = 0;
+    auto fut2 =
+        fut.Then([&passed_in_result](const int& result) { passed_in_result = result; });
+    fut.MarkFinished(42);
+    AssertSuccessful(fut2);
+    ASSERT_EQ(passed_in_result, 42);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const int& result) {});
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) {});
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertSuccessful(fut2);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) {});
+    fut.MarkFinished();
+    AssertSuccessful(fut2);
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& s) { return s; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly,
+                         [&status_seen](const Status& result) { status_seen = result; });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertSuccessful(fut2);
+  }
+}
+
+TEST(FutureCompletionTest, NonVoid) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const Result<int>& result) {
+      auto passed_in_result = *result;
+      return passed_in_result * passed_in_result;
+    });
+    fut.MarkFinished(42);
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42 * 42);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const Result<int>& result) {
+      auto passed_in_result = *result;
+      return passed_in_result * passed_in_result;
+    });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    bool was_io_error = false;
+    auto fut2 = fut.Then([](int) { return 99; },
+                         [&was_io_error](const Status& s) {
+                           was_io_error = s.IsIOError();
+                           return 100;
+                         });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 100);
+    ASSERT_TRUE(was_io_error);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then([](...) { return 42; });
+    fut.MarkFinished();
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42);
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then([](...) { return 42; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly, [&status_seen](const Status& s) {
+      status_seen = s;
+      return 42;
+    });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42);
+  }
+}
+
+TEST(FutureCompletionTest, FutureNonVoid) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    int passed_in_result = 0;
+    auto fut2 = fut.Then([&passed_in_result, innerFut](const Result<int>& result) {
+      passed_in_result = *result;
+      return innerFut;
+    });
+    fut.MarkFinished(42);
+    ASSERT_EQ(passed_in_result, 42);
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([innerFut](const Result<int>& result) { return innerFut; });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    bool was_io_error = false;
+    auto fut2 = fut.Then([innerFut](int) { return innerFut; },
+                         [&was_io_error, innerFut](const Status& s) {
+                           was_io_error = s.IsIOError();
+                           return innerFut;
+                         });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+    ASSERT_TRUE(was_io_error);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([&innerFut](...) { return innerFut; });
+    fut.MarkFinished();
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([&innerFut](...) { return innerFut; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    auto innerFut = Future<std::string>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly, [&status_seen, &innerFut](const Status& s) {
+      status_seen = s;
+      return innerFut;
+    });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+  }
+}
+
+TEST(FutureCompletionTest, Status) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    int passed_in_result = 0;
+    Future<> fut2 = fut.Then([&passed_in_result](const Result<int>& result) {
+      passed_in_result = *result;
+      return Status::OK();
+    });
+    fut.MarkFinished(42);
+    ASSERT_EQ(passed_in_result, 42);
+    AssertSuccessful(fut2);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([innerFut](const Result<int>& result) { return innerFut; });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    bool was_io_error = false;
+    auto fut2 = fut.Then([](int i) { return std::to_string(i); },
+                         [&was_io_error, innerFut](const Status& s) {
+                           was_io_error = s.IsIOError();
+                           return innerFut;
+                         });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+    ASSERT_TRUE(was_io_error);
+  }
+  {
+    // From void

Review comment:
       This test seems to already be in `FutureCompletionTest::Void`

##########
File path: cpp/src/arrow/util/functional.h
##########
@@ -79,5 +84,47 @@ struct call_traits {
       typename std::enable_if<std::is_same<return_type<F>, T>::value, RT>;
 };
 
+/// A type erased callable object which may only be invoked once.
+/// It can be constructed from any lambda which matches the provided call signature.
+/// Invoking it results in destruction of the lambda, freeing any state/references
+/// immediately. Invoking a default constructed FnOnce or one which has already been
+/// invoked will segfault.
+template <typename Signature>
+class FnOnce;
+
+template <typename R, typename... A>
+class FnOnce<R(A...)> {
+ public:
+  FnOnce() = default;
+
+  template <typename Fn,
+            typename = typename std::enable_if<std::is_convertible<
+                typename std::result_of<Fn && (A...)>::type, R>::value>::type>
+  FnOnce(Fn fn) : impl_(new FnImpl<Fn>(std::move(fn))) {  // NOLINT runtime/explicit
+  }
+
+  explicit operator bool() const { return impl_ != NULLPTR; }
+
+  R operator()(A... a) && {
+    auto bye = std::move(impl_);
+    return bye->invoke(static_cast<A&&>(a)...);

Review comment:
       Why not forward the arguments here?

##########
File path: cpp/src/arrow/util/future.cc
##########
@@ -229,6 +229,16 @@ class ConcreteFutureImpl : public FutureImpl {
 
   void DoMarkFailed() { DoMarkFinishedOrFailed(FutureState::FAILURE); }
 
+  void AddCallback(Callback callback) {
+    std::unique_lock<std::mutex> lock(mutex_);
+    if (IsFutureFinished(state_)) {
+      lock.unlock();
+      std::move(callback)();

Review comment:
       This callback could be run before the other ones, if `DoMarkFinishedOrFailed` is still running. Is that a concern?

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -264,11 +335,125 @@ class Future {
   }
 
   /// \brief Make a finished Future<> with the provided Status.
-  template <typename E = ValueType, typename = detail::Empty::EnableIfSame<E>>
+  template <typename E = ValueType, typename = typename std::enable_if<
+                                        std::is_same<E, detail::Empty>::value>::type>
   static Future<> MakeFinished(Status s = Status::OK()) {
     return MakeFinished(E::ToResult(std::move(s)));
   }
 
+  /// \brief Consumer API: Register a callback to run when this future completes
+  template <typename OnComplete>
+  void AddCallback(OnComplete&& on_complete) const {
+    struct Callback {
+      void operator()() && {
+        auto self = weak_self.get();
+        std::move(on_complete)(*self.GetResult());
+      }
+
+      WeakFuture<T> weak_self;
+      OnComplete on_complete;
+    };
+
+    // We know impl_ will not be dangling when invoking callbacks because at least one
+    // thread will be waiting for MarkFinished to return. Thus it's safe to keep a
+    // weak reference to impl_ here
+    impl_->AddCallback(
+        Callback{WeakFuture<T>(*this), std::forward<OnComplete>(on_complete)});
+  }
+
+  /// \brief Consumer API: Register a continuation to run when this future completes
+  ///
+  /// The continuation will run in the same thread that called MarkFinished (whatever
+  /// callback is registered with this function will run before MarkFinished returns).
+  /// Avoid long-running callbacks in favor of submitting a task to an Executor and
+  /// returning the future.
+  ///
+  /// Two callbacks are supported:
+  /// - OnSuccess, called against the result (const ValueType&) on successul completion.
+  /// - OnFailure, called against the error (const Status&) on failed completion.
+  ///
+  /// Then() returns a Future whose ValueType is derived from the return type of the
+  /// callbacks. If a callback returns:
+  /// - void, a Future<> will be produced which will completes successully as soon

Review comment:
       "produced" means "returned"?

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -264,11 +335,125 @@ class Future {
   }
 
   /// \brief Make a finished Future<> with the provided Status.
-  template <typename E = ValueType, typename = detail::Empty::EnableIfSame<E>>
+  template <typename E = ValueType, typename = typename std::enable_if<
+                                        std::is_same<E, detail::Empty>::value>::type>
   static Future<> MakeFinished(Status s = Status::OK()) {
     return MakeFinished(E::ToResult(std::move(s)));
   }
 
+  /// \brief Consumer API: Register a callback to run when this future completes
+  template <typename OnComplete>
+  void AddCallback(OnComplete&& on_complete) const {
+    struct Callback {
+      void operator()() && {
+        auto self = weak_self.get();
+        std::move(on_complete)(*self.GetResult());
+      }
+
+      WeakFuture<T> weak_self;
+      OnComplete on_complete;
+    };
+
+    // We know impl_ will not be dangling when invoking callbacks because at least one
+    // thread will be waiting for MarkFinished to return. Thus it's safe to keep a
+    // weak reference to impl_ here
+    impl_->AddCallback(
+        Callback{WeakFuture<T>(*this), std::forward<OnComplete>(on_complete)});
+  }
+
+  /// \brief Consumer API: Register a continuation to run when this future completes
+  ///
+  /// The continuation will run in the same thread that called MarkFinished (whatever
+  /// callback is registered with this function will run before MarkFinished returns).
+  /// Avoid long-running callbacks in favor of submitting a task to an Executor and
+  /// returning the future.
+  ///
+  /// Two callbacks are supported:
+  /// - OnSuccess, called against the result (const ValueType&) on successul completion.
+  /// - OnFailure, called against the error (const Status&) on failed completion.
+  ///
+  /// Then() returns a Future whose ValueType is derived from the return type of the
+  /// callbacks. If a callback returns:
+  /// - void, a Future<> will be produced which will completes successully as soon
+  ///   as the callback runs.
+  /// - Status, a Future<> will be produced which will complete with the returned Status
+  ///   as soon as the callback runs.
+  /// - V or Result<V>, a Future<V> will be produced which will complete with the result
+  ///   of invoking the callback as soon as the callback runs.
+  /// - Future<V>, a Future<V> will be produced which will be marked complete when the
+  ///   future returned by the callback completes (and will complete with the same
+  ///   result).
+  ///
+  /// The continued Future type must be the same for both callbacks.
+  ///
+  /// Note that OnFailure can swallow errors, allowing continued Futures to successully
+  /// complete even if this Future fails.
+  ///
+  /// If this future is already completed then the callback will be run immediately
+  /// (before this method returns) and the returned future may already be marked complete
+  /// (it will definitely be marked complete if the callback returns a non-future or a
+  /// completed future).
+  template <typename OnSuccess, typename OnFailure,
+            typename ContinuedFuture =
+                detail::ContinueFuture::ForSignature<OnSuccess && (const T&)>>
+  ContinuedFuture Then(OnSuccess&& on_success, OnFailure&& on_failure) const {
+    static_assert(
+        std::is_same<detail::ContinueFuture::ForSignature<OnFailure && (const Status&)>,
+                     ContinuedFuture>::value,
+        "OnSuccess and OnFailure must continue with the same future type");
+
+    auto next = ContinuedFuture::Make();
+
+    struct Callback {
+      void operator()(const Result<T>& result) && {
+        if (ARROW_PREDICT_TRUE(result.ok())) {
+          detail::Continue(std::move(next), std::move(on_success), result.ValueOrDie());
+        } else {
+          detail::Continue(std::move(next), std::move(on_failure), result.status());

Review comment:
       You should move both callbacks in each branch, shouldn't you?

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -19,63 +19,150 @@
 
 #include <atomic>
 #include <cmath>
+#include <functional>
 #include <memory>
 #include <type_traits>
 #include <utility>
 #include <vector>
 
 #include "arrow/result.h"
 #include "arrow/status.h"
+#include "arrow/util/functional.h"
 #include "arrow/util/macros.h"
+#include "arrow/util/type_fwd.h"
 #include "arrow/util/visibility.h"
 
 namespace arrow {
 
+namespace detail {
+
+struct Empty {
+  static Result<Empty> ToResult(Status s) {
+    if (ARROW_PREDICT_TRUE(s.ok())) {
+      return Empty{};
+    }
+    return s;
+  }
+};
+
+template <typename>
+struct is_future : std::false_type {};
+
+template <typename T>
+struct is_future<Future<T>> : std::true_type {};
+
+template <typename Signature>
+using result_of_t = typename std::result_of<Signature>::type;
+
+constexpr struct ContinueFuture {
+  template <typename Return>
+  struct ForReturnImpl;
+
+  template <typename Return>
+  using ForReturn = typename ForReturnImpl<Return>::type;
+
+  template <typename Signature>
+  using ForSignature = ForReturn<result_of_t<Signature>>;
+
+  template <typename F, typename... A, typename R = result_of_t<F && (A && ...)>,

Review comment:
       Can you use less terse argument names? I keep thinking `F` means "future" here.

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -264,11 +335,125 @@ class Future {
   }
 
   /// \brief Make a finished Future<> with the provided Status.
-  template <typename E = ValueType, typename = detail::Empty::EnableIfSame<E>>
+  template <typename E = ValueType, typename = typename std::enable_if<
+                                        std::is_same<E, detail::Empty>::value>::type>
   static Future<> MakeFinished(Status s = Status::OK()) {
     return MakeFinished(E::ToResult(std::move(s)));
   }
 
+  /// \brief Consumer API: Register a callback to run when this future completes

Review comment:
       Explain which arguments the callback should expect.

##########
File path: cpp/src/arrow/util/task_group.h
##########
@@ -22,22 +22,33 @@
 #include <utility>
 
 #include "arrow/status.h"
+#include "arrow/util/future.h"

Review comment:
       This doesn't look required?

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -264,11 +335,125 @@ class Future {
   }
 
   /// \brief Make a finished Future<> with the provided Status.
-  template <typename E = ValueType, typename = detail::Empty::EnableIfSame<E>>
+  template <typename E = ValueType, typename = typename std::enable_if<
+                                        std::is_same<E, detail::Empty>::value>::type>
   static Future<> MakeFinished(Status s = Status::OK()) {
     return MakeFinished(E::ToResult(std::move(s)));
   }
 
+  /// \brief Consumer API: Register a callback to run when this future completes
+  template <typename OnComplete>
+  void AddCallback(OnComplete&& on_complete) const {
+    struct Callback {
+      void operator()() && {
+        auto self = weak_self.get();
+        std::move(on_complete)(*self.GetResult());
+      }
+
+      WeakFuture<T> weak_self;
+      OnComplete on_complete;
+    };
+
+    // We know impl_ will not be dangling when invoking callbacks because at least one
+    // thread will be waiting for MarkFinished to return. Thus it's safe to keep a
+    // weak reference to impl_ here
+    impl_->AddCallback(
+        Callback{WeakFuture<T>(*this), std::forward<OnComplete>(on_complete)});
+  }
+
+  /// \brief Consumer API: Register a continuation to run when this future completes
+  ///
+  /// The continuation will run in the same thread that called MarkFinished (whatever
+  /// callback is registered with this function will run before MarkFinished returns).
+  /// Avoid long-running callbacks in favor of submitting a task to an Executor and
+  /// returning the future.
+  ///
+  /// Two callbacks are supported:
+  /// - OnSuccess, called against the result (const ValueType&) on successul completion.
+  /// - OnFailure, called against the error (const Status&) on failed completion.
+  ///
+  /// Then() returns a Future whose ValueType is derived from the return type of the
+  /// callbacks. If a callback returns:
+  /// - void, a Future<> will be produced which will completes successully as soon
+  ///   as the callback runs.
+  /// - Status, a Future<> will be produced which will complete with the returned Status
+  ///   as soon as the callback runs.
+  /// - V or Result<V>, a Future<V> will be produced which will complete with the result
+  ///   of invoking the callback as soon as the callback runs.
+  /// - Future<V>, a Future<V> will be produced which will be marked complete when the
+  ///   future returned by the callback completes (and will complete with the same
+  ///   result).
+  ///
+  /// The continued Future type must be the same for both callbacks.
+  ///
+  /// Note that OnFailure can swallow errors, allowing continued Futures to successully
+  /// complete even if this Future fails.
+  ///
+  /// If this future is already completed then the callback will be run immediately
+  /// (before this method returns) and the returned future may already be marked complete
+  /// (it will definitely be marked complete if the callback returns a non-future or a
+  /// completed future).
+  template <typename OnSuccess, typename OnFailure,
+            typename ContinuedFuture =
+                detail::ContinueFuture::ForSignature<OnSuccess && (const T&)>>
+  ContinuedFuture Then(OnSuccess&& on_success, OnFailure&& on_failure) const {
+    static_assert(
+        std::is_same<detail::ContinueFuture::ForSignature<OnFailure && (const Status&)>,
+                     ContinuedFuture>::value,
+        "OnSuccess and OnFailure must continue with the same future type");
+
+    auto next = ContinuedFuture::Make();
+
+    struct Callback {
+      void operator()(const Result<T>& result) && {
+        if (ARROW_PREDICT_TRUE(result.ok())) {
+          detail::Continue(std::move(next), std::move(on_success), result.ValueOrDie());
+        } else {
+          detail::Continue(std::move(next), std::move(on_failure), result.status());
+        }
+      }
+
+      OnSuccess on_success;
+      OnFailure on_failure;
+      ContinuedFuture next;
+    };
+
+    AddCallback(Callback{std::forward<OnSuccess>(on_success),
+                         std::forward<OnFailure>(on_failure), next});
+
+    return next;
+  }
+
+  /// \brief Overload without OnFailure. Failures will be passed through unchanged.
+  template <typename OnSuccess,
+            typename ContinuedFuture =
+                detail::ContinueFuture::ForSignature<OnSuccess && (const T&)>>
+  ContinuedFuture Then(OnSuccess&& on_success) const {
+    return Then(std::forward<OnSuccess>(on_success), [](const Status& s) {
+      return Result<typename ContinuedFuture::ValueType>(s);
+    });
+  }
+
+  template <typename OnComplete,
+            typename ContinuedFuture = typename detail::ContinueFuture::ForSignature<
+                OnComplete && (const Status&)>>
+  ContinuedFuture Then(decltype(StatusOnly), OnComplete&& on_complete) const {

Review comment:
       I don't understand what this one does. Can you add a docstring or comment?

##########
File path: cpp/src/arrow/util/task_group.cc
##########
@@ -91,14 +81,17 @@ class ThreadedTaskGroup : public TaskGroup {
       nremaining_.fetch_add(1, std::memory_order_acquire);
 
       auto self = checked_pointer_cast<ThreadedTaskGroup>(shared_from_this());
-      Status st = executor_->Spawn([self, task]() {
-        if (self->ok_.load(std::memory_order_acquire)) {
-          // XXX what about exceptions?
-          Status st = task();
-          self->UpdateStatus(std::move(st));
-        }
-        self->OneTaskDone();
-      });
+      Status st = executor_->Spawn(std::bind(
+          [](const std::shared_ptr<ThreadedTaskGroup>& self,
+             const std::function<Status()>& task) {
+            if (self->ok_.load(std::memory_order_acquire)) {
+              // XXX what about exceptions?
+              Status st = task();
+              self->UpdateStatus(std::move(st));
+            }
+            self->OneTaskDone();
+          },
+          std::move(self), std::move(task)));

Review comment:
       C++14 would make this easier...

##########
File path: cpp/src/arrow/util/task_group.cc
##########
@@ -116,25 +109,15 @@ class ThreadedTaskGroup : public TaskGroup {
       cv_.wait(lock, [&]() { return nremaining_.load() == 0; });
       // Current tasks may start other tasks, so only set this when done
       finished_ = true;
-      if (parent_) {
-        parent_->OneTaskDone();
-      }
+      completion_future_.MarkFinished(status_);
     }
     return status_;
   }
 
   int parallelism() override { return executor_->GetCapacity(); }
 
-  std::shared_ptr<TaskGroup> MakeSubGroup() override {
-    std::lock_guard<std::mutex> lock(mutex_);
-    auto child = new ThreadedTaskGroup(executor_);
-    child->parent_ = this;
-    nremaining_.fetch_add(1, std::memory_order_acquire);
-    return std::shared_ptr<TaskGroup>(child);
-  }
-
  protected:
-  void UpdateStatus(Status&& st) {
+  void UpdateStatus(const Status& st) {

Review comment:
       Hmm, `st` is moved below.

##########
File path: cpp/src/arrow/util/task_group.cc
##########
@@ -159,13 +142,13 @@ class ThreadedTaskGroup : public TaskGroup {
   Executor* executor_;
   std::atomic<int32_t> nremaining_;
   std::atomic<bool> ok_;
+  Future<> completion_future_ = Future<>::Make();

Review comment:
       This isn't used yet? Or does its presence have side effects?

##########
File path: cpp/src/arrow/util/thread_pool_benchmark.cc
##########
@@ -103,8 +103,40 @@ static void ThreadPoolSpawn(benchmark::State& state) {
   state.SetItemsProcessed(state.iterations() * nspawns);
 }
 
+// Benchmark ThreadPool::Submit
+static void ThreadPoolSubmit(benchmark::State& state) {  // NOLINT non-const reference
+  const auto nthreads = static_cast<int>(state.range(0));
+  const auto workload_size = static_cast<int32_t>(state.range(1));
+
+  Workload workload(workload_size);
+
+  const int32_t nspawns = 10000000 / workload_size + 1;
+
+  for (auto _ : state) {
+    state.PauseTiming();
+    auto pool = *ThreadPool::Make(nthreads);
+    std::atomic<int32_t> n_finished{0};
+    state.ResumeTiming();
+
+    for (int32_t i = 0; i < nspawns; ++i) {
+      // Pass the task by reference to avoid copying it around
+      (void)DeferNotOk(pool->Submit(std::ref(workload))).Then([&](...) {

Review comment:
       This is benchmarking much more than `Submit`, because it is also stressing the callback and continuation management. I would rather have separate `Future` micro-benchmarks (for construction, simple use cases, callbacks, continuations...).

##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -276,6 +282,593 @@ TEST(FutureSyncTest, Int) {
   }
 }
 
+TEST(FutureRefTest, ChainRemoved) {
+  // Creating a future chain should not prevent the futures from being deleted if the
+  // entire chain is deleted
+  std::weak_ptr<FutureImpl> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& status) { return Status::OK(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) { return Future<>::Make(); });

Review comment:
       Out of curiosity, what happens if one of these callbacks captures `fut`?

##########
File path: cpp/src/arrow/util/thread_pool_benchmark.cc
##########
@@ -136,21 +168,24 @@ static void ThreadedTaskGroup(benchmark::State& state) {
 
   for (auto _ : state) {
     auto task_group = TaskGroup::MakeThreaded(pool.get());
-    for (int32_t i = 0; i < nspawns; ++i) {
-      // Pass the task by reference to avoid copying it around
-      task_group->Append(std::ref(task));
-    }
+    task_group->Append([&task, nspawns, task_group] {

Review comment:
       Why the nested append?

##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -122,15 +122,21 @@ void AssertFinished(const Future<T>& fut) {
 // Assert the future is successful *now*
 template <typename T>
 void AssertSuccessful(const Future<T>& fut) {
-  ASSERT_EQ(fut.state(), FutureState::SUCCESS);
-  ASSERT_OK(fut.status());
+  ASSERT_TRUE(fut.Wait(0.1));

Review comment:
       Hmm... this doesn't match the comment above anymore.

##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -768,4 +1361,27 @@ TYPED_TEST(FutureIteratorTest, ErrorsAsCompleted) { this->TestErrorsAsCompleted(
 
 TYPED_TEST(FutureIteratorTest, StressAsCompleted) { this->TestStressAsCompleted(); }
 
+namespace internal {
+TEST(FnOnceTest, MoveOnlyDataType) {
+  // ensuring this is valid guarantees we are making no unnecessary copies

Review comment:
       Nice. Do we want to use `FnOnce` in other places at some point? (perhaps open a JIRA?)

##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -276,6 +282,593 @@ TEST(FutureSyncTest, Int) {
   }
 }
 
+TEST(FutureRefTest, ChainRemoved) {
+  // Creating a future chain should not prevent the futures from being deleted if the
+  // entire chain is deleted
+  std::weak_ptr<FutureImpl> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& status) { return Status::OK(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) { return Future<>::Make(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, TailRemoved) {
+  // Keeping the head of the future chain should keep the entire chain alive
+  std::shared_ptr<Future<>> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  bool side_effect_run = false;
+  {
+    ref = std::make_shared<Future<>>(Future<>::Make());
+    auto fut2 = ref->Then(StatusOnly, [&side_effect_run](const Status& status) {
+      side_effect_run = true;
+      return Status::OK();
+    });
+    ref2 = fut2.impl_;
+  }
+  ASSERT_FALSE(ref2.expired());
+
+  ref->MarkFinished();
+  ASSERT_TRUE(side_effect_run);
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, HeadRemoved) {
+  // Keeping the tail of the future chain should not keep the entire chain alive.  If no
+  // one has a reference to the head then there is no need to keep it, nothing will finish
+  // it.  In theory the intermediate futures could be finished by some external process
+  // but that would be highly unusual and bad practice so in reality this would just be a
+  // reference to a future that will never complete which is ok.
+  std::weak_ptr<FutureImpl> ref;
+  std::shared_ptr<Future<>> ref2;
+  {
+    auto fut = std::make_shared<Future<>>(Future<>::Make());
+    ref = fut->impl_;
+    ref2 = std::make_shared<Future<>>(fut->Then([](...) {}));
+  }
+  ASSERT_TRUE(ref.expired());
+
+  {
+    auto fut = Future<>::Make();
+    ref2 = std::make_shared<Future<>>(fut.Then([&](...) {
+      auto intermediate = Future<>::Make();
+      ref = intermediate.impl_;
+      return intermediate;
+    }));
+    fut.MarkFinished();
+  }
+  ASSERT_TRUE(ref.expired());
+}
+
+TEST(FutureCompletionTest, Void) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    int passed_in_result = 0;
+    auto fut2 =
+        fut.Then([&passed_in_result](const int& result) { passed_in_result = result; });
+    fut.MarkFinished(42);
+    AssertSuccessful(fut2);
+    ASSERT_EQ(passed_in_result, 42);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const int& result) {});
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) {});
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertSuccessful(fut2);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) {});
+    fut.MarkFinished();
+    AssertSuccessful(fut2);
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& s) { return s; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly,
+                         [&status_seen](const Status& result) { status_seen = result; });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertSuccessful(fut2);
+  }
+}
+
+TEST(FutureCompletionTest, NonVoid) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const Result<int>& result) {
+      auto passed_in_result = *result;
+      return passed_in_result * passed_in_result;
+    });
+    fut.MarkFinished(42);
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42 * 42);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const Result<int>& result) {
+      auto passed_in_result = *result;
+      return passed_in_result * passed_in_result;
+    });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    bool was_io_error = false;
+    auto fut2 = fut.Then([](int) { return 99; },
+                         [&was_io_error](const Status& s) {
+                           was_io_error = s.IsIOError();
+                           return 100;
+                         });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 100);
+    ASSERT_TRUE(was_io_error);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then([](...) { return 42; });
+    fut.MarkFinished();
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42);
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then([](...) { return 42; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly, [&status_seen](const Status& s) {
+      status_seen = s;
+      return 42;
+    });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42);
+  }
+}
+
+TEST(FutureCompletionTest, FutureNonVoid) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    int passed_in_result = 0;
+    auto fut2 = fut.Then([&passed_in_result, innerFut](const Result<int>& result) {
+      passed_in_result = *result;
+      return innerFut;
+    });
+    fut.MarkFinished(42);
+    ASSERT_EQ(passed_in_result, 42);
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([innerFut](const Result<int>& result) { return innerFut; });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    bool was_io_error = false;
+    auto fut2 = fut.Then([innerFut](int) { return innerFut; },
+                         [&was_io_error, innerFut](const Status& s) {
+                           was_io_error = s.IsIOError();
+                           return innerFut;
+                         });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+    ASSERT_TRUE(was_io_error);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([&innerFut](...) { return innerFut; });
+    fut.MarkFinished();
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([&innerFut](...) { return innerFut; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    auto innerFut = Future<std::string>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly, [&status_seen, &innerFut](const Status& s) {
+      status_seen = s;
+      return innerFut;
+    });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+  }
+}
+
+TEST(FutureCompletionTest, Status) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    int passed_in_result = 0;
+    Future<> fut2 = fut.Then([&passed_in_result](const Result<int>& result) {
+      passed_in_result = *result;
+      return Status::OK();
+    });
+    fut.MarkFinished(42);
+    ASSERT_EQ(passed_in_result, 42);
+    AssertSuccessful(fut2);
+  }
+  {
+    // Propagate failure

Review comment:
       Did you copy/paste this? This seems to be the same test snippet as a couple dozens lines above.

##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -276,6 +282,593 @@ TEST(FutureSyncTest, Int) {
   }
 }
 
+TEST(FutureRefTest, ChainRemoved) {
+  // Creating a future chain should not prevent the futures from being deleted if the
+  // entire chain is deleted
+  std::weak_ptr<FutureImpl> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& status) { return Status::OK(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) { return Future<>::Make(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, TailRemoved) {
+  // Keeping the head of the future chain should keep the entire chain alive
+  std::shared_ptr<Future<>> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  bool side_effect_run = false;
+  {
+    ref = std::make_shared<Future<>>(Future<>::Make());
+    auto fut2 = ref->Then(StatusOnly, [&side_effect_run](const Status& status) {
+      side_effect_run = true;
+      return Status::OK();
+    });
+    ref2 = fut2.impl_;
+  }
+  ASSERT_FALSE(ref2.expired());
+
+  ref->MarkFinished();
+  ASSERT_TRUE(side_effect_run);
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, HeadRemoved) {
+  // Keeping the tail of the future chain should not keep the entire chain alive.  If no
+  // one has a reference to the head then there is no need to keep it, nothing will finish
+  // it.  In theory the intermediate futures could be finished by some external process
+  // but that would be highly unusual and bad practice so in reality this would just be a
+  // reference to a future that will never complete which is ok.
+  std::weak_ptr<FutureImpl> ref;
+  std::shared_ptr<Future<>> ref2;
+  {
+    auto fut = std::make_shared<Future<>>(Future<>::Make());
+    ref = fut->impl_;
+    ref2 = std::make_shared<Future<>>(fut->Then([](...) {}));
+  }
+  ASSERT_TRUE(ref.expired());
+
+  {
+    auto fut = Future<>::Make();
+    ref2 = std::make_shared<Future<>>(fut.Then([&](...) {
+      auto intermediate = Future<>::Make();
+      ref = intermediate.impl_;
+      return intermediate;
+    }));
+    fut.MarkFinished();
+  }
+  ASSERT_TRUE(ref.expired());
+}
+
+TEST(FutureCompletionTest, Void) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    int passed_in_result = 0;
+    auto fut2 =
+        fut.Then([&passed_in_result](const int& result) { passed_in_result = result; });
+    fut.MarkFinished(42);
+    AssertSuccessful(fut2);
+    ASSERT_EQ(passed_in_result, 42);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const int& result) {});
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) {});
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertSuccessful(fut2);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) {});
+    fut.MarkFinished();
+    AssertSuccessful(fut2);
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& s) { return s; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly,
+                         [&status_seen](const Status& result) { status_seen = result; });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertSuccessful(fut2);
+  }
+}
+
+TEST(FutureCompletionTest, NonVoid) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const Result<int>& result) {
+      auto passed_in_result = *result;
+      return passed_in_result * passed_in_result;
+    });
+    fut.MarkFinished(42);
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42 * 42);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const Result<int>& result) {
+      auto passed_in_result = *result;
+      return passed_in_result * passed_in_result;
+    });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    bool was_io_error = false;
+    auto fut2 = fut.Then([](int) { return 99; },
+                         [&was_io_error](const Status& s) {
+                           was_io_error = s.IsIOError();
+                           return 100;
+                         });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 100);
+    ASSERT_TRUE(was_io_error);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then([](...) { return 42; });
+    fut.MarkFinished();
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42);
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then([](...) { return 42; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly, [&status_seen](const Status& s) {
+      status_seen = s;
+      return 42;
+    });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42);
+  }
+}
+
+TEST(FutureCompletionTest, FutureNonVoid) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    int passed_in_result = 0;
+    auto fut2 = fut.Then([&passed_in_result, innerFut](const Result<int>& result) {
+      passed_in_result = *result;
+      return innerFut;
+    });
+    fut.MarkFinished(42);
+    ASSERT_EQ(passed_in_result, 42);
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([innerFut](const Result<int>& result) { return innerFut; });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    bool was_io_error = false;
+    auto fut2 = fut.Then([innerFut](int) { return innerFut; },
+                         [&was_io_error, innerFut](const Status& s) {
+                           was_io_error = s.IsIOError();
+                           return innerFut;
+                         });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+    ASSERT_TRUE(was_io_error);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([&innerFut](...) { return innerFut; });
+    fut.MarkFinished();
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([&innerFut](...) { return innerFut; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    auto innerFut = Future<std::string>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly, [&status_seen, &innerFut](const Status& s) {
+      status_seen = s;
+      return innerFut;
+    });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+  }
+}
+
+TEST(FutureCompletionTest, Status) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    int passed_in_result = 0;
+    Future<> fut2 = fut.Then([&passed_in_result](const Result<int>& result) {
+      passed_in_result = *result;
+      return Status::OK();
+    });
+    fut.MarkFinished(42);
+    ASSERT_EQ(passed_in_result, 42);
+    AssertSuccessful(fut2);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([innerFut](const Result<int>& result) { return innerFut; });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    bool was_io_error = false;
+    auto fut2 = fut.Then([](int i) { return std::to_string(i); },
+                         [&was_io_error, innerFut](const Status& s) {
+                           was_io_error = s.IsIOError();
+                           return innerFut;
+                         });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+    ASSERT_TRUE(was_io_error);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) {});
+    fut.MarkFinished();
+    AssertSuccessful(fut2);
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& s) { return s; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status

Review comment:
       Ditto? (not sure)

##########
File path: cpp/src/arrow/util/thread_pool_benchmark.cc
##########
@@ -103,8 +103,40 @@ static void ThreadPoolSpawn(benchmark::State& state) {
   state.SetItemsProcessed(state.iterations() * nspawns);
 }
 
+// Benchmark ThreadPool::Submit
+static void ThreadPoolSubmit(benchmark::State& state) {  // NOLINT non-const reference
+  const auto nthreads = static_cast<int>(state.range(0));
+  const auto workload_size = static_cast<int32_t>(state.range(1));
+
+  Workload workload(workload_size);
+
+  const int32_t nspawns = 10000000 / workload_size + 1;
+
+  for (auto _ : state) {
+    state.PauseTiming();
+    auto pool = *ThreadPool::Make(nthreads);
+    std::atomic<int32_t> n_finished{0};
+    state.ResumeTiming();
+
+    for (int32_t i = 0; i < nspawns; ++i) {
+      // Pass the task by reference to avoid copying it around
+      (void)DeferNotOk(pool->Submit(std::ref(workload))).Then([&](...) {

Review comment:
       (the `Future` micro-benchmarks can be deferred to another PR, though)

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -264,11 +335,125 @@ class Future {
   }
 
   /// \brief Make a finished Future<> with the provided Status.
-  template <typename E = ValueType, typename = detail::Empty::EnableIfSame<E>>
+  template <typename E = ValueType, typename = typename std::enable_if<
+                                        std::is_same<E, detail::Empty>::value>::type>
   static Future<> MakeFinished(Status s = Status::OK()) {
     return MakeFinished(E::ToResult(std::move(s)));
   }
 
+  /// \brief Consumer API: Register a callback to run when this future completes
+  template <typename OnComplete>
+  void AddCallback(OnComplete&& on_complete) const {
+    struct Callback {
+      void operator()() && {
+        auto self = weak_self.get();
+        std::move(on_complete)(*self.GetResult());
+      }
+
+      WeakFuture<T> weak_self;
+      OnComplete on_complete;
+    };
+
+    // We know impl_ will not be dangling when invoking callbacks because at least one
+    // thread will be waiting for MarkFinished to return. Thus it's safe to keep a
+    // weak reference to impl_ here
+    impl_->AddCallback(
+        Callback{WeakFuture<T>(*this), std::forward<OnComplete>(on_complete)});
+  }
+
+  /// \brief Consumer API: Register a continuation to run when this future completes
+  ///
+  /// The continuation will run in the same thread that called MarkFinished (whatever
+  /// callback is registered with this function will run before MarkFinished returns).
+  /// Avoid long-running callbacks in favor of submitting a task to an Executor and
+  /// returning the future.
+  ///
+  /// Two callbacks are supported:
+  /// - OnSuccess, called against the result (const ValueType&) on successul completion.
+  /// - OnFailure, called against the error (const Status&) on failed completion.
+  ///
+  /// Then() returns a Future whose ValueType is derived from the return type of the
+  /// callbacks. If a callback returns:
+  /// - void, a Future<> will be produced which will completes successully as soon
+  ///   as the callback runs.
+  /// - Status, a Future<> will be produced which will complete with the returned Status
+  ///   as soon as the callback runs.
+  /// - V or Result<V>, a Future<V> will be produced which will complete with the result
+  ///   of invoking the callback as soon as the callback runs.
+  /// - Future<V>, a Future<V> will be produced which will be marked complete when the
+  ///   future returned by the callback completes (and will complete with the same
+  ///   result).
+  ///
+  /// The continued Future type must be the same for both callbacks.
+  ///
+  /// Note that OnFailure can swallow errors, allowing continued Futures to successully
+  /// complete even if this Future fails.
+  ///
+  /// If this future is already completed then the callback will be run immediately
+  /// (before this method returns) and the returned future may already be marked complete
+  /// (it will definitely be marked complete if the callback returns a non-future or a
+  /// completed future).
+  template <typename OnSuccess, typename OnFailure,
+            typename ContinuedFuture =
+                detail::ContinueFuture::ForSignature<OnSuccess && (const T&)>>
+  ContinuedFuture Then(OnSuccess&& on_success, OnFailure&& on_failure) const {
+    static_assert(
+        std::is_same<detail::ContinueFuture::ForSignature<OnFailure && (const Status&)>,
+                     ContinuedFuture>::value,
+        "OnSuccess and OnFailure must continue with the same future type");
+
+    auto next = ContinuedFuture::Make();
+
+    struct Callback {
+      void operator()(const Result<T>& result) && {
+        if (ARROW_PREDICT_TRUE(result.ok())) {
+          detail::Continue(std::move(next), std::move(on_success), result.ValueOrDie());
+        } else {
+          detail::Continue(std::move(next), std::move(on_failure), result.status());
+        }
+      }
+
+      OnSuccess on_success;
+      OnFailure on_failure;
+      ContinuedFuture next;
+    };
+
+    AddCallback(Callback{std::forward<OnSuccess>(on_success),
+                         std::forward<OnFailure>(on_failure), next});
+
+    return next;
+  }
+
+  /// \brief Overload without OnFailure. Failures will be passed through unchanged.
+  template <typename OnSuccess,
+            typename ContinuedFuture =
+                detail::ContinueFuture::ForSignature<OnSuccess && (const T&)>>
+  ContinuedFuture Then(OnSuccess&& on_success) const {
+    return Then(std::forward<OnSuccess>(on_success), [](const Status& s) {
+      return Result<typename ContinuedFuture::ValueType>(s);
+    });
+  }
+
+  template <typename OnComplete,
+            typename ContinuedFuture = typename detail::ContinueFuture::ForSignature<
+                OnComplete && (const Status&)>>
+  ContinuedFuture Then(decltype(StatusOnly), OnComplete&& on_complete) const {

Review comment:
       Hmm, looking at the tests, I think I understand. The `StatusOnly` idiom doesn't look terrific. How about instead declaring this as `ContinuedFuture ThenBoth(OnComplete&& on_complete)`, where `OnComplete` would take either a `Result<T>` or a `Status`, depending on the future type?

##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -276,6 +282,593 @@ TEST(FutureSyncTest, Int) {
   }
 }
 
+TEST(FutureRefTest, ChainRemoved) {
+  // Creating a future chain should not prevent the futures from being deleted if the
+  // entire chain is deleted
+  std::weak_ptr<FutureImpl> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& status) { return Status::OK(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) { return Future<>::Make(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, TailRemoved) {
+  // Keeping the head of the future chain should keep the entire chain alive
+  std::shared_ptr<Future<>> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  bool side_effect_run = false;
+  {
+    ref = std::make_shared<Future<>>(Future<>::Make());
+    auto fut2 = ref->Then(StatusOnly, [&side_effect_run](const Status& status) {
+      side_effect_run = true;
+      return Status::OK();
+    });
+    ref2 = fut2.impl_;
+  }
+  ASSERT_FALSE(ref2.expired());
+
+  ref->MarkFinished();
+  ASSERT_TRUE(side_effect_run);
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, HeadRemoved) {
+  // Keeping the tail of the future chain should not keep the entire chain alive.  If no
+  // one has a reference to the head then there is no need to keep it, nothing will finish
+  // it.  In theory the intermediate futures could be finished by some external process
+  // but that would be highly unusual and bad practice so in reality this would just be a
+  // reference to a future that will never complete which is ok.
+  std::weak_ptr<FutureImpl> ref;
+  std::shared_ptr<Future<>> ref2;
+  {
+    auto fut = std::make_shared<Future<>>(Future<>::Make());
+    ref = fut->impl_;
+    ref2 = std::make_shared<Future<>>(fut->Then([](...) {}));
+  }
+  ASSERT_TRUE(ref.expired());
+
+  {
+    auto fut = Future<>::Make();
+    ref2 = std::make_shared<Future<>>(fut.Then([&](...) {
+      auto intermediate = Future<>::Make();
+      ref = intermediate.impl_;
+      return intermediate;
+    }));
+    fut.MarkFinished();
+  }
+  ASSERT_TRUE(ref.expired());
+}
+
+TEST(FutureCompletionTest, Void) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    int passed_in_result = 0;
+    auto fut2 =
+        fut.Then([&passed_in_result](const int& result) { passed_in_result = result; });
+    fut.MarkFinished(42);
+    AssertSuccessful(fut2);
+    ASSERT_EQ(passed_in_result, 42);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const int& result) {});
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) {});
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertSuccessful(fut2);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) {});
+    fut.MarkFinished();
+    AssertSuccessful(fut2);
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& s) { return s; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly,
+                         [&status_seen](const Status& result) { status_seen = result; });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertSuccessful(fut2);
+  }
+}
+
+TEST(FutureCompletionTest, NonVoid) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const Result<int>& result) {
+      auto passed_in_result = *result;
+      return passed_in_result * passed_in_result;
+    });
+    fut.MarkFinished(42);
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42 * 42);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const Result<int>& result) {
+      auto passed_in_result = *result;
+      return passed_in_result * passed_in_result;
+    });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    bool was_io_error = false;
+    auto fut2 = fut.Then([](int) { return 99; },
+                         [&was_io_error](const Status& s) {
+                           was_io_error = s.IsIOError();
+                           return 100;
+                         });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 100);
+    ASSERT_TRUE(was_io_error);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then([](...) { return 42; });
+    fut.MarkFinished();
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42);
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then([](...) { return 42; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly, [&status_seen](const Status& s) {
+      status_seen = s;
+      return 42;
+    });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42);
+  }
+}
+
+TEST(FutureCompletionTest, FutureNonVoid) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    int passed_in_result = 0;
+    auto fut2 = fut.Then([&passed_in_result, innerFut](const Result<int>& result) {
+      passed_in_result = *result;
+      return innerFut;
+    });
+    fut.MarkFinished(42);
+    ASSERT_EQ(passed_in_result, 42);
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([innerFut](const Result<int>& result) { return innerFut; });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    bool was_io_error = false;
+    auto fut2 = fut.Then([innerFut](int) { return innerFut; },
+                         [&was_io_error, innerFut](const Status& s) {
+                           was_io_error = s.IsIOError();
+                           return innerFut;
+                         });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+    ASSERT_TRUE(was_io_error);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([&innerFut](...) { return innerFut; });
+    fut.MarkFinished();
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([&innerFut](...) { return innerFut; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    auto innerFut = Future<std::string>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly, [&status_seen, &innerFut](const Status& s) {
+      status_seen = s;
+      return innerFut;
+    });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+  }
+}
+
+TEST(FutureCompletionTest, Status) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    int passed_in_result = 0;
+    Future<> fut2 = fut.Then([&passed_in_result](const Result<int>& result) {
+      passed_in_result = *result;
+      return Status::OK();
+    });
+    fut.MarkFinished(42);
+    ASSERT_EQ(passed_in_result, 42);
+    AssertSuccessful(fut2);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([innerFut](const Result<int>& result) { return innerFut; });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    bool was_io_error = false;
+    auto fut2 = fut.Then([](int i) { return std::to_string(i); },
+                         [&was_io_error, innerFut](const Status& s) {
+                           was_io_error = s.IsIOError();
+                           return innerFut;
+                         });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+    ASSERT_TRUE(was_io_error);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) {});
+    fut.MarkFinished();
+    AssertSuccessful(fut2);
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& s) { return s; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 =
+        fut.Then(StatusOnly, [&status_seen](const Status& s) { status_seen = s; });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertSuccessful(fut2);
+  }
+}
+
+TEST(FutureCompletionTest, FutureStatus) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<>::Make();
+    int passed_in_result = 0;
+    Future<> fut2 = fut.Then([&passed_in_result, innerFut](const int& i) {
+      passed_in_result = i;
+      return innerFut;
+    });
+    fut.MarkFinished(42);
+    ASSERT_EQ(passed_in_result, 42);
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished(Status::OK());
+    AssertSuccessful(fut2);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<>::Make();
+    auto fut2 = fut.Then([innerFut](const int& i) { return innerFut; });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<>::Make();
+    bool was_io_error = false;
+    auto fut2 = fut.Then([innerFut](const int& i) { return innerFut; },
+                         [&was_io_error, innerFut](const Status& s) {
+                           was_io_error = s.IsIOError();
+                           return innerFut;
+                         });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished(Status::OK());
+    AssertSuccessful(fut2);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto innerFut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [&innerFut](const Status&) { return innerFut; });
+    fut.MarkFinished();
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished(Status::OK());
+    AssertSuccessful(fut2);
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto innerFut = Future<>::Make();
+    auto fut2 = fut.Then([&innerFut](...) { return innerFut; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    auto innerFut = Future<>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly, [&status_seen, &innerFut](const Status& s) {
+      status_seen = s;
+      return innerFut;
+    });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished(Status::OK());
+    AssertSuccessful(fut2);
+  }
+}
+
+TEST(FutureCompletionTest, Result) {
+  {

Review comment:
       Do we want to test cases where a failed `Result` is returned? Though it should work, since `MarkFinished` accepts it.

##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -276,6 +282,593 @@ TEST(FutureSyncTest, Int) {
   }
 }
 
+TEST(FutureRefTest, ChainRemoved) {
+  // Creating a future chain should not prevent the futures from being deleted if the
+  // entire chain is deleted
+  std::weak_ptr<FutureImpl> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& status) { return Status::OK(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) { return Future<>::Make(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, TailRemoved) {
+  // Keeping the head of the future chain should keep the entire chain alive
+  std::shared_ptr<Future<>> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  bool side_effect_run = false;
+  {
+    ref = std::make_shared<Future<>>(Future<>::Make());
+    auto fut2 = ref->Then(StatusOnly, [&side_effect_run](const Status& status) {
+      side_effect_run = true;
+      return Status::OK();
+    });
+    ref2 = fut2.impl_;
+  }
+  ASSERT_FALSE(ref2.expired());
+
+  ref->MarkFinished();
+  ASSERT_TRUE(side_effect_run);
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, HeadRemoved) {
+  // Keeping the tail of the future chain should not keep the entire chain alive.  If no
+  // one has a reference to the head then there is no need to keep it, nothing will finish
+  // it.  In theory the intermediate futures could be finished by some external process
+  // but that would be highly unusual and bad practice so in reality this would just be a
+  // reference to a future that will never complete which is ok.
+  std::weak_ptr<FutureImpl> ref;
+  std::shared_ptr<Future<>> ref2;
+  {
+    auto fut = std::make_shared<Future<>>(Future<>::Make());
+    ref = fut->impl_;
+    ref2 = std::make_shared<Future<>>(fut->Then([](...) {}));
+  }
+  ASSERT_TRUE(ref.expired());
+
+  {
+    auto fut = Future<>::Make();
+    ref2 = std::make_shared<Future<>>(fut.Then([&](...) {
+      auto intermediate = Future<>::Make();
+      ref = intermediate.impl_;
+      return intermediate;
+    }));
+    fut.MarkFinished();
+  }
+  ASSERT_TRUE(ref.expired());
+}
+
+TEST(FutureCompletionTest, Void) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    int passed_in_result = 0;
+    auto fut2 =
+        fut.Then([&passed_in_result](const int& result) { passed_in_result = result; });
+    fut.MarkFinished(42);
+    AssertSuccessful(fut2);
+    ASSERT_EQ(passed_in_result, 42);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const int& result) {});
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) {});
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertSuccessful(fut2);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) {});
+    fut.MarkFinished();
+    AssertSuccessful(fut2);
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& s) { return s; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly,
+                         [&status_seen](const Status& result) { status_seen = result; });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertSuccessful(fut2);
+  }
+}
+
+TEST(FutureCompletionTest, NonVoid) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const Result<int>& result) {
+      auto passed_in_result = *result;
+      return passed_in_result * passed_in_result;
+    });
+    fut.MarkFinished(42);
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42 * 42);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const Result<int>& result) {
+      auto passed_in_result = *result;
+      return passed_in_result * passed_in_result;
+    });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    bool was_io_error = false;
+    auto fut2 = fut.Then([](int) { return 99; },
+                         [&was_io_error](const Status& s) {
+                           was_io_error = s.IsIOError();
+                           return 100;
+                         });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 100);
+    ASSERT_TRUE(was_io_error);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then([](...) { return 42; });
+    fut.MarkFinished();
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42);
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then([](...) { return 42; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly, [&status_seen](const Status& s) {
+      status_seen = s;
+      return 42;
+    });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42);
+  }
+}
+
+TEST(FutureCompletionTest, FutureNonVoid) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    int passed_in_result = 0;
+    auto fut2 = fut.Then([&passed_in_result, innerFut](const Result<int>& result) {
+      passed_in_result = *result;
+      return innerFut;
+    });
+    fut.MarkFinished(42);
+    ASSERT_EQ(passed_in_result, 42);
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([innerFut](const Result<int>& result) { return innerFut; });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    bool was_io_error = false;
+    auto fut2 = fut.Then([innerFut](int) { return innerFut; },
+                         [&was_io_error, innerFut](const Status& s) {
+                           was_io_error = s.IsIOError();
+                           return innerFut;
+                         });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+    ASSERT_TRUE(was_io_error);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([&innerFut](...) { return innerFut; });
+    fut.MarkFinished();
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([&innerFut](...) { return innerFut; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    auto innerFut = Future<std::string>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly, [&status_seen, &innerFut](const Status& s) {
+      status_seen = s;
+      return innerFut;
+    });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+  }
+}
+
+TEST(FutureCompletionTest, Status) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    int passed_in_result = 0;
+    Future<> fut2 = fut.Then([&passed_in_result](const Result<int>& result) {
+      passed_in_result = *result;
+      return Status::OK();
+    });
+    fut.MarkFinished(42);
+    ASSERT_EQ(passed_in_result, 42);
+    AssertSuccessful(fut2);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([innerFut](const Result<int>& result) { return innerFut; });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure

Review comment:
       Ditto here? (though I'm not sure)

##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -122,15 +122,21 @@ void AssertFinished(const Future<T>& fut) {
 // Assert the future is successful *now*
 template <typename T>
 void AssertSuccessful(const Future<T>& fut) {
-  ASSERT_EQ(fut.state(), FutureState::SUCCESS);
-  ASSERT_OK(fut.status());
+  ASSERT_TRUE(fut.Wait(0.1));

Review comment:
       Perhaps instead add separate `AssertSuccessfulSoon` and `AssertFailedSoon`?

##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -276,6 +282,593 @@ TEST(FutureSyncTest, Int) {
   }
 }
 
+TEST(FutureRefTest, ChainRemoved) {
+  // Creating a future chain should not prevent the futures from being deleted if the
+  // entire chain is deleted
+  std::weak_ptr<FutureImpl> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& status) { return Status::OK(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) { return Future<>::Make(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, TailRemoved) {
+  // Keeping the head of the future chain should keep the entire chain alive
+  std::shared_ptr<Future<>> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  bool side_effect_run = false;
+  {
+    ref = std::make_shared<Future<>>(Future<>::Make());
+    auto fut2 = ref->Then(StatusOnly, [&side_effect_run](const Status& status) {
+      side_effect_run = true;
+      return Status::OK();
+    });
+    ref2 = fut2.impl_;
+  }
+  ASSERT_FALSE(ref2.expired());
+
+  ref->MarkFinished();
+  ASSERT_TRUE(side_effect_run);
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, HeadRemoved) {
+  // Keeping the tail of the future chain should not keep the entire chain alive.  If no
+  // one has a reference to the head then there is no need to keep it, nothing will finish
+  // it.  In theory the intermediate futures could be finished by some external process
+  // but that would be highly unusual and bad practice so in reality this would just be a
+  // reference to a future that will never complete which is ok.
+  std::weak_ptr<FutureImpl> ref;
+  std::shared_ptr<Future<>> ref2;
+  {
+    auto fut = std::make_shared<Future<>>(Future<>::Make());
+    ref = fut->impl_;
+    ref2 = std::make_shared<Future<>>(fut->Then([](...) {}));
+  }
+  ASSERT_TRUE(ref.expired());
+
+  {
+    auto fut = Future<>::Make();
+    ref2 = std::make_shared<Future<>>(fut.Then([&](...) {
+      auto intermediate = Future<>::Make();
+      ref = intermediate.impl_;
+      return intermediate;
+    }));
+    fut.MarkFinished();
+  }
+  ASSERT_TRUE(ref.expired());
+}
+
+TEST(FutureCompletionTest, Void) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    int passed_in_result = 0;
+    auto fut2 =
+        fut.Then([&passed_in_result](const int& result) { passed_in_result = result; });
+    fut.MarkFinished(42);
+    AssertSuccessful(fut2);
+    ASSERT_EQ(passed_in_result, 42);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const int& result) {});
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) {});
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertSuccessful(fut2);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) {});
+    fut.MarkFinished();
+    AssertSuccessful(fut2);
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& s) { return s; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly,
+                         [&status_seen](const Status& result) { status_seen = result; });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertSuccessful(fut2);
+  }
+}
+
+TEST(FutureCompletionTest, NonVoid) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const Result<int>& result) {
+      auto passed_in_result = *result;
+      return passed_in_result * passed_in_result;
+    });
+    fut.MarkFinished(42);
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42 * 42);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const Result<int>& result) {
+      auto passed_in_result = *result;
+      return passed_in_result * passed_in_result;
+    });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    bool was_io_error = false;
+    auto fut2 = fut.Then([](int) { return 99; },
+                         [&was_io_error](const Status& s) {
+                           was_io_error = s.IsIOError();
+                           return 100;
+                         });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 100);
+    ASSERT_TRUE(was_io_error);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then([](...) { return 42; });
+    fut.MarkFinished();
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42);
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then([](...) { return 42; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly, [&status_seen](const Status& s) {
+      status_seen = s;
+      return 42;
+    });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42);
+  }
+}
+
+TEST(FutureCompletionTest, FutureNonVoid) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    int passed_in_result = 0;
+    auto fut2 = fut.Then([&passed_in_result, innerFut](const Result<int>& result) {
+      passed_in_result = *result;
+      return innerFut;
+    });
+    fut.MarkFinished(42);
+    ASSERT_EQ(passed_in_result, 42);
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([innerFut](const Result<int>& result) { return innerFut; });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    bool was_io_error = false;
+    auto fut2 = fut.Then([innerFut](int) { return innerFut; },
+                         [&was_io_error, innerFut](const Status& s) {
+                           was_io_error = s.IsIOError();
+                           return innerFut;
+                         });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+    ASSERT_TRUE(was_io_error);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([&innerFut](...) { return innerFut; });
+    fut.MarkFinished();
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([&innerFut](...) { return innerFut; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    auto innerFut = Future<std::string>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly, [&status_seen, &innerFut](const Status& s) {
+      status_seen = s;
+      return innerFut;
+    });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+  }
+}
+
+TEST(FutureCompletionTest, Status) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    int passed_in_result = 0;
+    Future<> fut2 = fut.Then([&passed_in_result](const Result<int>& result) {
+      passed_in_result = *result;
+      return Status::OK();
+    });
+    fut.MarkFinished(42);
+    ASSERT_EQ(passed_in_result, 42);
+    AssertSuccessful(fut2);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    auto fut2 = fut.Then([innerFut](const Result<int>& result) { return innerFut; });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto innerFut = Future<std::string>::Make();
+    bool was_io_error = false;
+    auto fut2 = fut.Then([](int i) { return std::to_string(i); },
+                         [&was_io_error, innerFut](const Status& s) {
+                           was_io_error = s.IsIOError();
+                           return innerFut;
+                         });
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertNotFinished(fut2);
+    innerFut.MarkFinished("hello");
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, "hello");
+    ASSERT_TRUE(was_io_error);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) {});
+    fut.MarkFinished();
+    AssertSuccessful(fut2);
+  }
+  {
+    // From failed status

Review comment:
       DItto?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r545201781



##########
File path: cpp/src/arrow/util/future.h
##########
@@ -264,11 +335,125 @@ class Future {
   }
 
   /// \brief Make a finished Future<> with the provided Status.
-  template <typename E = ValueType, typename = detail::Empty::EnableIfSame<E>>
+  template <typename E = ValueType, typename = typename std::enable_if<
+                                        std::is_same<E, detail::Empty>::value>::type>
   static Future<> MakeFinished(Status s = Status::OK()) {
     return MakeFinished(E::ToResult(std::move(s)));
   }
 
+  /// \brief Consumer API: Register a callback to run when this future completes

Review comment:
       Done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r545213923



##########
File path: cpp/src/arrow/util/future.h
##########
@@ -264,11 +335,125 @@ class Future {
   }
 
   /// \brief Make a finished Future<> with the provided Status.
-  template <typename E = ValueType, typename = detail::Empty::EnableIfSame<E>>
+  template <typename E = ValueType, typename = typename std::enable_if<
+                                        std::is_same<E, detail::Empty>::value>::type>
   static Future<> MakeFinished(Status s = Status::OK()) {
     return MakeFinished(E::ToResult(std::move(s)));
   }
 
+  /// \brief Consumer API: Register a callback to run when this future completes
+  template <typename OnComplete>
+  void AddCallback(OnComplete&& on_complete) const {
+    struct Callback {
+      void operator()() && {
+        auto self = weak_self.get();
+        std::move(on_complete)(*self.GetResult());
+      }
+
+      WeakFuture<T> weak_self;
+      OnComplete on_complete;
+    };
+
+    // We know impl_ will not be dangling when invoking callbacks because at least one
+    // thread will be waiting for MarkFinished to return. Thus it's safe to keep a
+    // weak reference to impl_ here
+    impl_->AddCallback(
+        Callback{WeakFuture<T>(*this), std::forward<OnComplete>(on_complete)});
+  }
+
+  /// \brief Consumer API: Register a continuation to run when this future completes
+  ///
+  /// The continuation will run in the same thread that called MarkFinished (whatever
+  /// callback is registered with this function will run before MarkFinished returns).
+  /// Avoid long-running callbacks in favor of submitting a task to an Executor and
+  /// returning the future.
+  ///
+  /// Two callbacks are supported:
+  /// - OnSuccess, called against the result (const ValueType&) on successul completion.
+  /// - OnFailure, called against the error (const Status&) on failed completion.
+  ///
+  /// Then() returns a Future whose ValueType is derived from the return type of the
+  /// callbacks. If a callback returns:
+  /// - void, a Future<> will be produced which will completes successully as soon
+  ///   as the callback runs.
+  /// - Status, a Future<> will be produced which will complete with the returned Status
+  ///   as soon as the callback runs.
+  /// - V or Result<V>, a Future<V> will be produced which will complete with the result
+  ///   of invoking the callback as soon as the callback runs.
+  /// - Future<V>, a Future<V> will be produced which will be marked complete when the
+  ///   future returned by the callback completes (and will complete with the same
+  ///   result).
+  ///
+  /// The continued Future type must be the same for both callbacks.
+  ///
+  /// Note that OnFailure can swallow errors, allowing continued Futures to successully
+  /// complete even if this Future fails.
+  ///
+  /// If this future is already completed then the callback will be run immediately
+  /// (before this method returns) and the returned future may already be marked complete
+  /// (it will definitely be marked complete if the callback returns a non-future or a
+  /// completed future).
+  template <typename OnSuccess, typename OnFailure,
+            typename ContinuedFuture =
+                detail::ContinueFuture::ForSignature<OnSuccess && (const T&)>>
+  ContinuedFuture Then(OnSuccess&& on_success, OnFailure&& on_failure) const {
+    static_assert(
+        std::is_same<detail::ContinueFuture::ForSignature<OnFailure && (const Status&)>,
+                     ContinuedFuture>::value,
+        "OnSuccess and OnFailure must continue with the same future type");
+
+    auto next = ContinuedFuture::Make();
+
+    struct Callback {
+      void operator()(const Result<T>& result) && {
+        if (ARROW_PREDICT_TRUE(result.ok())) {
+          detail::Continue(std::move(next), std::move(on_success), result.ValueOrDie());
+        } else {
+          detail::Continue(std::move(next), std::move(on_failure), result.status());
+        }
+      }
+
+      OnSuccess on_success;
+      OnFailure on_failure;
+      ContinuedFuture next;
+    };
+
+    AddCallback(Callback{std::forward<OnSuccess>(on_success),
+                         std::forward<OnFailure>(on_failure), next});
+
+    return next;
+  }
+
+  /// \brief Overload without OnFailure. Failures will be passed through unchanged.
+  template <typename OnSuccess,
+            typename ContinuedFuture =
+                detail::ContinueFuture::ForSignature<OnSuccess && (const T&)>>
+  ContinuedFuture Then(OnSuccess&& on_success) const {
+    return Then(std::forward<OnSuccess>(on_success), [](const Status& s) {
+      return Result<typename ContinuedFuture::ValueType>(s);
+    });
+  }
+
+  template <typename OnComplete,
+            typename ContinuedFuture = typename detail::ContinueFuture::ForSignature<
+                OnComplete && (const Status&)>>
+  ContinuedFuture Then(decltype(StatusOnly), OnComplete&& on_complete) const {

Review comment:
       I agree with Antoine although I don't know if ThenBoth is much clearer.  The main concern I have is that marking something "StatusOnly" can cause the error state of the chain to be cleared.  For example, in RX, there are three types of callbacks.
   
   on_success - continues the chain with success
   on_failure - intercepts a failure in the chain and (attempts to) convert it to a success
   on_completed - intercepts success or failure but does not convert to success
   
   on_failure is equivalent to try/catch while on_completed is equivalent to try/finally.
   
   My concern here is that StatusOnly makes me think of on_completed and so it can't clear the error state of the future.  However, looking at the tests, it appears that it does actually clear the error state.
   
   Could we change the name to ThenFinally or WhenCompleted and then change the behavior so that a "StatusOnly" callback can't clear the error state?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r545201645



##########
File path: cpp/src/arrow/util/future.h
##########
@@ -19,63 +19,150 @@
 
 #include <atomic>
 #include <cmath>
+#include <functional>
 #include <memory>
 #include <type_traits>
 #include <utility>
 #include <vector>
 
 #include "arrow/result.h"
 #include "arrow/status.h"
+#include "arrow/util/functional.h"
 #include "arrow/util/macros.h"
+#include "arrow/util/type_fwd.h"
 #include "arrow/util/visibility.h"
 
 namespace arrow {
 
+namespace detail {
+
+struct Empty {
+  static Result<Empty> ToResult(Status s) {
+    if (ARROW_PREDICT_TRUE(s.ok())) {
+      return Empty{};
+    }
+    return s;
+  }
+};
+
+template <typename>
+struct is_future : std::false_type {};
+
+template <typename T>
+struct is_future<Future<T>> : std::true_type {};
+
+template <typename Signature>
+using result_of_t = typename std::result_of<Signature>::type;
+
+constexpr struct ContinueFuture {
+  template <typename Return>
+  struct ForReturnImpl;
+
+  template <typename Return>
+  using ForReturn = typename ForReturnImpl<Return>::type;
+
+  template <typename Signature>
+  using ForSignature = ForReturn<result_of_t<Signature>>;
+
+  template <typename F, typename... A, typename R = result_of_t<F && (A && ...)>,

Review comment:
       Done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r534352213



##########
File path: cpp/src/arrow/util/future.h
##########
@@ -19,63 +19,150 @@
 
 #include <atomic>
 #include <cmath>
+#include <functional>
 #include <memory>
 #include <type_traits>
 #include <utility>
 #include <vector>
 
 #include "arrow/result.h"
 #include "arrow/status.h"
+#include "arrow/util/functional.h"
 #include "arrow/util/macros.h"
+#include "arrow/util/type_fwd.h"
 #include "arrow/util/visibility.h"
 
 namespace arrow {
 
+namespace detail {
+
+struct Empty {
+  static Result<Empty> ToResult(Status s) {
+    if (ARROW_PREDICT_TRUE(s.ok())) {
+      return Empty{};
+    }
+    return s;
+  }
+};
+
+template <typename>
+struct is_future : std::false_type {};
+
+template <typename T>
+struct is_future<Future<T>> : std::true_type {};
+
+template <typename Signature>
+using result_of_t = typename std::result_of<Signature>::type;
+
+constexpr struct ContinueFuture {
+  template <typename Return>
+  struct ForReturnImpl;
+
+  template <typename Return>
+  using ForReturn = typename ForReturnImpl<Return>::type;
+
+  template <typename Signature>
+  using ForSignature = ForReturn<result_of_t<Signature>>;
+
+  template <typename F, typename... A, typename R = result_of_t<F && (A && ...)>,

Review comment:
       will do




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r545362938



##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -276,6 +282,593 @@ TEST(FutureSyncTest, Int) {
   }
 }
 
+TEST(FutureRefTest, ChainRemoved) {
+  // Creating a future chain should not prevent the futures from being deleted if the
+  // entire chain is deleted
+  std::weak_ptr<FutureImpl> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& status) { return Status::OK(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) { return Future<>::Make(); });

Review comment:
       A comment would be good IMHO.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] github-actions[bot] commented on pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#issuecomment-728205092


   https://issues.apache.org/jira/browse/ARROW-10182


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r534358168



##########
File path: cpp/src/arrow/util/future.cc
##########
@@ -243,6 +253,12 @@ class ConcreteFutureImpl : public FutureImpl {
       }
     }
     cv_.notify_all();
+
+    // run callbacks
+    for (auto&& callback : callbacks_) {

Review comment:
       Because you're accessing an object that's mutated (potentially) from another thread in `AddCallback`.
   Ideally there should be a multi-threaded stress test for adding callbacks to a future.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r534350582



##########
File path: cpp/src/arrow/util/functional.h
##########
@@ -79,5 +84,47 @@ struct call_traits {
       typename std::enable_if<std::is_same<return_type<F>, T>::value, RT>;
 };
 
+/// A type erased callable object which may only be invoked once.
+/// It can be constructed from any lambda which matches the provided call signature.
+/// Invoking it results in destruction of the lambda, freeing any state/references
+/// immediately. Invoking a default constructed FnOnce or one which has already been
+/// invoked will segfault.
+template <typename Signature>
+class FnOnce;
+
+template <typename R, typename... A>
+class FnOnce<R(A...)> {
+ public:
+  FnOnce() = default;
+
+  template <typename Fn,
+            typename = typename std::enable_if<std::is_convertible<
+                typename std::result_of<Fn && (A...)>::type, R>::value>::type>
+  FnOnce(Fn fn) : impl_(new FnImpl<Fn>(std::move(fn))) {  // NOLINT runtime/explicit
+  }
+
+  explicit operator bool() const { return impl_ != NULLPTR; }
+
+  R operator()(A... a) && {
+    auto bye = std::move(impl_);
+    return bye->invoke(static_cast<A&&>(a)...);

Review comment:
       Since the types of the arguments are already fixed at class scope (so this is not a function template), perfect forwarding is not *really* available here.
   
   `A` might be:
   - `std::string`, in which case `static_cast<A&&>` becomes `static_cast<std::string&&>` (appropriate move)
   - `const std::string&`, in which case `static_cast<A&&>` reference-decays to `static_cast<const std::string&>` (no inappropriate move)
   - `std::string&&`, in which case `static_cast<A&&>` becomes `static_cast<std::string&&>` (appropriate move)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r545167684



##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -276,6 +282,593 @@ TEST(FutureSyncTest, Int) {
   }
 }
 
+TEST(FutureRefTest, ChainRemoved) {
+  // Creating a future chain should not prevent the futures from being deleted if the
+  // entire chain is deleted
+  std::weak_ptr<FutureImpl> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& status) { return Status::OK(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) { return Future<>::Make(); });

Review comment:
       Future has released its locks by the time it calls into the callbacks so it should be safe to access and won't cause deadlock.  It's always going to be redundant though since you should know the state of the future and have access to the result from the callback arguments.
   
   However, you could cause a memory leak.  If a future is abandoned then the callback will hold onto the future impl and the future impl holds onto the callback.
   
   I'm not sure what you can test here other than to verify it is a bad idea.  Do we need a comment warning users to not do this?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r545377670



##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -276,6 +283,636 @@ TEST(FutureSyncTest, Int) {
   }
 }
 
+TEST(FutureRefTest, ChainRemoved) {
+  // Creating a future chain should not prevent the futures from being deleted if the
+  // entire chain is deleted
+  std::weak_ptr<FutureImpl> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& status) { return Status::OK(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) { return Future<>::Make(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, TailRemoved) {
+  // Keeping the head of the future chain should keep the entire chain alive
+  std::shared_ptr<Future<>> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  bool side_effect_run = false;
+  {
+    ref = std::make_shared<Future<>>(Future<>::Make());
+    auto fut2 = ref->Then(StatusOnly, [&side_effect_run](const Status& status) {
+      side_effect_run = true;
+      return Status::OK();
+    });
+    ref2 = fut2.impl_;
+  }
+  ASSERT_FALSE(ref2.expired());
+
+  ref->MarkFinished();
+  ASSERT_TRUE(side_effect_run);
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, HeadRemoved) {
+  // Keeping the tail of the future chain should not keep the entire chain alive.  If no
+  // one has a reference to the head then there is no need to keep it, nothing will finish
+  // it.  In theory the intermediate futures could be finished by some external process
+  // but that would be highly unusual and bad practice so in reality this would just be a
+  // reference to a future that will never complete which is ok.
+  std::weak_ptr<FutureImpl> ref;
+  std::shared_ptr<Future<>> ref2;
+  {
+    auto fut = std::make_shared<Future<>>(Future<>::Make());
+    ref = fut->impl_;
+    ref2 = std::make_shared<Future<>>(fut->Then([](...) {}));
+  }
+  ASSERT_TRUE(ref.expired());
+
+  {
+    auto fut = Future<>::Make();
+    ref2 = std::make_shared<Future<>>(fut.Then([&](...) {
+      auto intermediate = Future<>::Make();
+      ref = intermediate.impl_;
+      return intermediate;
+    }));
+    fut.MarkFinished();
+  }
+  ASSERT_TRUE(ref.expired());
+}
+
+TEST(FutureTest, StressCallback) {
+  for (unsigned int n = 0; n < 1000; n++) {
+    auto fut = Future<>::Make();
+    std::atomic<unsigned int> count_finished_immediately(0);
+    std::atomic<unsigned int> count_finished_deferred(0);
+    std::atomic<unsigned int> callbacks_added(0);
+    std::atomic<bool> finished(false);
+
+    std::thread callback_adder([&] {
+      auto test_thread = std::this_thread::get_id();
+      while (!finished.load()) {
+        fut.AddCallback(
+            [&test_thread, &count_finished_immediately, &count_finished_deferred](
+                const Result<arrow::Future<arrow::detail::Empty>::ValueType>& result) {

Review comment:
       Ben showed me I can use `Result<detail::Empty>` instead.

##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -276,6 +283,636 @@ TEST(FutureSyncTest, Int) {
   }
 }
 
+TEST(FutureRefTest, ChainRemoved) {
+  // Creating a future chain should not prevent the futures from being deleted if the
+  // entire chain is deleted
+  std::weak_ptr<FutureImpl> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& status) { return Status::OK(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) { return Future<>::Make(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, TailRemoved) {
+  // Keeping the head of the future chain should keep the entire chain alive
+  std::shared_ptr<Future<>> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  bool side_effect_run = false;
+  {
+    ref = std::make_shared<Future<>>(Future<>::Make());
+    auto fut2 = ref->Then(StatusOnly, [&side_effect_run](const Status& status) {
+      side_effect_run = true;
+      return Status::OK();
+    });
+    ref2 = fut2.impl_;
+  }
+  ASSERT_FALSE(ref2.expired());
+
+  ref->MarkFinished();
+  ASSERT_TRUE(side_effect_run);
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, HeadRemoved) {
+  // Keeping the tail of the future chain should not keep the entire chain alive.  If no
+  // one has a reference to the head then there is no need to keep it, nothing will finish
+  // it.  In theory the intermediate futures could be finished by some external process
+  // but that would be highly unusual and bad practice so in reality this would just be a
+  // reference to a future that will never complete which is ok.
+  std::weak_ptr<FutureImpl> ref;
+  std::shared_ptr<Future<>> ref2;
+  {
+    auto fut = std::make_shared<Future<>>(Future<>::Make());
+    ref = fut->impl_;
+    ref2 = std::make_shared<Future<>>(fut->Then([](...) {}));
+  }
+  ASSERT_TRUE(ref.expired());
+
+  {
+    auto fut = Future<>::Make();
+    ref2 = std::make_shared<Future<>>(fut.Then([&](...) {
+      auto intermediate = Future<>::Make();
+      ref = intermediate.impl_;
+      return intermediate;
+    }));
+    fut.MarkFinished();
+  }
+  ASSERT_TRUE(ref.expired());
+}
+
+TEST(FutureTest, StressCallback) {
+  for (unsigned int n = 0; n < 1000; n++) {
+    auto fut = Future<>::Make();
+    std::atomic<unsigned int> count_finished_immediately(0);
+    std::atomic<unsigned int> count_finished_deferred(0);
+    std::atomic<unsigned int> callbacks_added(0);
+    std::atomic<bool> finished(false);
+
+    std::thread callback_adder([&] {
+      auto test_thread = std::this_thread::get_id();
+      while (!finished.load()) {
+        fut.AddCallback(
+            [&test_thread, &count_finished_immediately, &count_finished_deferred](
+                const Result<arrow::Future<arrow::detail::Empty>::ValueType>& result) {
+              if (std::this_thread::get_id() == test_thread) {
+                count_finished_immediately++;
+              } else {
+                count_finished_deferred++;
+              }
+            });
+        callbacks_added++;
+      }
+    });
+
+    while (callbacks_added.load() == 0) {
+      // Spin until the callback_adder has started running
+    }
+

Review comment:
       I'll add the check.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r534360436



##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -276,6 +282,593 @@ TEST(FutureSyncTest, Int) {
   }
 }
 
+TEST(FutureRefTest, ChainRemoved) {
+  // Creating a future chain should not prevent the futures from being deleted if the
+  // entire chain is deleted
+  std::weak_ptr<FutureImpl> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& status) { return Status::OK(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) { return Future<>::Make(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, TailRemoved) {
+  // Keeping the head of the future chain should keep the entire chain alive
+  std::shared_ptr<Future<>> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  bool side_effect_run = false;
+  {
+    ref = std::make_shared<Future<>>(Future<>::Make());
+    auto fut2 = ref->Then(StatusOnly, [&side_effect_run](const Status& status) {
+      side_effect_run = true;
+      return Status::OK();
+    });
+    ref2 = fut2.impl_;
+  }
+  ASSERT_FALSE(ref2.expired());
+
+  ref->MarkFinished();
+  ASSERT_TRUE(side_effect_run);
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, HeadRemoved) {
+  // Keeping the tail of the future chain should not keep the entire chain alive.  If no
+  // one has a reference to the head then there is no need to keep it, nothing will finish
+  // it.  In theory the intermediate futures could be finished by some external process
+  // but that would be highly unusual and bad practice so in reality this would just be a
+  // reference to a future that will never complete which is ok.
+  std::weak_ptr<FutureImpl> ref;
+  std::shared_ptr<Future<>> ref2;
+  {
+    auto fut = std::make_shared<Future<>>(Future<>::Make());
+    ref = fut->impl_;
+    ref2 = std::make_shared<Future<>>(fut->Then([](...) {}));
+  }
+  ASSERT_TRUE(ref.expired());
+
+  {
+    auto fut = Future<>::Make();
+    ref2 = std::make_shared<Future<>>(fut.Then([&](...) {
+      auto intermediate = Future<>::Make();
+      ref = intermediate.impl_;
+      return intermediate;
+    }));
+    fut.MarkFinished();
+  }
+  ASSERT_TRUE(ref.expired());
+}
+
+TEST(FutureCompletionTest, Void) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    int passed_in_result = 0;
+    auto fut2 =
+        fut.Then([&passed_in_result](const int& result) { passed_in_result = result; });
+    fut.MarkFinished(42);
+    AssertSuccessful(fut2);
+    ASSERT_EQ(passed_in_result, 42);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const int& result) {});
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertFailed(fut2);
+    ASSERT_TRUE(fut2.status().IsIOError());
+  }
+  {
+    // Swallow failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) {});
+    fut.MarkFinished(Result<int>(Status::IOError("xxx")));
+    AssertSuccessful(fut2);
+  }
+  {
+    // From void
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) {});
+    fut.MarkFinished();
+    AssertSuccessful(fut2);
+  }
+  {
+    // From failed status
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& s) { return s; });
+    fut.MarkFinished(Status::IOError("xxx"));
+    AssertFailed(fut2);
+  }
+  {
+    // Recover a failed status
+    auto fut = Future<>::Make();
+    Status status_seen = Status::OK();
+    auto fut2 = fut.Then(StatusOnly,
+                         [&status_seen](const Status& result) { status_seen = result; });
+    ASSERT_TRUE(status_seen.ok());
+    fut.MarkFinished(Status::IOError("xxx"));
+    ASSERT_TRUE(status_seen.IsIOError());
+    AssertSuccessful(fut2);
+  }
+}
+
+TEST(FutureCompletionTest, NonVoid) {
+  {
+    // Simple callback
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const Result<int>& result) {
+      auto passed_in_result = *result;
+      return passed_in_result * passed_in_result;
+    });
+    fut.MarkFinished(42);
+    AssertSuccessful(fut2);
+    auto result = *fut2.result();
+    ASSERT_EQ(result, 42 * 42);
+  }
+  {
+    // Propagate failure
+    auto fut = Future<int>::Make();
+    auto fut2 = fut.Then([](const Result<int>& result) {

Review comment:
       That's a hold over from when only a single callback was provided. I'll refactor to `int`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r546027100



##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -122,15 +122,21 @@ void AssertFinished(const Future<T>& fut) {
 // Assert the future is successful *now*
 template <typename T>
 void AssertSuccessful(const Future<T>& fut) {
-  ASSERT_EQ(fut.state(), FutureState::SUCCESS);
-  ASSERT_OK(fut.status());
+  ASSERT_TRUE(fut.Wait(0.1));

Review comment:
       Hmm, I think this is a bit mixed up. `Future::state` is not blocking, `Future::status` is. But since we're first asserting that the future state is completed by calling `Future::state` and asserting on the result, `AssertSuccessful` could not succeed if the future is pending.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r534358014



##########
File path: cpp/src/arrow/util/task_group.cc
##########
@@ -159,13 +142,13 @@ class ThreadedTaskGroup : public TaskGroup {
   Executor* executor_;
   std::atomic<int32_t> nremaining_;
   std::atomic<bool> ok_;
+  Future<> completion_future_ = Future<>::Make();

Review comment:
       nope, I'll clean this up




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r546107977



##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -768,4 +1361,27 @@ TYPED_TEST(FutureIteratorTest, ErrorsAsCompleted) { this->TestErrorsAsCompleted(
 
 TYPED_TEST(FutureIteratorTest, StressAsCompleted) { this->TestStressAsCompleted(); }
 
+namespace internal {
+TEST(FnOnceTest, MoveOnlyDataType) {
+  // ensuring this is valid guarantees we are making no unnecessary copies

Review comment:
       https://issues.apache.org/jira/browse/ARROW-10966




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r534351495



##########
File path: cpp/src/arrow/util/future.cc
##########
@@ -229,6 +229,16 @@ class ConcreteFutureImpl : public FutureImpl {
 
   void DoMarkFailed() { DoMarkFinishedOrFailed(FutureState::FAILURE); }
 
+  void AddCallback(Callback callback) {
+    std::unique_lock<std::mutex> lock(mutex_);
+    if (IsFutureFinished(state_)) {
+      lock.unlock();
+      std::move(callback)();

Review comment:
       I don't think so; we don't provide guarantees on callback ordering. I'll add a comment to that effect




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r534360066



##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -276,6 +282,593 @@ TEST(FutureSyncTest, Int) {
   }
 }
 
+TEST(FutureRefTest, ChainRemoved) {
+  // Creating a future chain should not prevent the futures from being deleted if the
+  // entire chain is deleted
+  std::weak_ptr<FutureImpl> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& status) { return Status::OK(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) { return Future<>::Make(); });

Review comment:
       I'll add a case for that




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r545368951



##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -276,6 +283,636 @@ TEST(FutureSyncTest, Int) {
   }
 }
 
+TEST(FutureRefTest, ChainRemoved) {
+  // Creating a future chain should not prevent the futures from being deleted if the
+  // entire chain is deleted
+  std::weak_ptr<FutureImpl> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& status) { return Status::OK(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) { return Future<>::Make(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, TailRemoved) {
+  // Keeping the head of the future chain should keep the entire chain alive
+  std::shared_ptr<Future<>> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  bool side_effect_run = false;
+  {
+    ref = std::make_shared<Future<>>(Future<>::Make());
+    auto fut2 = ref->Then(StatusOnly, [&side_effect_run](const Status& status) {
+      side_effect_run = true;
+      return Status::OK();
+    });
+    ref2 = fut2.impl_;
+  }
+  ASSERT_FALSE(ref2.expired());
+
+  ref->MarkFinished();
+  ASSERT_TRUE(side_effect_run);
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, HeadRemoved) {
+  // Keeping the tail of the future chain should not keep the entire chain alive.  If no
+  // one has a reference to the head then there is no need to keep it, nothing will finish
+  // it.  In theory the intermediate futures could be finished by some external process
+  // but that would be highly unusual and bad practice so in reality this would just be a
+  // reference to a future that will never complete which is ok.
+  std::weak_ptr<FutureImpl> ref;
+  std::shared_ptr<Future<>> ref2;
+  {
+    auto fut = std::make_shared<Future<>>(Future<>::Make());
+    ref = fut->impl_;
+    ref2 = std::make_shared<Future<>>(fut->Then([](...) {}));
+  }
+  ASSERT_TRUE(ref.expired());
+
+  {
+    auto fut = Future<>::Make();
+    ref2 = std::make_shared<Future<>>(fut.Then([&](...) {
+      auto intermediate = Future<>::Make();
+      ref = intermediate.impl_;
+      return intermediate;
+    }));
+    fut.MarkFinished();
+  }
+  ASSERT_TRUE(ref.expired());
+}
+
+TEST(FutureTest, StressCallback) {
+  for (unsigned int n = 0; n < 1000; n++) {
+    auto fut = Future<>::Make();
+    std::atomic<unsigned int> count_finished_immediately(0);
+    std::atomic<unsigned int> count_finished_deferred(0);
+    std::atomic<unsigned int> callbacks_added(0);
+    std::atomic<bool> finished(false);
+
+    std::thread callback_adder([&] {
+      auto test_thread = std::this_thread::get_id();
+      while (!finished.load()) {
+        fut.AddCallback(
+            [&test_thread, &count_finished_immediately, &count_finished_deferred](
+                const Result<arrow::Future<arrow::detail::Empty>::ValueType>& result) {

Review comment:
       We don't want to type `const Result<arrow::Future<arrow::detail::Empty>::ValueType>&` every time, do we? ;-)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r545718970



##########
File path: cpp/src/arrow/util/thread_pool_benchmark.cc
##########
@@ -136,21 +168,24 @@ static void ThreadedTaskGroup(benchmark::State& state) {
 
   for (auto _ : state) {
     auto task_group = TaskGroup::MakeThreaded(pool.get());
-    for (int32_t i = 0; i < nspawns; ++i) {
-      // Pass the task by reference to avoid copying it around
-      task_group->Append(std::ref(task));
-    }
+    task_group->Append([&task, nspawns, task_group] {

Review comment:
       I spoke with @bkietz a bit more on this and he pointed out it is not a race condition as written without the futures.  The threaded task group cannot actually finish unless `Finish` is called.  Once futures are added in then it becomes an issue.  As soon as the task counter hits 0 it will complete the future.
   
   I spoke about whether we should pull this nesting back out and it was recommended to just leave it all alone since futures will replace task group someday.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r534351861



##########
File path: cpp/src/arrow/util/future.h
##########
@@ -264,11 +335,125 @@ class Future {
   }
 
   /// \brief Make a finished Future<> with the provided Status.
-  template <typename E = ValueType, typename = detail::Empty::EnableIfSame<E>>
+  template <typename E = ValueType, typename = typename std::enable_if<
+                                        std::is_same<E, detail::Empty>::value>::type>
   static Future<> MakeFinished(Status s = Status::OK()) {
     return MakeFinished(E::ToResult(std::move(s)));
   }
 
+  /// \brief Consumer API: Register a callback to run when this future completes
+  template <typename OnComplete>
+  void AddCallback(OnComplete&& on_complete) const {
+    struct Callback {
+      void operator()() && {
+        auto self = weak_self.get();
+        std::move(on_complete)(*self.GetResult());
+      }
+
+      WeakFuture<T> weak_self;
+      OnComplete on_complete;
+    };
+
+    // We know impl_ will not be dangling when invoking callbacks because at least one
+    // thread will be waiting for MarkFinished to return. Thus it's safe to keep a
+    // weak reference to impl_ here
+    impl_->AddCallback(
+        Callback{WeakFuture<T>(*this), std::forward<OnComplete>(on_complete)});
+  }
+
+  /// \brief Consumer API: Register a continuation to run when this future completes
+  ///
+  /// The continuation will run in the same thread that called MarkFinished (whatever
+  /// callback is registered with this function will run before MarkFinished returns).
+  /// Avoid long-running callbacks in favor of submitting a task to an Executor and
+  /// returning the future.
+  ///
+  /// Two callbacks are supported:
+  /// - OnSuccess, called against the result (const ValueType&) on successul completion.
+  /// - OnFailure, called against the error (const Status&) on failed completion.
+  ///
+  /// Then() returns a Future whose ValueType is derived from the return type of the
+  /// callbacks. If a callback returns:
+  /// - void, a Future<> will be produced which will completes successully as soon
+  ///   as the callback runs.
+  /// - Status, a Future<> will be produced which will complete with the returned Status
+  ///   as soon as the callback runs.
+  /// - V or Result<V>, a Future<V> will be produced which will complete with the result
+  ///   of invoking the callback as soon as the callback runs.
+  /// - Future<V>, a Future<V> will be produced which will be marked complete when the
+  ///   future returned by the callback completes (and will complete with the same
+  ///   result).
+  ///
+  /// The continued Future type must be the same for both callbacks.
+  ///
+  /// Note that OnFailure can swallow errors, allowing continued Futures to successully
+  /// complete even if this Future fails.
+  ///
+  /// If this future is already completed then the callback will be run immediately
+  /// (before this method returns) and the returned future may already be marked complete
+  /// (it will definitely be marked complete if the callback returns a non-future or a
+  /// completed future).
+  template <typename OnSuccess, typename OnFailure,
+            typename ContinuedFuture =
+                detail::ContinueFuture::ForSignature<OnSuccess && (const T&)>>
+  ContinuedFuture Then(OnSuccess&& on_success, OnFailure&& on_failure) const {
+    static_assert(
+        std::is_same<detail::ContinueFuture::ForSignature<OnFailure && (const Status&)>,
+                     ContinuedFuture>::value,
+        "OnSuccess and OnFailure must continue with the same future type");
+
+    auto next = ContinuedFuture::Make();
+
+    struct Callback {
+      void operator()(const Result<T>& result) && {
+        if (ARROW_PREDICT_TRUE(result.ok())) {
+          detail::Continue(std::move(next), std::move(on_success), result.ValueOrDie());
+        } else {
+          detail::Continue(std::move(next), std::move(on_failure), result.status());

Review comment:
       not necessary; since this is in a FnOnce all members will be dropped as soon as it's invoked




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz closed pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
bkietz closed pull request #8680:
URL: https://github.com/apache/arrow/pull/8680


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r545719066



##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -122,15 +122,21 @@ void AssertFinished(const Future<T>& fut) {
 // Assert the future is successful *now*
 template <typename T>
 void AssertSuccessful(const Future<T>& fut) {
-  ASSERT_EQ(fut.state(), FutureState::SUCCESS);
-  ASSERT_OK(fut.status());
+  ASSERT_TRUE(fut.Wait(0.1));

Review comment:
       Done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
bkietz commented on pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#issuecomment-728333688


   Added a benchmark to measure the cost of creating a Future in Executor::Submit instead of just using Executor::Spawn. Locally:
   ```
   ---------------------------------------------------------------------------------------------
   Benchmark                                                      Time           CPU Iterations
   ---------------------------------------------------------------------------------------------
   ThreadPoolSpawn/threads:4/task_cost:1000/real_time     201939036 ns  173389187 ns          6    967.19k items/s
   ThreadPoolSpawn/threads:8/task_cost:1000/real_time     294998893 ns  264699658 ns          2   662.082k items/s
   ThreadPoolSpawn/threads:4/task_cost:10000/real_time     36495616 ns    2546067 ns         19   535.194k items/s
   ThreadPoolSpawn/threads:8/task_cost:10000/real_time     20151885 ns    3711374 ns         30   969.251k items/s
   ThreadPoolSpawn/threads:4/task_cost:100000/real_time    36728060 ns     285424 ns         19   53.2046k items/s
   ThreadPoolSpawn/threads:8/task_cost:100000/real_time    23104241 ns     415593 ns         29   84.5776k items/s
   ThreadPoolSubmit/threads:4/task_cost:1000/real_time     20986410 ns   19332114 ns         33   465.377k items/s
   ThreadPoolSubmit/threads:8/task_cost:1000/real_time     19378929 ns   18328342 ns         36    503.98k items/s
   ThreadPoolSubmit/threads:4/task_cost:10000/real_time     2320937 ns     761124 ns        303   421.183k items/s
   ThreadPoolSubmit/threads:8/task_cost:10000/real_time     1727973 ns    1442820 ns        393   565.714k items/s
   ThreadPoolSubmit/threads:4/task_cost:100000/real_time    2085426 ns     112413 ns        329   47.2962k items/s
   ThreadPoolSubmit/threads:8/task_cost:100000/real_time    1279387 ns     104251 ns        514   77.0938k items/s
   ```
   This shows there is significant overhead for small tasks and pools with fewer threads. Follow up: https://issues.apache.org/jira/browse/ARROW-10625


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r534359769



##########
File path: cpp/src/arrow/util/thread_pool_benchmark.cc
##########
@@ -103,8 +103,40 @@ static void ThreadPoolSpawn(benchmark::State& state) {
   state.SetItemsProcessed(state.iterations() * nspawns);
 }
 
+// Benchmark ThreadPool::Submit
+static void ThreadPoolSubmit(benchmark::State& state) {  // NOLINT non-const reference
+  const auto nthreads = static_cast<int>(state.range(0));
+  const auto workload_size = static_cast<int32_t>(state.range(1));
+
+  Workload workload(workload_size);
+
+  const int32_t nspawns = 10000000 / workload_size + 1;
+
+  for (auto _ : state) {
+    state.PauseTiming();
+    auto pool = *ThreadPool::Make(nthreads);
+    std::atomic<int32_t> n_finished{0};
+    state.ResumeTiming();
+
+    for (int32_t i = 0; i < nspawns; ++i) {
+      // Pass the task by reference to avoid copying it around
+      (void)DeferNotOk(pool->Submit(std::ref(workload))).Then([&](...) {

Review comment:
       I agree it warrants improvement and individual benchmarking, but my intent was to measure the end-to-end cost of using Submit+callbacks since that's relevant to the csv parsing case

##########
File path: cpp/src/arrow/util/thread_pool_benchmark.cc
##########
@@ -136,21 +168,24 @@ static void ThreadedTaskGroup(benchmark::State& state) {
 
   for (auto _ : state) {
     auto task_group = TaskGroup::MakeThreaded(pool.get());
-    for (int32_t i = 0; i < nspawns; ++i) {
-      // Pass the task by reference to avoid copying it around
-      task_group->Append(std::ref(task));
-    }
+    task_group->Append([&task, nspawns, task_group] {

Review comment:
       hold over, will clean




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r534352606



##########
File path: cpp/src/arrow/util/task_group.h
##########
@@ -22,22 +22,33 @@
 #include <utility>
 
 #include "arrow/status.h"
+#include "arrow/util/future.h"

Review comment:
       will remove

##########
File path: cpp/src/arrow/util/task_group.cc
##########
@@ -91,14 +81,17 @@ class ThreadedTaskGroup : public TaskGroup {
       nremaining_.fetch_add(1, std::memory_order_acquire);
 
       auto self = checked_pointer_cast<ThreadedTaskGroup>(shared_from_this());
-      Status st = executor_->Spawn([self, task]() {
-        if (self->ok_.load(std::memory_order_acquire)) {
-          // XXX what about exceptions?
-          Status st = task();
-          self->UpdateStatus(std::move(st));
-        }
-        self->OneTaskDone();
-      });
+      Status st = executor_->Spawn(std::bind(
+          [](const std::shared_ptr<ThreadedTaskGroup>& self,
+             const std::function<Status()>& task) {
+            if (self->ok_.load(std::memory_order_acquire)) {
+              // XXX what about exceptions?
+              Status st = task();
+              self->UpdateStatus(std::move(st));
+            }
+            self->OneTaskDone();
+          },
+          std::move(self), std::move(task)));

Review comment:
       indeed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r545214673



##########
File path: cpp/src/arrow/util/task_group.h
##########
@@ -22,22 +22,33 @@
 #include <utility>
 
 #include "arrow/status.h"
+#include "arrow/util/future.h"

Review comment:
       Removed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r534356842



##########
File path: cpp/src/arrow/util/future.cc
##########
@@ -243,6 +253,12 @@ class ConcreteFutureImpl : public FutureImpl {
       }
     }
     cv_.notify_all();
+
+    // run callbacks
+    for (auto&& callback : callbacks_) {

Review comment:
       I'm not sure why this isn't safe. During iteration the vector won't be dropped (since we have at least the reference held by the thread calling `MarkFinished`) or mutated (since attempting to add a callback to a finished future will immediately evaluate that callback)

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -264,11 +335,125 @@ class Future {
   }
 
   /// \brief Make a finished Future<> with the provided Status.
-  template <typename E = ValueType, typename = detail::Empty::EnableIfSame<E>>
+  template <typename E = ValueType, typename = typename std::enable_if<
+                                        std::is_same<E, detail::Empty>::value>::type>
   static Future<> MakeFinished(Status s = Status::OK()) {
     return MakeFinished(E::ToResult(std::move(s)));
   }
 
+  /// \brief Consumer API: Register a callback to run when this future completes

Review comment:
       will do




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r544781710



##########
File path: cpp/src/arrow/util/future.cc
##########
@@ -243,6 +253,12 @@ class ConcreteFutureImpl : public FutureImpl {
       }
     }
     cv_.notify_all();
+
+    // run callbacks
+    for (auto&& callback : callbacks_) {

Review comment:
       I agree with Ben.  The vector cannot be mutated because we know by the time we reach the iteration that the future has (safely) been marked finished.  Any `AddCallback` from another thread will not modify the callbacks vector.  I added FutureTest::StressCallback to confirm.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r545733104



##########
File path: cpp/src/arrow/util/future.h
##########
@@ -264,11 +335,125 @@ class Future {
   }
 
   /// \brief Make a finished Future<> with the provided Status.
-  template <typename E = ValueType, typename = detail::Empty::EnableIfSame<E>>
+  template <typename E = ValueType, typename = typename std::enable_if<
+                                        std::is_same<E, detail::Empty>::value>::type>
   static Future<> MakeFinished(Status s = Status::OK()) {
     return MakeFinished(E::ToResult(std::move(s)));
   }
 
+  /// \brief Consumer API: Register a callback to run when this future completes
+  template <typename OnComplete>
+  void AddCallback(OnComplete&& on_complete) const {
+    struct Callback {
+      void operator()() && {
+        auto self = weak_self.get();
+        std::move(on_complete)(*self.GetResult());
+      }
+
+      WeakFuture<T> weak_self;
+      OnComplete on_complete;
+    };
+
+    // We know impl_ will not be dangling when invoking callbacks because at least one
+    // thread will be waiting for MarkFinished to return. Thus it's safe to keep a
+    // weak reference to impl_ here
+    impl_->AddCallback(
+        Callback{WeakFuture<T>(*this), std::forward<OnComplete>(on_complete)});
+  }
+
+  /// \brief Consumer API: Register a continuation to run when this future completes
+  ///
+  /// The continuation will run in the same thread that called MarkFinished (whatever
+  /// callback is registered with this function will run before MarkFinished returns).
+  /// Avoid long-running callbacks in favor of submitting a task to an Executor and
+  /// returning the future.
+  ///
+  /// Two callbacks are supported:
+  /// - OnSuccess, called against the result (const ValueType&) on successul completion.
+  /// - OnFailure, called against the error (const Status&) on failed completion.
+  ///
+  /// Then() returns a Future whose ValueType is derived from the return type of the
+  /// callbacks. If a callback returns:
+  /// - void, a Future<> will be produced which will completes successully as soon
+  ///   as the callback runs.
+  /// - Status, a Future<> will be produced which will complete with the returned Status
+  ///   as soon as the callback runs.
+  /// - V or Result<V>, a Future<V> will be produced which will complete with the result
+  ///   of invoking the callback as soon as the callback runs.
+  /// - Future<V>, a Future<V> will be produced which will be marked complete when the
+  ///   future returned by the callback completes (and will complete with the same
+  ///   result).
+  ///
+  /// The continued Future type must be the same for both callbacks.
+  ///
+  /// Note that OnFailure can swallow errors, allowing continued Futures to successully
+  /// complete even if this Future fails.
+  ///
+  /// If this future is already completed then the callback will be run immediately
+  /// (before this method returns) and the returned future may already be marked complete
+  /// (it will definitely be marked complete if the callback returns a non-future or a
+  /// completed future).
+  template <typename OnSuccess, typename OnFailure,
+            typename ContinuedFuture =
+                detail::ContinueFuture::ForSignature<OnSuccess && (const T&)>>
+  ContinuedFuture Then(OnSuccess&& on_success, OnFailure&& on_failure) const {
+    static_assert(
+        std::is_same<detail::ContinueFuture::ForSignature<OnFailure && (const Status&)>,
+                     ContinuedFuture>::value,
+        "OnSuccess and OnFailure must continue with the same future type");
+
+    auto next = ContinuedFuture::Make();
+
+    struct Callback {
+      void operator()(const Result<T>& result) && {
+        if (ARROW_PREDICT_TRUE(result.ok())) {
+          detail::Continue(std::move(next), std::move(on_success), result.ValueOrDie());
+        } else {
+          detail::Continue(std::move(next), std::move(on_failure), result.status());
+        }
+      }
+
+      OnSuccess on_success;
+      OnFailure on_failure;
+      ContinuedFuture next;
+    };
+
+    AddCallback(Callback{std::forward<OnSuccess>(on_success),
+                         std::forward<OnFailure>(on_failure), next});
+
+    return next;
+  }
+
+  /// \brief Overload without OnFailure. Failures will be passed through unchanged.
+  template <typename OnSuccess,
+            typename ContinuedFuture =
+                detail::ContinueFuture::ForSignature<OnSuccess && (const T&)>>
+  ContinuedFuture Then(OnSuccess&& on_success) const {
+    return Then(std::forward<OnSuccess>(on_success), [](const Status& s) {
+      return Result<typename ContinuedFuture::ValueType>(s);
+    });
+  }
+
+  template <typename OnComplete,
+            typename ContinuedFuture = typename detail::ContinueFuture::ForSignature<
+                OnComplete && (const Status&)>>
+  ContinuedFuture Then(decltype(StatusOnly), OnComplete&& on_complete) const {

Review comment:
       Actually, just for completeness sake, the equivalent call to...
   ```
   .ThenFinally([buffer] { delete buffer; })
   ```
   ...would be...
   ```
   .Then(
     (const Result<ProcessedBuffer>& result) [buffer] { delete buffer; return result; },
     (const Status& error_status) [buffer] { delete buffer; return Result<ProcessedBuffer>(error_status); }
   )
   ```
   The key difference being that the on_failure callback also has to return `Result<ProcessedBuffer>` and can't just return `Status`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r546020261



##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -122,15 +122,21 @@ void AssertFinished(const Future<T>& fut) {
 // Assert the future is successful *now*
 template <typename T>
 void AssertSuccessful(const Future<T>& fut) {
-  ASSERT_EQ(fut.state(), FutureState::SUCCESS);
-  ASSERT_OK(fut.status());
+  ASSERT_TRUE(fut.Wait(0.1));

Review comment:
       I should clarify, I took out the `Wait(0.1)` since the check beforehand that it was completed was good enough.  So now if it is in the pending state it will fail.  This change was intended to make debugging easier.  `AssertSuccessful` calls `fut.state()` which (if the future is pending) will wait on the pending future.  This means a bug in the test or code which resulted in a pending future being passed to `AssertSuccessful` would deadlock and this was making the tests difficult to debug.
   
   Now we simply check for pending first as a special case and fail if it is pending before we check the state.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r545359826



##########
File path: cpp/src/arrow/util/future.h
##########
@@ -273,11 +349,134 @@ class Future {
   }
 
   /// \brief Make a finished Future<> with the provided Status.
-  template <typename E = ValueType, typename = detail::Empty::EnableIfSame<E>>
+  template <typename E = ValueType, typename = typename std::enable_if<
+                                        std::is_same<E, detail::Empty>::value>::type>
   static Future<> MakeFinished(Status s = Status::OK()) {
     return MakeFinished(E::ToResult(std::move(s)));
   }
 
+  /// \brief Consumer API: Register a callback to run when this future completes
+  ///
+  /// The callback should receive the result of the future (const Result<T>&)
+  /// For a void or statusy future this should be
+  /// (const Result<Future<detail::Empty>::ValueType>& result)

Review comment:
       ... which is? Ideally, the user gets something "simple", such as `Status` or `const Status&`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] bkietz commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r546002566



##########
File path: cpp/src/arrow/util/future.h
##########
@@ -273,11 +349,134 @@ class Future {
   }
 
   /// \brief Make a finished Future<> with the provided Status.
-  template <typename E = ValueType, typename = detail::Empty::EnableIfSame<E>>
+  template <typename E = ValueType, typename = typename std::enable_if<
+                                        std::is_same<E, detail::Empty>::value>::type>
   static Future<> MakeFinished(Status s = Status::OK()) {
     return MakeFinished(E::ToResult(std::move(s)));
   }
 
+  /// \brief Consumer API: Register a callback to run when this future completes
+  ///
+  /// The callback should receive the result of the future (const Result<T>&)
+  /// For a void or statusy future this should be
+  /// (const Result<Future<detail::Empty>::ValueType>& result)

Review comment:
       I think it's preferable to keep things as generic as possible, so the empty result is preferable to the plain Status




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r545378494



##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -276,6 +283,636 @@ TEST(FutureSyncTest, Int) {
   }
 }
 
+TEST(FutureRefTest, ChainRemoved) {
+  // Creating a future chain should not prevent the futures from being deleted if the
+  // entire chain is deleted
+  std::weak_ptr<FutureImpl> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status& status) { return Status::OK(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+
+  {
+    auto fut = Future<>::Make();
+    auto fut2 = fut.Then(StatusOnly, [](const Status&) { return Future<>::Make(); });
+    ref = fut.impl_;
+    ref2 = fut2.impl_;
+  }
+  ASSERT_TRUE(ref.expired());
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, TailRemoved) {
+  // Keeping the head of the future chain should keep the entire chain alive
+  std::shared_ptr<Future<>> ref;
+  std::weak_ptr<FutureImpl> ref2;
+  bool side_effect_run = false;
+  {
+    ref = std::make_shared<Future<>>(Future<>::Make());
+    auto fut2 = ref->Then(StatusOnly, [&side_effect_run](const Status& status) {
+      side_effect_run = true;
+      return Status::OK();
+    });
+    ref2 = fut2.impl_;
+  }
+  ASSERT_FALSE(ref2.expired());
+
+  ref->MarkFinished();
+  ASSERT_TRUE(side_effect_run);
+  ASSERT_TRUE(ref2.expired());
+}
+
+TEST(FutureRefTest, HeadRemoved) {
+  // Keeping the tail of the future chain should not keep the entire chain alive.  If no
+  // one has a reference to the head then there is no need to keep it, nothing will finish
+  // it.  In theory the intermediate futures could be finished by some external process
+  // but that would be highly unusual and bad practice so in reality this would just be a
+  // reference to a future that will never complete which is ok.
+  std::weak_ptr<FutureImpl> ref;
+  std::shared_ptr<Future<>> ref2;
+  {
+    auto fut = std::make_shared<Future<>>(Future<>::Make());
+    ref = fut->impl_;
+    ref2 = std::make_shared<Future<>>(fut->Then([](...) {}));
+  }
+  ASSERT_TRUE(ref.expired());
+
+  {
+    auto fut = Future<>::Make();
+    ref2 = std::make_shared<Future<>>(fut.Then([&](...) {
+      auto intermediate = Future<>::Make();
+      ref = intermediate.impl_;
+      return intermediate;
+    }));
+    fut.MarkFinished();
+  }
+  ASSERT_TRUE(ref.expired());
+}
+
+TEST(FutureTest, StressCallback) {
+  for (unsigned int n = 0; n < 1000; n++) {
+    auto fut = Future<>::Make();
+    std::atomic<unsigned int> count_finished_immediately(0);
+    std::atomic<unsigned int> count_finished_deferred(0);
+    std::atomic<unsigned int> callbacks_added(0);
+    std::atomic<bool> finished(false);
+
+    std::thread callback_adder([&] {
+      auto test_thread = std::this_thread::get_id();
+      while (!finished.load()) {
+        fut.AddCallback(
+            [&test_thread, &count_finished_immediately, &count_finished_deferred](
+                const Result<arrow::Future<arrow::detail::Empty>::ValueType>& result) {
+              if (std::this_thread::get_id() == test_thread) {
+                count_finished_immediately++;
+              } else {
+                count_finished_deferred++;
+              }
+            });
+        callbacks_added++;
+      }
+    });
+
+    while (callbacks_added.load() == 0) {
+      // Spin until the callback_adder has started running
+    }
+
+    fut.MarkFinished();
+
+    while (count_finished_deferred.load() == 0) {

Review comment:
       This is correct, I had the logic backwards.  I've switched to `count_finished_immediately`.  @bkietz  pointed out that this isn't strictly necessary since the critical section we are trying to test is inside `MarkFinished` but I'd like to keep it in to ensure we get one of each type of callback for completeness sake.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on a change in pull request #8680: ARROW-10182: [C++] Add basic continuation support to Future

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #8680:
URL: https://github.com/apache/arrow/pull/8680#discussion_r545361101



##########
File path: cpp/src/arrow/util/future.h
##########
@@ -264,11 +335,125 @@ class Future {
   }
 
   /// \brief Make a finished Future<> with the provided Status.
-  template <typename E = ValueType, typename = detail::Empty::EnableIfSame<E>>
+  template <typename E = ValueType, typename = typename std::enable_if<
+                                        std::is_same<E, detail::Empty>::value>::type>
   static Future<> MakeFinished(Status s = Status::OK()) {
     return MakeFinished(E::ToResult(std::move(s)));
   }
 
+  /// \brief Consumer API: Register a callback to run when this future completes
+  template <typename OnComplete>
+  void AddCallback(OnComplete&& on_complete) const {
+    struct Callback {
+      void operator()() && {
+        auto self = weak_self.get();
+        std::move(on_complete)(*self.GetResult());
+      }
+
+      WeakFuture<T> weak_self;
+      OnComplete on_complete;
+    };
+
+    // We know impl_ will not be dangling when invoking callbacks because at least one
+    // thread will be waiting for MarkFinished to return. Thus it's safe to keep a
+    // weak reference to impl_ here
+    impl_->AddCallback(
+        Callback{WeakFuture<T>(*this), std::forward<OnComplete>(on_complete)});
+  }
+
+  /// \brief Consumer API: Register a continuation to run when this future completes
+  ///
+  /// The continuation will run in the same thread that called MarkFinished (whatever
+  /// callback is registered with this function will run before MarkFinished returns).
+  /// Avoid long-running callbacks in favor of submitting a task to an Executor and
+  /// returning the future.
+  ///
+  /// Two callbacks are supported:
+  /// - OnSuccess, called against the result (const ValueType&) on successul completion.
+  /// - OnFailure, called against the error (const Status&) on failed completion.
+  ///
+  /// Then() returns a Future whose ValueType is derived from the return type of the
+  /// callbacks. If a callback returns:
+  /// - void, a Future<> will be produced which will completes successully as soon
+  ///   as the callback runs.
+  /// - Status, a Future<> will be produced which will complete with the returned Status
+  ///   as soon as the callback runs.
+  /// - V or Result<V>, a Future<V> will be produced which will complete with the result
+  ///   of invoking the callback as soon as the callback runs.
+  /// - Future<V>, a Future<V> will be produced which will be marked complete when the
+  ///   future returned by the callback completes (and will complete with the same
+  ///   result).
+  ///
+  /// The continued Future type must be the same for both callbacks.
+  ///
+  /// Note that OnFailure can swallow errors, allowing continued Futures to successully
+  /// complete even if this Future fails.
+  ///
+  /// If this future is already completed then the callback will be run immediately
+  /// (before this method returns) and the returned future may already be marked complete
+  /// (it will definitely be marked complete if the callback returns a non-future or a
+  /// completed future).
+  template <typename OnSuccess, typename OnFailure,
+            typename ContinuedFuture =
+                detail::ContinueFuture::ForSignature<OnSuccess && (const T&)>>
+  ContinuedFuture Then(OnSuccess&& on_success, OnFailure&& on_failure) const {
+    static_assert(
+        std::is_same<detail::ContinueFuture::ForSignature<OnFailure && (const Status&)>,
+                     ContinuedFuture>::value,
+        "OnSuccess and OnFailure must continue with the same future type");
+
+    auto next = ContinuedFuture::Make();
+
+    struct Callback {
+      void operator()(const Result<T>& result) && {
+        if (ARROW_PREDICT_TRUE(result.ok())) {
+          detail::Continue(std::move(next), std::move(on_success), result.ValueOrDie());
+        } else {
+          detail::Continue(std::move(next), std::move(on_failure), result.status());
+        }
+      }
+
+      OnSuccess on_success;
+      OnFailure on_failure;
+      ContinuedFuture next;
+    };
+
+    AddCallback(Callback{std::forward<OnSuccess>(on_success),
+                         std::forward<OnFailure>(on_failure), next});
+
+    return next;
+  }
+
+  /// \brief Overload without OnFailure. Failures will be passed through unchanged.
+  template <typename OnSuccess,
+            typename ContinuedFuture =
+                detail::ContinueFuture::ForSignature<OnSuccess && (const T&)>>
+  ContinuedFuture Then(OnSuccess&& on_success) const {
+    return Then(std::forward<OnSuccess>(on_success), [](const Status& s) {
+      return Result<typename ContinuedFuture::ValueType>(s);
+    });
+  }
+
+  template <typename OnComplete,
+            typename ContinuedFuture = typename detail::ContinueFuture::ForSignature<
+                OnComplete && (const Status&)>>
+  ContinuedFuture Then(decltype(StatusOnly), OnComplete&& on_complete) const {

Review comment:
       I still don't understand what "StatusOnly" is supposed to be for if we change this to a separate name such as "ThenBoth" or "ThenFinally".
   
   (note the problem with "ThenFinally" is that it sounds like a last cleanup step (is it executed out of order?))




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org