You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/01/08 21:08:44 UTC

[GitHub] [arrow] bkietz commented on a change in pull request #9095: ARROW-10183: [C++] Apply composable futures to CSV

bkietz commented on a change in pull request #9095:
URL: https://github.com/apache/arrow/pull/9095#discussion_r554155616



##########
File path: cpp/src/arrow/util/iterator.h
##########
@@ -35,6 +36,13 @@
 
 namespace arrow {
 
+namespace detail {
+
+template <typename Signature>
+using result_of_t = typename std::result_of<Signature>::type;

Review comment:
       It's technically legal to have this alias both here and in future.h, but for clarity we should probably have a single alias declaration. util/functional.h would be a reasonable place to put it.

##########
File path: cpp/src/arrow/util/iterator.h
##########
@@ -186,6 +194,109 @@ class Iterator : public util::EqualityComparable<Iterator<T>> {
   Result<T> (*next_)(void*) = NULLPTR;
 };
 
+template <typename T>
+struct TransformFlow {
+  using YieldValueType = T;
+
+  TransformFlow(YieldValueType value, bool ready_for_next)
+      : finished_(false),
+        ready_for_next_(ready_for_next),
+        status_(),
+        yield_value_(std::move(value)) {}
+  TransformFlow(bool finished, bool ready_for_next)
+      : finished_(finished), ready_for_next_(ready_for_next), status_(), yield_value_() {}
+  TransformFlow(Status s)  // NOLINT runtime/explicit
+      : finished_(true), ready_for_next_(false), status_(s), yield_value_() {}
+
+  bool HasValue() const { return yield_value_.has_value(); }
+  bool Finished() const { return finished_; }
+  Status status() const { return status_; }
+  bool Ok() const { return status_.ok(); }
+  bool ReadyForNext() const { return ready_for_next_; }
+  T Value() const { return *yield_value_; }
+
+  bool finished_;
+  bool ready_for_next_;
+  Status status_;
+  util::optional<YieldValueType> yield_value_;
+};
+
+struct TransformFinish {
+  template <typename T>
+  operator TransformFlow<T>() && {  // NOLINT explicit
+    return TransformFlow<T>(true, true);
+  }
+};
+
+struct TransformSkip {
+  template <typename T>
+  operator TransformFlow<T>() && {  // NOLINT explicit
+    return TransformFlow<T>(false, true);
+  }
+};
+
+template <typename T>
+TransformFlow<T> TransformYield(T value = {}, bool ready_for_next = true) {
+  return TransformFlow<T>(std::move(value), ready_for_next);
+}
+
+template <typename T, typename V>
+using Transformer = std::function<TransformFlow<V>(T)>;
+
+template <typename T, typename V>
+class TransformIterator {
+ public:
+  explicit TransformIterator(Iterator<T> it, Transformer<T, V> transformer)
+      : it_(std::move(it)),
+        transformer_(std::move(transformer)),
+        last_value_(),
+        finished_() {}
+
+  util::optional<Result<V>> Pump() {
+    while (!finished_ && last_value_.has_value()) {
+      TransformFlow<V> next = transformer_(*last_value_);
+      if (next.ReadyForNext()) {
+        last_value_.reset();
+      }
+      if (next.Finished()) {
+        finished_ = true;
+      }
+      if (!next.Ok()) {
+        return next.status();
+      }
+      if (next.HasValue()) {
+        return next.Value();
+      }
+    }
+    if (finished_) {
+      return IterationTraits<V>::End();
+    }
+    return util::optional<V>();
+  }
+
+  Result<V> Next() {
+    while (!finished_) {
+      util::optional<Result<V>> next = Pump();
+      if (next.has_value()) {
+        return *next;
+      }
+      ARROW_ASSIGN_OR_RAISE(last_value_, it_.Next());
+    }
+    return IterationTraits<V>::End();
+  }
+
+ private:
+  Iterator<T> it_;
+  Transformer<T, V> transformer_;
+  util::optional<T> last_value_;
+  bool finished_;

Review comment:
       ```suggestion
     bool finished_ = false;
   ```

##########
File path: cpp/src/arrow/util/iterator.h
##########
@@ -186,6 +194,109 @@ class Iterator : public util::EqualityComparable<Iterator<T>> {
   Result<T> (*next_)(void*) = NULLPTR;
 };
 
+template <typename T>
+struct TransformFlow {
+  using YieldValueType = T;
+
+  TransformFlow(YieldValueType value, bool ready_for_next)
+      : finished_(false),
+        ready_for_next_(ready_for_next),
+        status_(),
+        yield_value_(std::move(value)) {}
+  TransformFlow(bool finished, bool ready_for_next)
+      : finished_(finished), ready_for_next_(ready_for_next), status_(), yield_value_() {}
+  TransformFlow(Status s)  // NOLINT runtime/explicit
+      : finished_(true), ready_for_next_(false), status_(s), yield_value_() {}
+
+  bool HasValue() const { return yield_value_.has_value(); }
+  bool Finished() const { return finished_; }
+  Status status() const { return status_; }
+  bool Ok() const { return status_.ok(); }
+  bool ReadyForNext() const { return ready_for_next_; }
+  T Value() const { return *yield_value_; }
+
+  bool finished_;
+  bool ready_for_next_;
+  Status status_;
+  util::optional<YieldValueType> yield_value_;
+};
+
+struct TransformFinish {
+  template <typename T>
+  operator TransformFlow<T>() && {  // NOLINT explicit
+    return TransformFlow<T>(true, true);
+  }
+};
+
+struct TransformSkip {
+  template <typename T>
+  operator TransformFlow<T>() && {  // NOLINT explicit
+    return TransformFlow<T>(false, true);
+  }
+};
+
+template <typename T>
+TransformFlow<T> TransformYield(T value = {}, bool ready_for_next = true) {
+  return TransformFlow<T>(std::move(value), ready_for_next);
+}
+
+template <typename T, typename V>
+using Transformer = std::function<TransformFlow<V>(T)>;
+
+template <typename T, typename V>
+class TransformIterator {
+ public:
+  explicit TransformIterator(Iterator<T> it, Transformer<T, V> transformer)
+      : it_(std::move(it)),
+        transformer_(std::move(transformer)),
+        last_value_(),
+        finished_() {}
+
+  util::optional<Result<V>> Pump() {
+    while (!finished_ && last_value_.has_value()) {

Review comment:
       This loop is very confusing. Could you rewrite it with a single condition (`while (!finished_)`, maybe) and then include a break statement below?

##########
File path: cpp/src/arrow/util/iterator.h
##########
@@ -186,6 +194,109 @@ class Iterator : public util::EqualityComparable<Iterator<T>> {
   Result<T> (*next_)(void*) = NULLPTR;
 };
 
+template <typename T>
+struct TransformFlow {
+  using YieldValueType = T;
+
+  TransformFlow(YieldValueType value, bool ready_for_next)
+      : finished_(false),
+        ready_for_next_(ready_for_next),
+        status_(),
+        yield_value_(std::move(value)) {}
+  TransformFlow(bool finished, bool ready_for_next)
+      : finished_(finished), ready_for_next_(ready_for_next), status_(), yield_value_() {}
+  TransformFlow(Status s)  // NOLINT runtime/explicit
+      : finished_(true), ready_for_next_(false), status_(s), yield_value_() {}
+
+  bool HasValue() const { return yield_value_.has_value(); }
+  bool Finished() const { return finished_; }
+  Status status() const { return status_; }
+  bool Ok() const { return status_.ok(); }
+  bool ReadyForNext() const { return ready_for_next_; }
+  T Value() const { return *yield_value_; }
+
+  bool finished_;
+  bool ready_for_next_;
+  Status status_;
+  util::optional<YieldValueType> yield_value_;
+};
+
+struct TransformFinish {
+  template <typename T>
+  operator TransformFlow<T>() && {  // NOLINT explicit
+    return TransformFlow<T>(true, true);
+  }
+};
+
+struct TransformSkip {
+  template <typename T>
+  operator TransformFlow<T>() && {  // NOLINT explicit
+    return TransformFlow<T>(false, true);
+  }
+};
+
+template <typename T>
+TransformFlow<T> TransformYield(T value = {}, bool ready_for_next = true) {
+  return TransformFlow<T>(std::move(value), ready_for_next);
+}
+
+template <typename T, typename V>
+using Transformer = std::function<TransformFlow<V>(T)>;
+
+template <typename T, typename V>
+class TransformIterator {
+ public:
+  explicit TransformIterator(Iterator<T> it, Transformer<T, V> transformer)
+      : it_(std::move(it)),
+        transformer_(std::move(transformer)),
+        last_value_(),
+        finished_() {}
+
+  util::optional<Result<V>> Pump() {
+    while (!finished_ && last_value_.has_value()) {
+      TransformFlow<V> next = transformer_(*last_value_);
+      if (next.ReadyForNext()) {
+        last_value_.reset();
+      }
+      if (next.Finished()) {
+        finished_ = true;
+      }
+      if (!next.Ok()) {
+        return next.status();
+      }
+      if (next.HasValue()) {
+        return next.Value();
+      }
+    }
+    if (finished_) {
+      return IterationTraits<V>::End();
+    }
+    return util::optional<V>();
+  }
+
+  Result<V> Next() {
+    while (!finished_) {
+      util::optional<Result<V>> next = Pump();
+      if (next.has_value()) {
+        return *next;

Review comment:
       ```suggestion
           return std::move(*next);
   ```

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -580,4 +611,73 @@ inline std::vector<int> WaitForAny(const std::vector<Future<T>*>& futures,
   return waiter->MoveFinishedFutures();
 }
 
+template <typename T = detail::Empty>
+struct ControlFlow {
+  using BreakValueType = T;
+
+  bool IsBreak() const { return break_value_.has_value(); }
+
+  static Result<BreakValueType> MoveBreakValue(const ControlFlow& cf) {
+    return std::move(*cf.break_value_);
+  }
+
+  mutable util::optional<BreakValueType> break_value_;
+};
+
+struct Continue {
+  template <typename T>
+  operator ControlFlow<T>() && {  // NOLINT explicit
+    return {};
+  }
+};
+
+template <typename T = detail::Empty>
+ControlFlow<T> Break(T break_value = {}) {
+  return ControlFlow<T>{std::move(break_value)};
+}
+
+template <typename Iterate,
+          typename Control = typename detail::result_of_t<Iterate()>::ValueType,
+          typename BreakValueType = typename Control::BreakValueType>
+Future<BreakValueType> Loop(Iterate iterate) {

Review comment:
       Maybe the typedef for AsyncGenerator would be useful here; then we could write
   
   ```suggestion
   template <typename T>
   using AsyncGenerator = std::function<Future<T>()>;
   
   template <typename BreakValue>
   Future<BreakValue> Loop(AsyncGenerator<ControlFlow<BreakValue>> iterate) {
   ```

##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -832,6 +832,145 @@ TEST(FutureCompletionTest, FutureVoid) {
   }
 }
 
+TEST(FutureAllTest, Simple) {
+  auto f1 = Future<int>::Make();
+  auto f2 = Future<int>::Make();
+  std::vector<Future<int>> futures = {f1, f2};
+  auto combined = arrow::All(futures);
+
+  ARROW_UNUSED(combined.Then([](std::vector<Result<int>> results) {
+    ASSERT_EQ(2, results.size());
+    ASSERT_EQ(1, *results[0]);
+    ASSERT_EQ(2, *results[1]);
+  }));
+
+  // Finish in reverse order, results should still be delivered in proper order
+  AssertNotFinished(combined);
+  f2.MarkFinished(2);
+  AssertNotFinished(combined);
+  f1.MarkFinished(1);
+  AssertSuccessful(combined);
+}
+
+TEST(FutureAllTest, Failure) {
+  auto f1 = Future<int>::Make();
+  auto f2 = Future<int>::Make();
+  auto f3 = Future<int>::Make();
+  std::vector<Future<int>> futures = {f1, f2, f3};
+  auto combined = arrow::All(futures);
+
+  ARROW_UNUSED(combined.Then([](std::vector<Result<int>> results) {
+    ASSERT_EQ(3, results.size());
+    ASSERT_EQ(1, *results[0]);
+    ASSERT_EQ(Status::IOError("XYZ"), results[1].status());
+    ASSERT_EQ(3, *results[2]);
+  }));
+
+  f1.MarkFinished(1);
+  f2.MarkFinished(Status::IOError("XYZ"));
+  f3.MarkFinished(3);
+
+  AssertFinished(combined);
+}
+
+TEST(FutureLoopTest, Sync) {
+  struct {
+    int i = 0;
+    Future<int> Get() { return Future<int>::MakeFinished(i++); }
+  } IntSource;
+
+  bool do_fail = false;
+  std::vector<int> ints;
+  auto loop_body = [&] {
+    return IntSource.Get().Then([&](int i) -> Result<ControlFlow<int>> {
+      if (do_fail && i == 3) {
+        return Status::IOError("xxx");
+      }
+
+      if (i == 5) {
+        int sum = 0;
+        for (int i : ints) sum += i;
+        return Break(sum);
+      }
+
+      ints.push_back(i);
+      return Continue();
+    });
+  };
+
+  {
+    auto sum_fut = Loop(loop_body);
+    AssertSuccessful(sum_fut);
+
+    ASSERT_OK_AND_ASSIGN(auto sum, sum_fut.result());
+    ASSERT_EQ(sum, 0 + 1 + 2 + 3 + 4);
+  }
+
+  {
+    do_fail = true;
+    IntSource.i = 0;
+    auto sum_fut = Loop(loop_body);
+    AssertFailed(sum_fut);
+    ASSERT_RAISES(IOError, sum_fut.result());
+  }
+}
+
+TEST(FutureLoopTest, EmptyBreakValue) {
+  Future<> none_fut =
+      Loop([&] { return Future<>::MakeFinished().Then([&](...) { return Break(); }); });
+  AssertSuccessful(none_fut);
+}
+
+TEST(FutureLoopTest, MoveOnlyBreakValue) {

Review comment:
       It'd be good to have a stress test for `Loop` as well.

##########
File path: cpp/src/arrow/util/iterator.h
##########
@@ -186,6 +194,109 @@ class Iterator : public util::EqualityComparable<Iterator<T>> {
   Result<T> (*next_)(void*) = NULLPTR;
 };
 
+template <typename T>
+struct TransformFlow {
+  using YieldValueType = T;
+
+  TransformFlow(YieldValueType value, bool ready_for_next)
+      : finished_(false),
+        ready_for_next_(ready_for_next),
+        status_(),
+        yield_value_(std::move(value)) {}
+  TransformFlow(bool finished, bool ready_for_next)
+      : finished_(finished), ready_for_next_(ready_for_next), status_(), yield_value_() {}
+  TransformFlow(Status s)  // NOLINT runtime/explicit
+      : finished_(true), ready_for_next_(false), status_(s), yield_value_() {}
+
+  bool HasValue() const { return yield_value_.has_value(); }
+  bool Finished() const { return finished_; }
+  Status status() const { return status_; }
+  bool Ok() const { return status_.ok(); }
+  bool ReadyForNext() const { return ready_for_next_; }
+  T Value() const { return *yield_value_; }
+
+  bool finished_;
+  bool ready_for_next_;
+  Status status_;
+  util::optional<YieldValueType> yield_value_;
+};
+
+struct TransformFinish {
+  template <typename T>
+  operator TransformFlow<T>() && {  // NOLINT explicit
+    return TransformFlow<T>(true, true);
+  }
+};
+
+struct TransformSkip {
+  template <typename T>
+  operator TransformFlow<T>() && {  // NOLINT explicit
+    return TransformFlow<T>(false, true);
+  }
+};
+
+template <typename T>
+TransformFlow<T> TransformYield(T value = {}, bool ready_for_next = true) {
+  return TransformFlow<T>(std::move(value), ready_for_next);
+}
+
+template <typename T, typename V>
+using Transformer = std::function<TransformFlow<V>(T)>;
+
+template <typename T, typename V>
+class TransformIterator {
+ public:
+  explicit TransformIterator(Iterator<T> it, Transformer<T, V> transformer)
+      : it_(std::move(it)),
+        transformer_(std::move(transformer)),
+        last_value_(),
+        finished_() {}
+
+  util::optional<Result<V>> Pump() {
+    while (!finished_ && last_value_.has_value()) {

Review comment:
       Could you also include a comment describing the control flow, the contract of transformer functions, and so on?

##########
File path: cpp/src/arrow/util/iterator.h
##########
@@ -186,6 +194,109 @@ class Iterator : public util::EqualityComparable<Iterator<T>> {
   Result<T> (*next_)(void*) = NULLPTR;
 };
 
+template <typename T>
+struct TransformFlow {
+  using YieldValueType = T;
+
+  TransformFlow(YieldValueType value, bool ready_for_next)
+      : finished_(false),
+        ready_for_next_(ready_for_next),
+        status_(),
+        yield_value_(std::move(value)) {}
+  TransformFlow(bool finished, bool ready_for_next)
+      : finished_(finished), ready_for_next_(ready_for_next), status_(), yield_value_() {}
+  TransformFlow(Status s)  // NOLINT runtime/explicit
+      : finished_(true), ready_for_next_(false), status_(s), yield_value_() {}

Review comment:
       Instead of having TransformFlow potentially be an error, perhaps we could just rely on Result and have `using Transformer = std::function<Result<TransformFlow<V>>(T)>;`?

##########
File path: cpp/src/arrow/util/iterator.h
##########
@@ -186,6 +194,109 @@ class Iterator : public util::EqualityComparable<Iterator<T>> {
   Result<T> (*next_)(void*) = NULLPTR;
 };
 
+template <typename T>
+struct TransformFlow {
+  using YieldValueType = T;
+
+  TransformFlow(YieldValueType value, bool ready_for_next)
+      : finished_(false),
+        ready_for_next_(ready_for_next),
+        status_(),
+        yield_value_(std::move(value)) {}
+  TransformFlow(bool finished, bool ready_for_next)
+      : finished_(finished), ready_for_next_(ready_for_next), status_(), yield_value_() {}
+  TransformFlow(Status s)  // NOLINT runtime/explicit
+      : finished_(true), ready_for_next_(false), status_(s), yield_value_() {}
+
+  bool HasValue() const { return yield_value_.has_value(); }
+  bool Finished() const { return finished_; }
+  Status status() const { return status_; }
+  bool Ok() const { return status_.ok(); }
+  bool ReadyForNext() const { return ready_for_next_; }
+  T Value() const { return *yield_value_; }
+
+  bool finished_;
+  bool ready_for_next_;
+  Status status_;
+  util::optional<YieldValueType> yield_value_;
+};
+
+struct TransformFinish {
+  template <typename T>
+  operator TransformFlow<T>() && {  // NOLINT explicit
+    return TransformFlow<T>(true, true);
+  }
+};
+
+struct TransformSkip {
+  template <typename T>
+  operator TransformFlow<T>() && {  // NOLINT explicit
+    return TransformFlow<T>(false, true);
+  }
+};
+
+template <typename T>
+TransformFlow<T> TransformYield(T value = {}, bool ready_for_next = true) {
+  return TransformFlow<T>(std::move(value), ready_for_next);
+}
+
+template <typename T, typename V>
+using Transformer = std::function<TransformFlow<V>(T)>;
+
+template <typename T, typename V>
+class TransformIterator {
+ public:
+  explicit TransformIterator(Iterator<T> it, Transformer<T, V> transformer)
+      : it_(std::move(it)),
+        transformer_(std::move(transformer)),
+        last_value_(),
+        finished_() {}
+
+  util::optional<Result<V>> Pump() {

Review comment:
       I'd usually expect `Result<>` to be outermost in a return type:
   ```suggestion
     Result<util::optional<V>> Pump() {
   ```
   Additionally, that'd enable you to use `ARROW_RETURN_NOT_OK` below.

##########
File path: cpp/src/arrow/util/iterator.h
##########
@@ -186,6 +194,109 @@ class Iterator : public util::EqualityComparable<Iterator<T>> {
   Result<T> (*next_)(void*) = NULLPTR;
 };
 
+template <typename T>
+struct TransformFlow {
+  using YieldValueType = T;
+
+  TransformFlow(YieldValueType value, bool ready_for_next)
+      : finished_(false),
+        ready_for_next_(ready_for_next),
+        status_(),
+        yield_value_(std::move(value)) {}
+  TransformFlow(bool finished, bool ready_for_next)
+      : finished_(finished), ready_for_next_(ready_for_next), status_(), yield_value_() {}
+  TransformFlow(Status s)  // NOLINT runtime/explicit
+      : finished_(true), ready_for_next_(false), status_(s), yield_value_() {}
+
+  bool HasValue() const { return yield_value_.has_value(); }
+  bool Finished() const { return finished_; }
+  Status status() const { return status_; }
+  bool Ok() const { return status_.ok(); }
+  bool ReadyForNext() const { return ready_for_next_; }
+  T Value() const { return *yield_value_; }
+
+  bool finished_;
+  bool ready_for_next_;

Review comment:
       Use default member initializers for fields that would otherwise be left uninitialized (such as `bool`):
   ```suggestion
     bool finished_ = false;
     bool ready_for_next_ = false;
   ```

##########
File path: cpp/src/arrow/util/iterator.h
##########
@@ -186,6 +194,109 @@ class Iterator : public util::EqualityComparable<Iterator<T>> {
   Result<T> (*next_)(void*) = NULLPTR;
 };
 
+template <typename T>
+struct TransformFlow {
+  using YieldValueType = T;
+
+  TransformFlow(YieldValueType value, bool ready_for_next)
+      : finished_(false),
+        ready_for_next_(ready_for_next),
+        status_(),
+        yield_value_(std::move(value)) {}
+  TransformFlow(bool finished, bool ready_for_next)
+      : finished_(finished), ready_for_next_(ready_for_next), status_(), yield_value_() {}
+  TransformFlow(Status s)  // NOLINT runtime/explicit
+      : finished_(true), ready_for_next_(false), status_(s), yield_value_() {}
+
+  bool HasValue() const { return yield_value_.has_value(); }
+  bool Finished() const { return finished_; }
+  Status status() const { return status_; }
+  bool Ok() const { return status_.ok(); }
+  bool ReadyForNext() const { return ready_for_next_; }
+  T Value() const { return *yield_value_; }
+
+  bool finished_;
+  bool ready_for_next_;
+  Status status_;
+  util::optional<YieldValueType> yield_value_;
+};
+
+struct TransformFinish {
+  template <typename T>
+  operator TransformFlow<T>() && {  // NOLINT explicit
+    return TransformFlow<T>(true, true);
+  }
+};
+
+struct TransformSkip {
+  template <typename T>
+  operator TransformFlow<T>() && {  // NOLINT explicit
+    return TransformFlow<T>(false, true);
+  }
+};
+
+template <typename T>
+TransformFlow<T> TransformYield(T value = {}, bool ready_for_next = true) {
+  return TransformFlow<T>(std::move(value), ready_for_next);
+}
+
+template <typename T, typename V>
+using Transformer = std::function<TransformFlow<V>(T)>;
+
+template <typename T, typename V>
+class TransformIterator {
+ public:
+  explicit TransformIterator(Iterator<T> it, Transformer<T, V> transformer)
+      : it_(std::move(it)),
+        transformer_(std::move(transformer)),
+        last_value_(),
+        finished_() {}
+
+  util::optional<Result<V>> Pump() {
+    while (!finished_ && last_value_.has_value()) {
+      TransformFlow<V> next = transformer_(*last_value_);
+      if (next.ReadyForNext()) {
+        last_value_.reset();
+      }
+      if (next.Finished()) {
+        finished_ = true;
+      }
+      if (!next.Ok()) {
+        return next.status();
+      }
+      if (next.HasValue()) {
+        return next.Value();
+      }
+    }
+    if (finished_) {
+      return IterationTraits<V>::End();
+    }
+    return util::optional<V>();

Review comment:
       This doesn't match the return type of the function; please use `return {};` or `return util::nullopt;` to be clearer.

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -556,6 +560,33 @@ inline bool WaitForAll(const std::vector<Future<T>*>& futures,
   return waiter->Wait(seconds);
 }
 
+template <typename T>

Review comment:
       ```suggestion
   /// \brief Create a Future which completes when all of `futures` complete.
   ///
   /// The future's result is a vector of the results of `futures`.
   /// Note that this future will never be marked "failed"; failed results
   /// will be stored in the result vector alongside successful results.
   template <typename T>
   ```

##########
File path: cpp/src/arrow/util/iterator.cc
##########
@@ -67,6 +67,8 @@ class ReadaheadQueue::Impl : public std::enable_shared_from_this<ReadaheadQueue:
   }
 
   Status PopDone(std::unique_ptr<ReadaheadPromise>* out) {
+    DCHECK_GT(max_readahead_, 0);  // This function has no purpose and should not be
+                                   // called if using the queue unbounded

Review comment:
       `DCHECK` macros (except `DCHECK_OK`) can be streamed into, which makes assert failures more helpful and searchable:
   ```suggestion
       DCHECK_GT(max_readahead_, 0) << "PopDone should never be called if using the queue unbounded";
   ```

##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -832,6 +832,145 @@ TEST(FutureCompletionTest, FutureVoid) {
   }
 }
 
+TEST(FutureAllTest, Simple) {
+  auto f1 = Future<int>::Make();
+  auto f2 = Future<int>::Make();
+  std::vector<Future<int>> futures = {f1, f2};
+  auto combined = arrow::All(futures);
+
+  ARROW_UNUSED(combined.Then([](std::vector<Result<int>> results) {
+    ASSERT_EQ(2, results.size());
+    ASSERT_EQ(1, *results[0]);
+    ASSERT_EQ(2, *results[1]);
+  }));
+
+  // Finish in reverse order, results should still be delivered in proper order
+  AssertNotFinished(combined);
+  f2.MarkFinished(2);
+  AssertNotFinished(combined);
+  f1.MarkFinished(1);
+  AssertSuccessful(combined);
+}
+
+TEST(FutureAllTest, Failure) {
+  auto f1 = Future<int>::Make();
+  auto f2 = Future<int>::Make();
+  auto f3 = Future<int>::Make();
+  std::vector<Future<int>> futures = {f1, f2, f3};
+  auto combined = arrow::All(futures);
+
+  ARROW_UNUSED(combined.Then([](std::vector<Result<int>> results) {
+    ASSERT_EQ(3, results.size());
+    ASSERT_EQ(1, *results[0]);
+    ASSERT_EQ(Status::IOError("XYZ"), results[1].status());
+    ASSERT_EQ(3, *results[2]);
+  }));
+
+  f1.MarkFinished(1);
+  f2.MarkFinished(Status::IOError("XYZ"));
+  f3.MarkFinished(3);
+
+  AssertFinished(combined);
+}
+
+TEST(FutureLoopTest, Sync) {
+  struct {
+    int i = 0;
+    Future<int> Get() { return Future<int>::MakeFinished(i++); }
+  } IntSource;
+
+  bool do_fail = false;
+  std::vector<int> ints;
+  auto loop_body = [&] {
+    return IntSource.Get().Then([&](int i) -> Result<ControlFlow<int>> {
+      if (do_fail && i == 3) {
+        return Status::IOError("xxx");
+      }
+
+      if (i == 5) {
+        int sum = 0;
+        for (int i : ints) sum += i;
+        return Break(sum);
+      }
+
+      ints.push_back(i);
+      return Continue();
+    });
+  };
+
+  {
+    auto sum_fut = Loop(loop_body);
+    AssertSuccessful(sum_fut);
+
+    ASSERT_OK_AND_ASSIGN(auto sum, sum_fut.result());
+    ASSERT_EQ(sum, 0 + 1 + 2 + 3 + 4);
+  }
+
+  {
+    do_fail = true;
+    IntSource.i = 0;
+    auto sum_fut = Loop(loop_body);
+    AssertFailed(sum_fut);
+    ASSERT_RAISES(IOError, sum_fut.result());
+  }
+}
+
+TEST(FutureLoopTest, EmptyBreakValue) {
+  Future<> none_fut =
+      Loop([&] { return Future<>::MakeFinished().Then([&](...) { return Break(); }); });
+  AssertSuccessful(none_fut);
+}
+
+TEST(FutureLoopTest, MoveOnlyBreakValue) {
+  Future<MoveOnlyDataType> one_fut = Loop([&] {
+    return Future<int>::MakeFinished(1).Then(
+        [&](int i) { return Break(MoveOnlyDataType(i)); });
+  });
+  AssertSuccessful(one_fut);
+  ASSERT_OK_AND_ASSIGN(auto one, std::move(one_fut).result());
+  ASSERT_EQ(one, 1);
+}
+
+TEST(FutureLoopTest, StackOverflow) {
+  // Looping over futures is normally a rather recursive task.  If the futures complete
+  // synchronously (because they are already finished) it could lead to a stack overflow
+  // if care is not taken.
+  int counter = 0;
+  auto loop_body = [&counter]() -> Future<ControlFlow<int>> {
+    while (counter < 1000000) {
+      counter++;
+      return Future<ControlFlow<int>>::MakeFinished(Continue());

Review comment:
       Maybe it'd be useful to have an implicit constructor `Future<T>(Result<T>)` for finished futures; then I think we'd be able to just write
   ```c++
         return Continue();
   ```

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -580,4 +611,73 @@ inline std::vector<int> WaitForAny(const std::vector<Future<T>*>& futures,
   return waiter->MoveFinishedFutures();
 }
 
+template <typename T = detail::Empty>
+struct ControlFlow {
+  using BreakValueType = T;
+
+  bool IsBreak() const { return break_value_.has_value(); }
+
+  static Result<BreakValueType> MoveBreakValue(const ControlFlow& cf) {
+    return std::move(*cf.break_value_);
+  }
+
+  mutable util::optional<BreakValueType> break_value_;
+};
+
+struct Continue {
+  template <typename T>
+  operator ControlFlow<T>() && {  // NOLINT explicit
+    return {};
+  }
+};
+
+template <typename T = detail::Empty>
+ControlFlow<T> Break(T break_value = {}) {
+  return ControlFlow<T>{std::move(break_value)};
+}
+
+template <typename Iterate,

Review comment:
       ```suggestion
   /// \brief Loop through an asynchronous sequence
   ///
   /// \param[in] iterate A generator of Future<ControlFlow<BreakValue>>. On completion of each yielded
   /// future the resulting ControlFlow will be examined. A Break will terminate the loop, while a Continue
   /// will re-invoke `iterate`.
   /// \return A future which will complete when a Future returned by iterate completes with a Break
   template <typename Iterate,
   ```

##########
File path: cpp/src/arrow/util/iterator_test.cc
##########
@@ -129,11 +130,47 @@ template <typename T>
 inline Iterator<T> EmptyIt() {
   return MakeEmptyIterator<T>();
 }
-
 inline Iterator<TestInt> VectorIt(std::vector<TestInt> v) {
   return MakeVectorIterator<TestInt>(std::move(v));
 }
 
+std::function<Future<TestInt>()> AsyncVectorIt(std::vector<TestInt> v) {
+  auto index = std::make_shared<size_t>(0);
+  auto vec = std::make_shared<std::vector<TestInt>>(std::move(v));
+  return [index, vec]() -> Future<TestInt> {
+    if (*index >= vec->size()) {
+      return Future<TestInt>::MakeFinished(IterationTraits<TestInt>::End());
+    }
+    auto next = (*vec)[*index];
+    (*index)++;
+    return Future<TestInt>::MakeFinished(next);
+  };

Review comment:
       ```suggestion
     size_t index = 0;
     return [index, v]() mutable -> Future<TestInt> {
       if (index >= vec.size()) {
         return Future<TestInt>::MakeFinished(IterationTraits<TestInt>::End());
       }
       return Future<TestInt>::MakeFinished(v[index++]);
     };
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org