You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/01/26 17:04:43 UTC

[GitHub] [arrow] pitrou commented on a change in pull request #9095: ARROW-10183: [C++] Apply composable futures to CSV

pitrou commented on a change in pull request #9095:
URL: https://github.com/apache/arrow/pull/9095#discussion_r564534403



##########
File path: cpp/src/arrow/csv/reader_benchmark.cc
##########
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "benchmark/benchmark.h"
+
+#include <string>
+
+#include "arrow/buffer.h"
+#include "arrow/csv/options.h"
+#include "arrow/csv/reader.h"
+#include "arrow/csv/test_common.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/io/memory.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+namespace csv {
+
+class SlowInputStream : public io::InputStream {

Review comment:
       Hmm... why don't you use `SlowInputStream` from `arrow/io/slow.h`?

##########
File path: cpp/src/arrow/csv/reader_benchmark.cc
##########
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "benchmark/benchmark.h"
+
+#include <string>
+
+#include "arrow/buffer.h"
+#include "arrow/csv/options.h"
+#include "arrow/csv/reader.h"
+#include "arrow/csv/test_common.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/io/memory.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+namespace csv {
+
+class SlowInputStream : public io::InputStream {
+ public:
+  explicit SlowInputStream(std::shared_ptr<io::BufferReader> target, int64_t latency_ms)
+      : target_(std::move(target)) {
+    latency_s_ = static_cast<double>(latency_ms) / 1000.0;
+  }
+  virtual ~SlowInputStream() {}
+
+  Result<util::string_view> Peek(int64_t nbytes) override {
+    return target_->Peek(nbytes);
+  }
+  bool supports_zero_copy() const override { return target_->supports_zero_copy(); }
+  Status Close() override { return target_->Close(); }
+  Status Abort() override { return target_->Abort(); }
+  Result<int64_t> Tell() const override { return target_->Tell(); }
+  bool closed() const override { return target_->closed(); }
+  Result<int64_t> Read(int64_t nbytes, void* out) override {
+    if (latency_s_ > 0) {
+      SleepFor(latency_s_);
+    }
+    return target_->Read(nbytes, out);
+  }
+  Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override {
+    if (latency_s_ > 0) {
+      SleepFor(latency_s_);
+    }
+    return target_->Read(nbytes);
+  }
+  Status Seek(int64_t pos) { return target_->Seek(pos); }
+
+ private:
+  std::shared_ptr<io::BufferReader> target_;
+  double latency_s_;
+};
+
+static ReadOptions CreateReadOptions(bool use_threads, bool use_async) {
+  auto result = csv::ReadOptions::Defaults();
+  result.use_threads = use_threads;
+  result.legacy_blocking_reads = !use_async;
+  // Simulate larger files by using smaller block files so the impact of multiple
+  // blocks is seen but we don't have to spend the time waiting on the large I/O

Review comment:
       I'm not sure this is exactly the same. Using a very small block size (I assume the value below is about 10 kB?) may emphasize fixed costs (managing vectors of shared_ptrs, etc.) rather than the actual reading/parsing costs.
   

##########
File path: cpp/src/arrow/csv/reader_benchmark.cc
##########
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "benchmark/benchmark.h"
+
+#include <string>
+
+#include "arrow/buffer.h"
+#include "arrow/csv/options.h"
+#include "arrow/csv/reader.h"
+#include "arrow/csv/test_common.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/io/memory.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+namespace csv {
+
+class SlowInputStream : public io::InputStream {
+ public:
+  explicit SlowInputStream(std::shared_ptr<io::BufferReader> target, int64_t latency_ms)
+      : target_(std::move(target)) {
+    latency_s_ = static_cast<double>(latency_ms) / 1000.0;
+  }
+  virtual ~SlowInputStream() {}
+
+  Result<util::string_view> Peek(int64_t nbytes) override {
+    return target_->Peek(nbytes);
+  }
+  bool supports_zero_copy() const override { return target_->supports_zero_copy(); }
+  Status Close() override { return target_->Close(); }
+  Status Abort() override { return target_->Abort(); }
+  Result<int64_t> Tell() const override { return target_->Tell(); }
+  bool closed() const override { return target_->closed(); }
+  Result<int64_t> Read(int64_t nbytes, void* out) override {
+    if (latency_s_ > 0) {
+      SleepFor(latency_s_);
+    }
+    return target_->Read(nbytes, out);
+  }
+  Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override {
+    if (latency_s_ > 0) {
+      SleepFor(latency_s_);
+    }
+    return target_->Read(nbytes);
+  }
+  Status Seek(int64_t pos) { return target_->Seek(pos); }
+
+ private:
+  std::shared_ptr<io::BufferReader> target_;
+  double latency_s_;
+};
+
+static ReadOptions CreateReadOptions(bool use_threads, bool use_async) {
+  auto result = csv::ReadOptions::Defaults();
+  result.use_threads = use_threads;
+  result.legacy_blocking_reads = !use_async;
+  // Simulate larger files by using smaller block files so the impact of multiple
+  // blocks is seen but we don't have to spend the time waiting on the large I/O
+  result.block_size = (1 << 20) / 100;
+  return result;
+}
+
+static std::shared_ptr<SlowInputStream> CreateStreamReader(std::shared_ptr<Buffer> buffer,
+                                                           int64_t latency_ms) {
+  auto buffer_reader = std::make_shared<io::BufferReader>(buffer);
+  return std::make_shared<SlowInputStream>(buffer_reader, latency_ms);
+}
+
+static void BenchmarkReader(benchmark::State& state, bool use_threads, bool use_async) {
+  auto latency_ms = state.range(0);
+  auto num_rows = state.range(1);
+  auto num_files = state.range(2);
+  if (num_files > 5 && use_threads && !use_async) {

Review comment:
       Why `5`? Is it related to the thread pool size below?

##########
File path: cpp/src/arrow/csv/reader_test.cc
##########
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <string>
+#include <thread>
+#include <utility>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/csv/options.h"
+#include "arrow/csv/reader.h"
+#include "arrow/csv/test_common.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/status.h"
+#include "arrow/table.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/future.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+namespace csv {
+
+void StressTableReader(
+    std::function<Result<std::shared_ptr<TableReader>>(std::shared_ptr<io::InputStream>)>
+        reader_factory) {
+  const int NTASKS = 100;
+  const int NROWS = 1000;
+  ASSERT_OK_AND_ASSIGN(auto table_buffer, MakeSampleCsvBuffer(NROWS));
+
+  std::vector<Future<std::shared_ptr<Table>>> task_futures(NTASKS);
+  for (int i = 0; i < NTASKS; i++) {
+    auto input = std::make_shared<io::BufferReader>(table_buffer);
+    ASSERT_OK_AND_ASSIGN(auto reader, reader_factory(input));
+    task_futures[i] = reader->ReadAsync();
+  }
+  auto combined_future = All(task_futures);
+  combined_future.Wait();
+
+  ASSERT_OK_AND_ASSIGN(std::vector<Result<std::shared_ptr<Table>>> results,
+                       combined_future.result());
+  for (auto&& result : results) {
+    ASSERT_OK_AND_ASSIGN(auto table, result);
+    ASSERT_EQ(NROWS, table->num_rows());
+  }
+}
+
+void TestNestedParallelism(
+    std::shared_ptr<internal::ThreadPool> thread_pool,
+    std::function<Result<std::shared_ptr<TableReader>>(std::shared_ptr<io::InputStream>)>
+        reader_factory) {
+  const int NROWS = 1000;
+  ASSERT_OK_AND_ASSIGN(auto table_buffer, MakeSampleCsvBuffer(NROWS));
+  auto input = std::make_shared<io::BufferReader>(table_buffer);
+  ASSERT_OK_AND_ASSIGN(auto reader, reader_factory(input));
+
+  Future<std::shared_ptr<Table>> table_future;
+
+  auto read_task = [&reader, &table_future]() mutable {
+    table_future = reader->ReadAsync();
+    return Status::OK();
+  };
+  ASSERT_OK_AND_ASSIGN(auto future, thread_pool->Submit(read_task));
+  ASSERT_TRUE(future.Wait(1));
+
+  if (future.is_finished()) {
+    ASSERT_TRUE(table_future.Wait(1));
+    if (table_future.is_finished()) {

Review comment:
       Same question here.

##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -832,9 +832,203 @@ TEST(FutureCompletionTest, FutureVoid) {
   }
 }
 
+TEST(FutureAllTest, Simple) {
+  auto f1 = Future<int>::Make();
+  auto f2 = Future<int>::Make();
+  std::vector<Future<int>> futures = {f1, f2};
+  auto combined = arrow::All(futures);
+
+  ARROW_UNUSED(combined.Then([](std::vector<Result<int>> results) {

Review comment:
       It would seem more robust and readable to test for the `result()` below.

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -580,4 +615,74 @@ inline std::vector<int> WaitForAny(const std::vector<Future<T>*>& futures,
   return waiter->MoveFinishedFutures();
 }
 
+template <typename T = detail::Empty>
+struct ControlFlow {
+  using BreakValueType = T;
+
+  bool IsBreak() const { return break_value_.has_value(); }
+
+  static Result<BreakValueType> MoveBreakValue(const ControlFlow& cf) {
+    return std::move(*cf.break_value_);
+  }
+
+  mutable util::optional<BreakValueType> break_value_;
+};
+
+struct Continue {
+  template <typename T>
+  operator ControlFlow<T>() && {  // NOLINT explicit
+    return {};
+  }
+};
+
+template <typename T = detail::Empty>
+ControlFlow<T> Break(T break_value = {}) {
+  return ControlFlow<T>{std::move(break_value)};
+}
+
+template <typename Iterate,
+          typename Control = typename detail::result_of_t<Iterate()>::ValueType,
+          typename BreakValueType = typename Control::BreakValueType>
+Future<BreakValueType> Loop(Iterate iterate) {
+  auto break_fut = Future<BreakValueType>::Make();
+
+  struct Callback {
+    bool CheckForTermination(const Result<Control>& maybe_control) {
+      if (!maybe_control.ok() || maybe_control->IsBreak()) {
+        Result<BreakValueType> maybe_break = maybe_control.Map(Control::MoveBreakValue);
+        break_fut.MarkFinished(std::move(maybe_break));
+        return true;
+      }
+      return false;
+    }
+
+    void operator()(const Result<Control>& maybe_control) && {
+      if (CheckForTermination(maybe_control)) return;
+
+      auto control_fut = iterate();
+      while (control_fut.is_finished()) {
+        // There's no need to AddCallback on a finished future; we can CheckForTermination
+        // now. This also avoids recursion and potential stack overflow.
+        if (CheckForTermination(control_fut.result())) return;
+
+        control_fut = iterate();
+      }
+      control_fut.AddCallback(std::move(*this));
+    }
+
+    Iterate iterate;
+    // If the future returned by control_fut is never completed then we will be hanging on
+    // to break_fut forever even if the listener has given up listening on it.  Instead we
+    // rely on the fact that a producer (the caller of Future<>::Make) is always
+    // responsible for completing the futures they create.
+    // TODO: Could avoid this kind of situation with "future abandonment" similar to mesos
+    Future<BreakValueType> break_fut;

Review comment:
       Can't we simply use `WeakFuture` to avoid the aforementioned problem?

##########
File path: cpp/src/arrow/util/iterator_test.cc
##########
@@ -214,6 +255,270 @@ TEST(TestVectorIterator, RangeForLoop) {
   ASSERT_EQ(ints_it, ints.end());
 }
 
+template <typename T>
+Transformer<T, T> MakeFirstN(int n) {
+  int remaining = n;
+  return [remaining](T next) mutable -> Result<TransformFlow<T>> {
+    if (remaining > 0) {
+      remaining--;
+      return TransformYield(next);
+    }
+    return TransformFinish();
+  };
+}
+
+TEST(TestIteratorTransform, Truncating) {
+  auto original = VectorIt({1, 2, 3});
+  auto truncated = MakeTransformedIterator(std::move(original), MakeFirstN<TestInt>(2));
+  AssertIteratorMatch({1, 2}, std::move(truncated));
+}
+
+TEST(TestIteratorTransform, TestPointer) {
+  auto original = VectorIt<std::shared_ptr<int>>(
+      {std::make_shared<int>(1), std::make_shared<int>(2), std::make_shared<int>(3)});
+  auto truncated =
+      MakeTransformedIterator(std::move(original), MakeFirstN<std::shared_ptr<int>>(2));
+  ASSERT_OK_AND_ASSIGN(auto result, truncated.ToVector());
+  ASSERT_EQ(2, result.size());
+}
+
+TEST(TestIteratorTransform, TruncatingShort) {
+  // Tests the failsafe case where we never call Finish
+  auto original = VectorIt({1});
+  auto truncated = MakeTransformedIterator<TestInt, TestInt>(std::move(original),
+                                                             MakeFirstN<TestInt>(2));
+  AssertIteratorMatch({1}, std::move(truncated));
+}
+
+TEST(TestAsyncUtil, Background) {
+  std::vector<TestInt> expected = {1, 2, 3};
+  auto background = BackgroundAsyncVectorIt(expected);
+  auto future = CollectAsyncGenerator(background);
+  ASSERT_FALSE(future.is_finished());
+  future.Wait();
+  ASSERT_TRUE(future.is_finished());
+  ASSERT_EQ(expected, *future.result());
+}
+
+struct SlowEmptyIterator {
+  Result<TestInt> Next() {
+    if (called_) {
+      return Status::Invalid("Should not have been called twice");
+    }
+    SleepFor(0.1);
+    return IterationTraits<TestInt>::End();
+  }
+
+ private:
+  bool called_ = false;
+};
+
+TEST(TestAsyncUtil, BackgroundRepeatEnd) {
+  // Ensure that the background iterator properly fulfills the asyncgenerator contract
+  // and can be called after it ends.
+  auto iterator = Iterator<TestInt>(SlowEmptyIterator());
+  ASSERT_OK_AND_ASSIGN(
+      auto background_iter,
+      MakeBackgroundIterator(std::move(iterator), internal::GetCpuThreadPool()));
+
+  auto one = background_iter();
+  auto two = background_iter();
+
+  ASSERT_TRUE(one.Wait(0.5));
+
+  if (one.is_finished()) {
+    ASSERT_EQ(IterationTraits<TestInt>::End(), *one.result());
+  }
+
+  ASSERT_TRUE(two.Wait(0.5));
+  ASSERT_TRUE(two.is_finished());
+  if (two.is_finished()) {
+    ASSERT_EQ(IterationTraits<TestInt>::End(), *two.result());
+  }
+}
+
+TEST(TestAsyncUtil, SynchronousFinish) {
+  AsyncGenerator<TestInt> generator = []() {
+    return Future<TestInt>::MakeFinished(IterationTraits<TestInt>::End());
+  };
+  Transformer<TestInt, TestInt> skip_all = [](TestInt value) { return TransformSkip(); };
+  auto transformed = TransformAsyncGenerator(generator, skip_all);
+  auto future = CollectAsyncGenerator(transformed);
+  ASSERT_TRUE(future.is_finished());
+  ASSERT_OK_AND_ASSIGN(auto actual, future.result());
+  ASSERT_EQ(std::vector<TestInt>(), actual);
+}
+
+TEST(TestAsyncUtil, CompleteBackgroundStressTest) {
+  auto expected = RangeVector(100);
+  std::vector<Future<std::vector<TestInt>>> futures;
+  for (unsigned int i = 0; i < 100; i++) {
+    auto background = BackgroundAsyncVectorIt(expected);
+    futures.push_back(CollectAsyncGenerator(background));
+  }
+  auto combined = All(futures);
+  combined.Wait(2);
+  if (combined.is_finished()) {
+    ASSERT_OK_AND_ASSIGN(auto completed_vectors, combined.result());
+    for (auto&& vector : completed_vectors) {
+      ASSERT_EQ(vector, expected);
+    }
+  } else {
+    FAIL() << "After 2 seconds all background iterators had not finished collecting";
+  }
+}
+
+TEST(TestAsyncUtil, StackOverflow) {
+  int counter = 0;
+  AsyncGenerator<TestInt> generator = [&counter]() {
+    if (counter < 1000000) {
+      return Future<TestInt>::MakeFinished(counter++);
+    } else {
+      return Future<TestInt>::MakeFinished(IterationTraits<TestInt>::End());
+    }
+  };
+  Transformer<TestInt, TestInt> discard =
+      [](TestInt next) -> Result<TransformFlow<TestInt>> { return TransformSkip(); };
+  auto transformed = TransformAsyncGenerator(generator, discard);
+  auto collected_future = CollectAsyncGenerator(transformed);
+  ASSERT_TRUE(collected_future.Wait(5));
+  if (collected_future.is_finished()) {
+    ASSERT_OK_AND_ASSIGN(auto collected, collected_future.result());
+    ASSERT_EQ(0, collected.size());
+  }
+}
+
+TEST(TestAsyncUtil, Visit) {
+  auto generator = AsyncVectorIt({1, 2, 3});
+  unsigned int sum = 0;
+  auto sum_future = VisitAsyncGenerator<TestInt>(generator, [&sum](TestInt item) {
+    sum += item.value;
+    return Status::OK();
+  });
+  // Should be superfluous
+  sum_future.Wait();
+  ASSERT_EQ(6, sum);
+}
+
+TEST(TestAsyncUtil, Collect) {
+  std::vector<TestInt> expected = {1, 2, 3};
+  auto generator = AsyncVectorIt(expected);
+  auto collected = CollectAsyncGenerator(generator);
+  ASSERT_EQ(expected, *collected.result());
+}
+
+template <typename T>
+Transformer<T, T> MakeRepeatN(int repeat_count) {
+  int current_repeat = 0;
+  return [repeat_count, current_repeat](T next) mutable -> Result<TransformFlow<T>> {
+    current_repeat++;
+    bool ready_for_next = false;
+    if (current_repeat == repeat_count) {
+      current_repeat = 0;
+      ready_for_next = true;
+    }
+    return TransformYield(next, ready_for_next);
+  };
+}
+
+TEST(TestIteratorTransform, Repeating) {
+  auto original = VectorIt({1, 2, 3});
+  auto repeated = MakeTransformedIterator<TestInt, TestInt>(std::move(original),
+                                                            MakeRepeatN<TestInt>(2));
+  AssertIteratorMatch({1, 1, 2, 2, 3, 3}, std::move(repeated));
+}
+
+template <typename T>
+Transformer<T, T> MakeFilter(std::function<bool(T&)> filter) {
+  return [filter](T next) -> Result<TransformFlow<T>> {
+    if (filter(next)) {
+      return TransformYield(next);
+    } else {
+      return TransformSkip();
+    }
+  };
+}
+
+template <typename T>
+Transformer<T, T> MakeAbortOnSecond() {
+  int counter = 0;
+  return [counter](T next) mutable -> Result<TransformFlow<T>> {
+    if (counter++ == 1) {
+      return Status::Invalid("X");
+    }
+    return TransformYield(next);
+  };
+}
+
+TEST(TestIteratorTransform, SkipSome) {
+  // Exercises TransformSkip
+  auto original = VectorIt({1, 2, 3});
+  auto filter = MakeFilter<TestInt>([](TestInt& t) { return t.value != 2; });
+  auto filtered = MakeTransformedIterator(std::move(original), filter);
+  AssertIteratorMatch({1, 3}, std::move(filtered));
+}
+
+TEST(TestIteratorTransform, SkipAll) {
+  // Exercises TransformSkip
+  auto original = VectorIt({1, 2, 3});
+  auto filter = MakeFilter<TestInt>([](TestInt& t) { return false; });
+  auto filtered = MakeTransformedIterator(std::move(original), filter);
+  AssertIteratorMatch({}, std::move(filtered));
+}
+
+TEST(TestIteratorTransform, Abort) {
+  auto original = VectorIt({1, 2, 3});
+  auto transformed =
+      MakeTransformedIterator(std::move(original), MakeAbortOnSecond<TestInt>());
+  ASSERT_OK(transformed.Next());
+  ASSERT_RAISES(Invalid, transformed.Next());
+}
+
+TEST(TestAsyncIteratorTransform, SkipSome) {
+  auto original = AsyncVectorIt({1, 2, 3});
+  auto filter = MakeFilter<TestInt>([](TestInt& t) { return t.value != 2; });
+  auto filtered = TransformAsyncGenerator(std::move(original), filter);
+  AssertAsyncGeneratorMatch({1, 3}, std::move(filtered));
+}
+
+TEST(TestAsyncUtil, ReadaheadFailed) {
+  auto source = []() -> Future<TestInt> {
+    return Future<TestInt>::MakeFinished(Status::Invalid("X"));
+  };
+  auto readahead = AddReadahead<TestInt>(source, 10);
+  auto next = readahead();
+  ASSERT_EQ(Status::Invalid("X"), next.status());
+}
+
+TEST(TestAsyncUtil, Readahead) {
+  int num_delivered = 0;
+  auto source = [&num_delivered]() {
+    if (num_delivered < 5) {
+      return Future<TestInt>::MakeFinished(num_delivered++);
+    } else {
+      return Future<TestInt>::MakeFinished(IterationTraits<TestInt>::End());
+    }
+  };
+  auto readahead = AddReadahead<TestInt>(source, 10);
+  // Should not pump until first item requested

Review comment:
       Sounds weird. Is there a motivation for this?

##########
File path: cpp/src/arrow/util/task_group.cc
##########
@@ -135,6 +149,18 @@ class ThreadedTaskGroup : public TaskGroup {
       // before cv.notify_one() has returned
       std::unique_lock<std::mutex> lock(mutex_);
       cv_.notify_one();
+      if (completion_future_.has_value()) {
+        // MarkFinished could be slow.  We don't want to call it while we are holding
+        // the lock.
+        // TODO: If optional is thread safe then we can skip this locking entirely
+        auto future = *completion_future_;

Review comment:
       `auto&`?

##########
File path: cpp/src/arrow/util/task_group.cc
##########
@@ -135,6 +149,18 @@ class ThreadedTaskGroup : public TaskGroup {
       // before cv.notify_one() has returned
       std::unique_lock<std::mutex> lock(mutex_);
       cv_.notify_one();
+      if (completion_future_.has_value()) {
+        // MarkFinished could be slow.  We don't want to call it while we are holding
+        // the lock.
+        // TODO: If optional is thread safe then we can skip this locking entirely
+        auto future = *completion_future_;
+        auto finished = completion_future_->is_finished();
+        auto status = status_;
+        lock.unlock();
+        if (!finished) {
+          future.MarkFinished(status);

Review comment:
       It seems possible for `MarkFinished` to be called from several threads at once here, but `MarkFinished` is not thread-safe.

##########
File path: cpp/src/arrow/util/async_iterator.h
##########
@@ -0,0 +1,295 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <queue>
+
+#include "arrow/util/functional.h"
+#include "arrow/util/future.h"
+#include "arrow/util/iterator.h"
+#include "arrow/util/optional.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+
+template <typename T>
+using AsyncGenerator = std::function<Future<T>()>;
+
+/// Iterates through a generator of futures, visiting the result of each one and
+/// returning a future that completes when all have been visited
+template <typename T>
+Future<> VisitAsyncGenerator(AsyncGenerator<T> generator,
+                             std::function<Status(T)> visitor) {
+  auto loop_body = [generator, visitor] {
+    auto next = generator();
+    return next.Then([visitor](const T& result) -> Result<ControlFlow<detail::Empty>> {
+      if (result == IterationTraits<T>::End()) {
+        return Break(detail::Empty());
+      } else {
+        auto visited = visitor(result);
+        if (visited.ok()) {
+          return Continue();
+        } else {
+          return visited;
+        }
+      }
+    });
+  };
+  return Loop(loop_body);
+}
+
+template <typename T>
+Future<std::vector<T>> CollectAsyncGenerator(AsyncGenerator<T> generator) {
+  auto vec = std::make_shared<std::vector<T>>();
+  auto loop_body = [generator, vec] {
+    auto next = generator();
+    return next.Then([vec](const T& result) -> Result<ControlFlow<std::vector<T>>> {
+      if (result == IterationTraits<T>::End()) {
+        return Break(*vec);
+      } else {
+        vec->push_back(result);
+        return Continue();
+      }
+    });
+  };
+  return Loop(loop_body);
+}
+
+template <typename T, typename V>
+class TransformingGenerator {
+ public:
+  explicit TransformingGenerator(AsyncGenerator<T> generator,
+                                 Transformer<T, V> transformer)
+      : finished_(), last_value_(), generator_(generator), transformer_(transformer) {}
+
+  // See comment on TransformingIterator::Pump
+  Result<util::optional<V>> Pump() {
+    if (!finished_ && last_value_.has_value()) {
+      ARROW_ASSIGN_OR_RAISE(TransformFlow<V> next, transformer_(*last_value_));
+      if (next.ReadyForNext()) {
+        if (*last_value_ == IterationTraits<T>::End()) {
+          finished_ = true;
+        }
+        last_value_.reset();
+      }
+      if (next.Finished()) {
+        finished_ = true;
+      }
+      if (next.HasValue()) {
+        return next.Value();
+      }
+    }
+    if (finished_) {
+      return IterationTraits<V>::End();
+    }
+    return util::nullopt;
+  }
+
+  Future<V> operator()() {
+    while (true) {
+      auto maybe_next_result = Pump();
+      if (!maybe_next_result.ok()) {
+        return Future<V>::MakeFinished(maybe_next_result.status());
+      }
+      auto maybe_next = maybe_next_result.ValueUnsafe();
+      if (maybe_next.has_value()) {
+        return Future<V>::MakeFinished(*maybe_next);
+      }
+
+      auto next_fut = generator_();
+      // If finished already, process results immediately inside the loop to avoid stack
+      // overflow
+      if (next_fut.is_finished()) {
+        auto next_result = next_fut.result();
+        if (next_result.ok()) {
+          last_value_ = *next_result;
+        } else {
+          return Future<V>::MakeFinished(next_result.status());
+        }
+        // Otherwise, if not finished immediately, add callback to process results
+      } else {
+        return next_fut.Then([this](const Result<T>& next_result) {
+          if (next_result.ok()) {
+            last_value_ = *next_result;
+            return (*this)();
+          } else {
+            return Future<V>::MakeFinished(next_result.status());
+          }
+        });
+      }
+    }
+  }
+
+ protected:
+  bool finished_;
+  util::optional<T> last_value_;
+  AsyncGenerator<T> generator_;
+  Transformer<T, V> transformer_;
+};
+
+template <typename T>
+static std::function<void(const Result<T>&)> MakeCallback(
+    std::shared_ptr<bool> finished) {
+  return [finished](const Result<T>& next_result) {
+    if (!next_result.ok()) {
+      *finished = true;
+    } else {
+      auto next = *next_result;
+      *finished = (next == IterationTraits<T>::End());
+    }
+  };
+}
+
+template <typename T>
+AsyncGenerator<T> AddReadahead(AsyncGenerator<T> source_generator, int max_readahead) {
+  // Using a shared_ptr instead of a lambda capture here because it's possible that
+  // the inner mark_finished_if_done outlives the outer lambda
+  auto finished = std::make_shared<bool>(false);
+  auto mark_finished_if_done = [finished](const Result<T>& next_result) {
+    if (!next_result.ok()) {
+      *finished = true;
+    } else {
+      auto next = *next_result;
+      *finished = (next == IterationTraits<T>::End());
+    }
+  };
+
+  std::queue<Future<T>> readahead_queue;
+  return [=]() mutable -> Future<T> {
+    if (readahead_queue.empty()) {
+      // This is the first request, let's pump the underlying queue
+      for (int i = 0; i < max_readahead; i++) {
+        auto next = source_generator();
+        next.AddCallback(mark_finished_if_done);
+        readahead_queue.push(std::move(next));
+      }
+    }
+    // Pop one and add one
+    auto result = readahead_queue.front();
+    readahead_queue.pop();
+    if (*finished) {
+      readahead_queue.push(Future<T>::MakeFinished(IterationTraits<T>::End()));
+    } else {
+      auto back_of_queue = source_generator();
+      back_of_queue.AddCallback(mark_finished_if_done);
+      readahead_queue.push(std::move(back_of_queue));
+    }
+    return result;
+  };
+}
+
+/// \brief Transforms an async generator using a transformer function returning a new
+/// AsyncGenerator
+///
+/// The transform function here behaves exactly the same as the transform function in
+/// MakeTransformedIterator and you can safely use the same transform function to
+/// transform both synchronous and asynchronous streams.
+template <typename T, typename V>
+AsyncGenerator<V> TransformAsyncGenerator(AsyncGenerator<T> generator,
+                                          Transformer<T, V> transformer) {
+  return TransformingGenerator<T, V>(generator, transformer);
+}
+
+namespace detail {
+
+template <typename T>
+struct BackgroundIteratorPromise : ReadaheadPromise {
+  ~BackgroundIteratorPromise() override {}
+
+  explicit BackgroundIteratorPromise(Iterator<T>* it) : it_(it) {}
+
+  bool Call() override {
+    auto next = it_->Next();
+    auto finished = next == IterationTraits<T>::End();
+    out_.MarkFinished(std::move(next));
+    return finished;
+  }
+
+  void End() override { out_.MarkFinished(IterationTraits<T>::End()); }
+
+  Iterator<T>* it_;
+  Future<T> out_ = Future<T>::Make();
+};
+
+}  // namespace detail
+
+/// \brief Async generator that iterates on an underlying iterator in a
+/// separate thread.
+template <typename T>
+class BackgroundIterator {
+  using PromiseType = typename detail::BackgroundIteratorPromise<T>;
+
+ public:
+  explicit BackgroundIterator(Iterator<T> it, internal::Executor* executor)
+      : it_(new Iterator<T>(std::move(it))),
+        queue_(new detail::ReadaheadQueue(0)),
+        executor_(executor),
+        done_() {}
+
+  ~BackgroundIterator() {
+    if (queue_) {
+      // Make sure the queue doesn't call any promises after this object
+      // is destroyed.
+      queue_->EnsureShutdownOrDie();
+    }
+  }
+
+  ARROW_DEFAULT_MOVE_AND_ASSIGN(BackgroundIterator);
+  ARROW_DISALLOW_COPY_AND_ASSIGN(BackgroundIterator);
+
+  Future<T> operator()() {
+    if (done_) {
+      return Future<T>::MakeFinished(IterationTraits<T>::End());
+    }
+    auto promise = std::unique_ptr<PromiseType>(new PromiseType{it_.get()});
+    auto result = Future<T>(promise->out_);

Review comment:
       The name `result` is a bit misleading: the variable holds a `Future<T>`, not a result...

##########
File path: cpp/src/arrow/csv/reader_benchmark.cc
##########
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "benchmark/benchmark.h"
+
+#include <string>
+
+#include "arrow/buffer.h"
+#include "arrow/csv/options.h"
+#include "arrow/csv/reader.h"
+#include "arrow/csv/test_common.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/io/memory.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+namespace csv {
+
+class SlowInputStream : public io::InputStream {
+ public:
+  explicit SlowInputStream(std::shared_ptr<io::BufferReader> target, int64_t latency_ms)
+      : target_(std::move(target)) {
+    latency_s_ = static_cast<double>(latency_ms) / 1000.0;
+  }
+  virtual ~SlowInputStream() {}
+
+  Result<util::string_view> Peek(int64_t nbytes) override {
+    return target_->Peek(nbytes);
+  }
+  bool supports_zero_copy() const override { return target_->supports_zero_copy(); }
+  Status Close() override { return target_->Close(); }
+  Status Abort() override { return target_->Abort(); }
+  Result<int64_t> Tell() const override { return target_->Tell(); }
+  bool closed() const override { return target_->closed(); }
+  Result<int64_t> Read(int64_t nbytes, void* out) override {
+    if (latency_s_ > 0) {
+      SleepFor(latency_s_);
+    }
+    return target_->Read(nbytes, out);
+  }
+  Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override {
+    if (latency_s_ > 0) {
+      SleepFor(latency_s_);
+    }
+    return target_->Read(nbytes);
+  }
+  Status Seek(int64_t pos) { return target_->Seek(pos); }
+
+ private:
+  std::shared_ptr<io::BufferReader> target_;
+  double latency_s_;
+};
+
+static ReadOptions CreateReadOptions(bool use_threads, bool use_async) {
+  auto result = csv::ReadOptions::Defaults();
+  result.use_threads = use_threads;
+  result.legacy_blocking_reads = !use_async;
+  // Simulate larger files by using smaller block files so the impact of multiple
+  // blocks is seen but we don't have to spend the time waiting on the large I/O
+  result.block_size = (1 << 20) / 100;
+  return result;
+}
+
+static std::shared_ptr<SlowInputStream> CreateStreamReader(std::shared_ptr<Buffer> buffer,
+                                                           int64_t latency_ms) {
+  auto buffer_reader = std::make_shared<io::BufferReader>(buffer);
+  return std::make_shared<SlowInputStream>(buffer_reader, latency_ms);
+}
+
+static void BenchmarkReader(benchmark::State& state, bool use_threads, bool use_async) {
+  auto latency_ms = state.range(0);
+  auto num_rows = state.range(1);
+  auto num_files = state.range(2);
+  if (num_files > 5 && use_threads && !use_async) {
+    state.SkipWithError("Would deadlock");
+  }
+  auto input_buffer = *MakeSampleCsvBuffer(num_rows);
+  // Hard coding # of threads so we don't deadlock if there are too few cores
+  ASSIGN_OR_ABORT(auto thread_pool, internal::ThreadPool::Make(6));
+  io::AsyncContext async_context(thread_pool.get());
+  while (state.KeepRunning()) {
+    std::vector<Future<std::shared_ptr<Table>>> table_futures;
+    for (int i = 0; i < num_files; i++) {
+      auto stream_reader = CreateStreamReader(input_buffer, latency_ms);
+      auto table_reader = *csv::TableReader::Make(
+          default_memory_pool(), async_context, stream_reader,
+          CreateReadOptions(use_threads, use_async), csv::ParseOptions::Defaults(),
+          csv::ConvertOptions::Defaults());
+      if (use_async) {
+        table_futures.push_back(table_reader->ReadAsync());
+      } else {
+        ASSERT_OK_AND_ASSIGN(auto table_future,
+                             async_context.executor->Submit(
+                                 [table_reader] { return table_reader->Read(); }));
+        table_futures.push_back(table_future);
+      }
+    }
+    auto combined = All(table_futures);
+    ASSIGN_OR_ABORT(auto result, combined.result());
+    for (auto&& table : result) {
+      ABORT_NOT_OK(table);
+    }
+  }
+  state.SetItemsProcessed(state.iterations() * num_rows);

Review comment:
       Do we want to also multiply by `num_files`?

##########
File path: cpp/src/arrow/csv/test_common.cc
##########
@@ -61,5 +61,47 @@ void MakeColumnParser(std::vector<std::string> items, std::shared_ptr<BlockParse
   ASSERT_EQ((*out)->num_rows(), items.size());
 }
 
+const std::vector<std::string> int64_rows = {"123", "4", "-317005557", "", "N/A", "0"};
+const std::vector<std::string> float_rows = {"0", "123.456", "-3170.55766", "", "N/A"};
+const std::vector<std::string> decimal128_rows = {"0", "123.456", "-3170.55766",
+                                                  "",  "N/A",     "1233456789.123456789"};
+const std::vector<std::string> iso8601_rows = {"1917-10-17", "2018-09-13",
+                                               "1941-06-22 04:00", "1945-05-09 09:45:38"};
+const std::vector<std::string> strptime_rows = {"10/17/1917", "9/13/2018", "9/5/1945"};

Review comment:
       Unless you're passing an explicit schema, the decimal128 and strptime rows won't be type-inferred as you seem to expect them to?

##########
File path: cpp/src/arrow/util/iterator_test.cc
##########
@@ -214,6 +255,270 @@ TEST(TestVectorIterator, RangeForLoop) {
   ASSERT_EQ(ints_it, ints.end());
 }
 
+template <typename T>
+Transformer<T, T> MakeFirstN(int n) {
+  int remaining = n;
+  return [remaining](T next) mutable -> Result<TransformFlow<T>> {
+    if (remaining > 0) {
+      remaining--;
+      return TransformYield(next);
+    }
+    return TransformFinish();
+  };
+}
+
+TEST(TestIteratorTransform, Truncating) {

Review comment:
       Iterator transform tests should use two different types for `T` and `V` (for example `int` and `std::string`), IMHO, to make sure that there's no place where the original value is passed through by mistake.

##########
File path: cpp/src/arrow/csv/reader_test.cc
##########
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <string>
+#include <thread>
+#include <utility>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/csv/options.h"
+#include "arrow/csv/reader.h"
+#include "arrow/csv/test_common.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/status.h"
+#include "arrow/table.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/future.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+namespace csv {
+
+void StressTableReader(
+    std::function<Result<std::shared_ptr<TableReader>>(std::shared_ptr<io::InputStream>)>
+        reader_factory) {
+  const int NTASKS = 100;
+  const int NROWS = 1000;
+  ASSERT_OK_AND_ASSIGN(auto table_buffer, MakeSampleCsvBuffer(NROWS));
+
+  std::vector<Future<std::shared_ptr<Table>>> task_futures(NTASKS);
+  for (int i = 0; i < NTASKS; i++) {
+    auto input = std::make_shared<io::BufferReader>(table_buffer);
+    ASSERT_OK_AND_ASSIGN(auto reader, reader_factory(input));
+    task_futures[i] = reader->ReadAsync();
+  }
+  auto combined_future = All(task_futures);
+  combined_future.Wait();
+
+  ASSERT_OK_AND_ASSIGN(std::vector<Result<std::shared_ptr<Table>>> results,
+                       combined_future.result());
+  for (auto&& result : results) {
+    ASSERT_OK_AND_ASSIGN(auto table, result);
+    ASSERT_EQ(NROWS, table->num_rows());
+  }
+}
+
+void TestNestedParallelism(
+    std::shared_ptr<internal::ThreadPool> thread_pool,
+    std::function<Result<std::shared_ptr<TableReader>>(std::shared_ptr<io::InputStream>)>
+        reader_factory) {
+  const int NROWS = 1000;
+  ASSERT_OK_AND_ASSIGN(auto table_buffer, MakeSampleCsvBuffer(NROWS));
+  auto input = std::make_shared<io::BufferReader>(table_buffer);
+  ASSERT_OK_AND_ASSIGN(auto reader, reader_factory(input));
+
+  Future<std::shared_ptr<Table>> table_future;
+
+  auto read_task = [&reader, &table_future]() mutable {
+    table_future = reader->ReadAsync();
+    return Status::OK();
+  };
+  ASSERT_OK_AND_ASSIGN(auto future, thread_pool->Submit(read_task));
+  ASSERT_TRUE(future.Wait(1));
+
+  if (future.is_finished()) {

Review comment:
       Hmm... what if it's not finished? Does it mean we should error out?

##########
File path: cpp/src/arrow/csv/reader_test.cc
##########
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <string>
+#include <thread>
+#include <utility>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/csv/options.h"
+#include "arrow/csv/reader.h"
+#include "arrow/csv/test_common.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/status.h"
+#include "arrow/table.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/future.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+namespace csv {
+
+void StressTableReader(
+    std::function<Result<std::shared_ptr<TableReader>>(std::shared_ptr<io::InputStream>)>
+        reader_factory) {

Review comment:
       Hide the `std::function<...>` behind a type alias (`using TableReaderFactory = ...`) to avoid repeating it several times?

##########
File path: cpp/src/arrow/csv/test_common.cc
##########
@@ -61,5 +61,47 @@ void MakeColumnParser(std::vector<std::string> items, std::shared_ptr<BlockParse
   ASSERT_EQ((*out)->num_rows(), items.size());
 }
 
+const std::vector<std::string> int64_rows = {"123", "4", "-317005557", "", "N/A", "0"};
+const std::vector<std::string> float_rows = {"0", "123.456", "-3170.55766", "", "N/A"};
+const std::vector<std::string> decimal128_rows = {"0", "123.456", "-3170.55766",
+                                                  "",  "N/A",     "1233456789.123456789"};
+const std::vector<std::string> iso8601_rows = {"1917-10-17", "2018-09-13",
+                                               "1941-06-22 04:00", "1945-05-09 09:45:38"};
+const std::vector<std::string> strptime_rows = {"10/17/1917", "9/13/2018", "9/5/1945"};
+
+static void WriteHeader(std::ostream& writer) {
+  writer << "Int64,Float,Decimal128,ISO8601,Strptime" << std::endl;
+}
+
+static std::string GetCell(std::vector<std::string> base_rows, size_t row_index) {

Review comment:
       Nit, but a const-ref is more efficient here.

##########
File path: cpp/src/arrow/csv/test_common.cc
##########
@@ -61,5 +61,47 @@ void MakeColumnParser(std::vector<std::string> items, std::shared_ptr<BlockParse
   ASSERT_EQ((*out)->num_rows(), items.size());
 }
 
+const std::vector<std::string> int64_rows = {"123", "4", "-317005557", "", "N/A", "0"};

Review comment:
       Everything that's not exposed in a `.h` should be enclosed in the anonymous namespace, to minimize exported symbols.

##########
File path: cpp/src/arrow/csv/test_common.h
##########
@@ -22,6 +22,7 @@
 #include <vector>
 
 #include "arrow/csv/parser.h"
+#include "arrow/io/memory.h"

Review comment:
       You don't need this include in this `.h` file.

##########
File path: cpp/src/arrow/csv/reader_test.cc
##########
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <string>
+#include <thread>
+#include <utility>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/csv/options.h"
+#include "arrow/csv/reader.h"
+#include "arrow/csv/test_common.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/status.h"
+#include "arrow/table.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/future.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+namespace csv {
+
+void StressTableReader(
+    std::function<Result<std::shared_ptr<TableReader>>(std::shared_ptr<io::InputStream>)>
+        reader_factory) {
+  const int NTASKS = 100;
+  const int NROWS = 1000;
+  ASSERT_OK_AND_ASSIGN(auto table_buffer, MakeSampleCsvBuffer(NROWS));
+
+  std::vector<Future<std::shared_ptr<Table>>> task_futures(NTASKS);
+  for (int i = 0; i < NTASKS; i++) {
+    auto input = std::make_shared<io::BufferReader>(table_buffer);
+    ASSERT_OK_AND_ASSIGN(auto reader, reader_factory(input));
+    task_futures[i] = reader->ReadAsync();
+  }
+  auto combined_future = All(task_futures);
+  combined_future.Wait();
+
+  ASSERT_OK_AND_ASSIGN(std::vector<Result<std::shared_ptr<Table>>> results,
+                       combined_future.result());
+  for (auto&& result : results) {
+    ASSERT_OK_AND_ASSIGN(auto table, result);
+    ASSERT_EQ(NROWS, table->num_rows());
+  }
+}
+
+void TestNestedParallelism(
+    std::shared_ptr<internal::ThreadPool> thread_pool,
+    std::function<Result<std::shared_ptr<TableReader>>(std::shared_ptr<io::InputStream>)>
+        reader_factory) {
+  const int NROWS = 1000;
+  ASSERT_OK_AND_ASSIGN(auto table_buffer, MakeSampleCsvBuffer(NROWS));
+  auto input = std::make_shared<io::BufferReader>(table_buffer);
+  ASSERT_OK_AND_ASSIGN(auto reader, reader_factory(input));
+
+  Future<std::shared_ptr<Table>> table_future;
+
+  auto read_task = [&reader, &table_future]() mutable {
+    table_future = reader->ReadAsync();
+    return Status::OK();
+  };
+  ASSERT_OK_AND_ASSIGN(auto future, thread_pool->Submit(read_task));
+  ASSERT_TRUE(future.Wait(1));
+
+  if (future.is_finished()) {
+    ASSERT_TRUE(table_future.Wait(1));
+    if (table_future.is_finished()) {
+      ASSERT_OK_AND_ASSIGN(auto table, table_future.result());
+      ASSERT_EQ(table->num_rows(), NROWS);
+    }
+  }
+}  // namespace csv
+
+TEST(SerialReaderTests, Stress) {
+  auto task_factory = [](std::shared_ptr<io::InputStream> input_stream) {
+    return TableReader::Make(default_memory_pool(), io::AsyncContext(), input_stream,
+                             ReadOptions::Defaults(), ParseOptions::Defaults(),
+                             ConvertOptions::Defaults());
+  };
+  StressTableReader(task_factory);
+}
+
+TEST(SerialReaderTests, NestedParallelism) {
+  ASSERT_OK_AND_ASSIGN(auto thread_pool, internal::ThreadPool::Make(1));

Review comment:
       This is testing a harmless kind of nested parallelism with two different thread pools, right?
   (the one instantiated above and the one used by default for CPU tasks)

##########
File path: cpp/src/arrow/csv/reader.cc
##########
@@ -837,20 +885,121 @@ class ThreadedTableReader : public BaseTableReader {
   }
 
  protected:
-  ThreadPool* thread_pool_;
+  Executor* thread_pool_;
+  Iterator<std::shared_ptr<Buffer>> buffer_iterator_;
+};
+
+class AsyncThreadedTableReader
+    : public BaseTableReader,
+      public std::enable_shared_from_this<AsyncThreadedTableReader> {
+ public:
+  using BaseTableReader::BaseTableReader;
+
+  AsyncThreadedTableReader(MemoryPool* pool, std::shared_ptr<io::InputStream> input,
+                           const ReadOptions& read_options,
+                           const ParseOptions& parse_options,
+                           const ConvertOptions& convert_options, Executor* thread_pool)
+      : BaseTableReader(pool, input, read_options, parse_options, convert_options),
+        thread_pool_(thread_pool) {}
+
+  ~AsyncThreadedTableReader() override {
+    if (task_group_) {
+      // In case of error, make sure all pending tasks are finished before
+      // we start destroying BaseTableReader members
+      ARROW_UNUSED(task_group_->Finish());
+    }
+  }
+
+  Status Init() override {
+    ARROW_ASSIGN_OR_RAISE(auto istream_it,
+                          io::MakeInputStreamIterator(input_, read_options_.block_size));
+
+    ARROW_ASSIGN_OR_RAISE(auto bg_it,
+                          MakeBackgroundIterator(std::move(istream_it), thread_pool_));
+
+    int32_t block_queue_size = thread_pool_->GetCapacity();
+    auto rh_it = AddReadahead(bg_it, block_queue_size);
+    buffer_generator_ = CSVBufferIterator::MakeAsync(std::move(rh_it));
+    return Status::OK();
+  }
+
+  Result<std::shared_ptr<Table>> Read() override { return ReadAsync().result(); }
+
+  Future<std::shared_ptr<Table>> ReadAsync() override {
+    task_group_ = internal::TaskGroup::MakeThreaded(thread_pool_);

Review comment:
       Do you have plans to remove TaskGroup from the picture? Ideally we should be able to call `Future::Then` on an executor, or something similar?

##########
File path: cpp/src/arrow/result.h
##########
@@ -331,6 +332,7 @@ class ARROW_MUST_USE_TYPE Result : public util::EqualityComparable<Result<T>> {
     return ValueUnsafe();
   }
   T& operator*() & { return ValueOrDie(); }
+  T* operator->() { return &ValueOrDie(); }

Review comment:
       I don't know if it makes sense to add an rvalue variant below?

##########
File path: cpp/src/arrow/util/iterator_test.cc
##########
@@ -129,11 +130,44 @@ template <typename T>
 inline Iterator<T> EmptyIt() {
   return MakeEmptyIterator<T>();
 }
-
 inline Iterator<TestInt> VectorIt(std::vector<TestInt> v) {
   return MakeVectorIterator<TestInt>(std::move(v));
 }
 
+std::function<Future<TestInt>()> AsyncVectorIt(std::vector<TestInt> v) {

Review comment:
       Return `AsyncGenerator<TestInt>`?

##########
File path: cpp/src/arrow/csv/reader_test.cc
##########
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <string>
+#include <thread>
+#include <utility>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "arrow/csv/options.h"
+#include "arrow/csv/reader.h"
+#include "arrow/csv/test_common.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/status.h"
+#include "arrow/table.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/future.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+namespace csv {
+
+void StressTableReader(
+    std::function<Result<std::shared_ptr<TableReader>>(std::shared_ptr<io::InputStream>)>
+        reader_factory) {
+  const int NTASKS = 100;
+  const int NROWS = 1000;
+  ASSERT_OK_AND_ASSIGN(auto table_buffer, MakeSampleCsvBuffer(NROWS));
+
+  std::vector<Future<std::shared_ptr<Table>>> task_futures(NTASKS);
+  for (int i = 0; i < NTASKS; i++) {
+    auto input = std::make_shared<io::BufferReader>(table_buffer);
+    ASSERT_OK_AND_ASSIGN(auto reader, reader_factory(input));
+    task_futures[i] = reader->ReadAsync();
+  }
+  auto combined_future = All(task_futures);
+  combined_future.Wait();
+
+  ASSERT_OK_AND_ASSIGN(std::vector<Result<std::shared_ptr<Table>>> results,
+                       combined_future.result());
+  for (auto&& result : results) {
+    ASSERT_OK_AND_ASSIGN(auto table, result);
+    ASSERT_EQ(NROWS, table->num_rows());
+  }
+}
+
+void TestNestedParallelism(
+    std::shared_ptr<internal::ThreadPool> thread_pool,
+    std::function<Result<std::shared_ptr<TableReader>>(std::shared_ptr<io::InputStream>)>
+        reader_factory) {
+  const int NROWS = 1000;
+  ASSERT_OK_AND_ASSIGN(auto table_buffer, MakeSampleCsvBuffer(NROWS));
+  auto input = std::make_shared<io::BufferReader>(table_buffer);
+  ASSERT_OK_AND_ASSIGN(auto reader, reader_factory(input));
+
+  Future<std::shared_ptr<Table>> table_future;
+
+  auto read_task = [&reader, &table_future]() mutable {
+    table_future = reader->ReadAsync();
+    return Status::OK();
+  };
+  ASSERT_OK_AND_ASSIGN(auto future, thread_pool->Submit(read_task));
+  ASSERT_TRUE(future.Wait(1));
+
+  if (future.is_finished()) {
+    ASSERT_TRUE(table_future.Wait(1));
+    if (table_future.is_finished()) {
+      ASSERT_OK_AND_ASSIGN(auto table, table_future.result());
+      ASSERT_EQ(table->num_rows(), NROWS);
+    }
+  }
+}  // namespace csv
+
+TEST(SerialReaderTests, Stress) {
+  auto task_factory = [](std::shared_ptr<io::InputStream> input_stream) {
+    return TableReader::Make(default_memory_pool(), io::AsyncContext(), input_stream,
+                             ReadOptions::Defaults(), ParseOptions::Defaults(),
+                             ConvertOptions::Defaults());
+  };
+  StressTableReader(task_factory);
+}
+
+TEST(SerialReaderTests, NestedParallelism) {
+  ASSERT_OK_AND_ASSIGN(auto thread_pool, internal::ThreadPool::Make(1));
+  auto task_factory = [](std::shared_ptr<io::InputStream> input_stream) {
+    return TableReader::Make(default_memory_pool(), io::AsyncContext(), input_stream,
+                             ReadOptions::Defaults(), ParseOptions::Defaults(),
+                             ConvertOptions::Defaults());
+  };
+  TestNestedParallelism(thread_pool, task_factory);
+}
+
+TEST(ThreadedReaderTests, Stress) {
+  ASSERT_OK_AND_ASSIGN(auto thread_pool, internal::ThreadPool::Make(1));
+  auto task_factory = [&thread_pool](std::shared_ptr<io::InputStream> input_stream)
+      -> Result<std::shared_ptr<TableReader>> {
+    ReadOptions read_options = ReadOptions::Defaults();
+    read_options.use_threads = true;
+    read_options.legacy_blocking_reads = true;
+    auto table_reader = TableReader::Make(
+        default_memory_pool(), io::AsyncContext(thread_pool.get()), input_stream,

Review comment:
       I'm not sure what difference it makes to change the executor used by the `AsyncContext`?
   (in any case it's an executor for IO tasks, which should be distinct from the executor for CPU tasks)

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -556,6 +559,38 @@ inline bool WaitForAll(const std::vector<Future<T>*>& futures,
   return waiter->Wait(seconds);
 }
 
+/// \brief Create a Future which completes when all of `futures` complete.
+///
+/// The future's result is a vector of the results of `futures`.
+/// Note that this future will never be marked "failed"; failed results
+/// will be stored in the result vector alongside successful results.
+template <typename T>
+Future<std::vector<Result<T>>> All(std::vector<Future<T>> futures) {
+  struct State {
+    explicit State(std::vector<Future<T>> f)
+        : futures(std::move(f)), n_remaining(futures.size()) {}
+
+    std::vector<Future<T>> futures;
+    std::atomic<size_t> n_remaining;
+  };

Review comment:
       Hmm... did you investigate whether it was possible to reuse `FutureWaiter` instead of recreating a similar logic here?

##########
File path: cpp/src/arrow/util/async_iterator.h
##########
@@ -0,0 +1,295 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <queue>
+
+#include "arrow/util/functional.h"
+#include "arrow/util/future.h"
+#include "arrow/util/iterator.h"
+#include "arrow/util/optional.h"
+#include "arrow/util/thread_pool.h"

Review comment:
       Please check whether all these includes are necessary. At least `thread_pool.h` doesn't seem necessary, unless I'm mistaken.

##########
File path: cpp/src/arrow/util/iterator.h
##########
@@ -186,6 +187,122 @@ class Iterator : public util::EqualityComparable<Iterator<T>> {
   Result<T> (*next_)(void*) = NULLPTR;
 };
 
+template <typename T>
+struct TransformFlow {
+  using YieldValueType = T;
+
+  TransformFlow(YieldValueType value, bool ready_for_next)
+      : finished_(false),
+        ready_for_next_(ready_for_next),
+        yield_value_(std::move(value)) {}
+  TransformFlow(bool finished, bool ready_for_next)
+      : finished_(finished), ready_for_next_(ready_for_next), yield_value_() {}
+
+  bool HasValue() const { return yield_value_.has_value(); }
+  bool Finished() const { return finished_; }
+  bool ReadyForNext() const { return ready_for_next_; }
+  T Value() const { return *yield_value_; }
+
+  bool finished_ = false;
+  bool ready_for_next_ = false;
+  util::optional<YieldValueType> yield_value_;
+};
+
+struct TransformFinish {
+  template <typename T>
+  operator TransformFlow<T>() && {  // NOLINT explicit
+    return TransformFlow<T>(true, true);
+  }
+};
+
+struct TransformSkip {
+  template <typename T>
+  operator TransformFlow<T>() && {  // NOLINT explicit
+    return TransformFlow<T>(false, true);
+  }
+};
+
+template <typename T>
+TransformFlow<T> TransformYield(T value = {}, bool ready_for_next = true) {

Review comment:
       What is "ready_for_next"?

##########
File path: cpp/src/arrow/util/async_iterator.h
##########
@@ -0,0 +1,295 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <queue>
+
+#include "arrow/util/functional.h"
+#include "arrow/util/future.h"
+#include "arrow/util/iterator.h"
+#include "arrow/util/optional.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+
+template <typename T>
+using AsyncGenerator = std::function<Future<T>()>;
+
+/// Iterates through a generator of futures, visiting the result of each one and
+/// returning a future that completes when all have been visited
+template <typename T>
+Future<> VisitAsyncGenerator(AsyncGenerator<T> generator,
+                             std::function<Status(T)> visitor) {
+  auto loop_body = [generator, visitor] {
+    auto next = generator();
+    return next.Then([visitor](const T& result) -> Result<ControlFlow<detail::Empty>> {
+      if (result == IterationTraits<T>::End()) {
+        return Break(detail::Empty());
+      } else {
+        auto visited = visitor(result);
+        if (visited.ok()) {
+          return Continue();
+        } else {
+          return visited;
+        }
+      }
+    });
+  };
+  return Loop(loop_body);
+}
+
+template <typename T>
+Future<std::vector<T>> CollectAsyncGenerator(AsyncGenerator<T> generator) {
+  auto vec = std::make_shared<std::vector<T>>();
+  auto loop_body = [generator, vec] {
+    auto next = generator();
+    return next.Then([vec](const T& result) -> Result<ControlFlow<std::vector<T>>> {
+      if (result == IterationTraits<T>::End()) {
+        return Break(*vec);
+      } else {
+        vec->push_back(result);
+        return Continue();
+      }
+    });
+  };
+  return Loop(loop_body);
+}
+
+template <typename T, typename V>
+class TransformingGenerator {
+ public:
+  explicit TransformingGenerator(AsyncGenerator<T> generator,
+                                 Transformer<T, V> transformer)
+      : finished_(), last_value_(), generator_(generator), transformer_(transformer) {}
+
+  // See comment on TransformingIterator::Pump
+  Result<util::optional<V>> Pump() {

Review comment:
       Is this supposed to be a public method?

##########
File path: cpp/src/arrow/util/async_iterator.h
##########
@@ -0,0 +1,295 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <queue>
+
+#include "arrow/util/functional.h"
+#include "arrow/util/future.h"
+#include "arrow/util/iterator.h"
+#include "arrow/util/optional.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+
+template <typename T>
+using AsyncGenerator = std::function<Future<T>()>;
+
+/// Iterates through a generator of futures, visiting the result of each one and
+/// returning a future that completes when all have been visited
+template <typename T>
+Future<> VisitAsyncGenerator(AsyncGenerator<T> generator,
+                             std::function<Status(T)> visitor) {
+  auto loop_body = [generator, visitor] {

Review comment:
       Since we don't know how much state the generator and visitor carry, I think it would be better to move-construct an anonymous callable object.

##########
File path: cpp/src/arrow/csv/reader_benchmark.cc
##########
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "benchmark/benchmark.h"
+
+#include <string>
+
+#include "arrow/buffer.h"
+#include "arrow/csv/options.h"
+#include "arrow/csv/reader.h"
+#include "arrow/csv/test_common.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/io/memory.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+namespace csv {
+
+class SlowInputStream : public io::InputStream {
+ public:
+  explicit SlowInputStream(std::shared_ptr<io::BufferReader> target, int64_t latency_ms)
+      : target_(std::move(target)) {
+    latency_s_ = static_cast<double>(latency_ms) / 1000.0;
+  }
+  virtual ~SlowInputStream() {}
+
+  Result<util::string_view> Peek(int64_t nbytes) override {
+    return target_->Peek(nbytes);
+  }
+  bool supports_zero_copy() const override { return target_->supports_zero_copy(); }
+  Status Close() override { return target_->Close(); }
+  Status Abort() override { return target_->Abort(); }
+  Result<int64_t> Tell() const override { return target_->Tell(); }
+  bool closed() const override { return target_->closed(); }
+  Result<int64_t> Read(int64_t nbytes, void* out) override {
+    if (latency_s_ > 0) {
+      SleepFor(latency_s_);
+    }
+    return target_->Read(nbytes, out);
+  }
+  Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override {
+    if (latency_s_ > 0) {
+      SleepFor(latency_s_);
+    }
+    return target_->Read(nbytes);
+  }
+  Status Seek(int64_t pos) { return target_->Seek(pos); }
+
+ private:
+  std::shared_ptr<io::BufferReader> target_;
+  double latency_s_;
+};
+
+static ReadOptions CreateReadOptions(bool use_threads, bool use_async) {
+  auto result = csv::ReadOptions::Defaults();
+  result.use_threads = use_threads;
+  result.legacy_blocking_reads = !use_async;
+  // Simulate larger files by using smaller block files so the impact of multiple
+  // blocks is seen but we don't have to spend the time waiting on the large I/O
+  result.block_size = (1 << 20) / 100;
+  return result;
+}
+
+static std::shared_ptr<SlowInputStream> CreateStreamReader(std::shared_ptr<Buffer> buffer,
+                                                           int64_t latency_ms) {
+  auto buffer_reader = std::make_shared<io::BufferReader>(buffer);
+  return std::make_shared<SlowInputStream>(buffer_reader, latency_ms);
+}
+
+static void BenchmarkReader(benchmark::State& state, bool use_threads, bool use_async) {
+  auto latency_ms = state.range(0);
+  auto num_rows = state.range(1);
+  auto num_files = state.range(2);
+  if (num_files > 5 && use_threads && !use_async) {
+    state.SkipWithError("Would deadlock");
+  }
+  auto input_buffer = *MakeSampleCsvBuffer(num_rows);
+  // Hard coding # of threads so we don't deadlock if there are too few cores

Review comment:
       Can you make it a `constexpr something` at least?
   
   Also, why would we deadlock? The implementation should certainly prevent that. The user may be running in a 1- or 2-core VM.

##########
File path: cpp/src/arrow/util/async_iterator.h
##########
@@ -0,0 +1,295 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <queue>
+
+#include "arrow/util/functional.h"
+#include "arrow/util/future.h"
+#include "arrow/util/iterator.h"
+#include "arrow/util/optional.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+
+template <typename T>
+using AsyncGenerator = std::function<Future<T>()>;
+
+/// Iterates through a generator of futures, visiting the result of each one and
+/// returning a future that completes when all have been visited
+template <typename T>
+Future<> VisitAsyncGenerator(AsyncGenerator<T> generator,
+                             std::function<Status(T)> visitor) {

Review comment:
       I'm skeptical about taking a `std::function<>` and praying that the compiler optimizes the common cases well, rather than taking the callable as a template argument.

##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -832,9 +832,203 @@ TEST(FutureCompletionTest, FutureVoid) {
   }
 }
 
+TEST(FutureAllTest, Simple) {
+  auto f1 = Future<int>::Make();
+  auto f2 = Future<int>::Make();
+  std::vector<Future<int>> futures = {f1, f2};
+  auto combined = arrow::All(futures);
+
+  ARROW_UNUSED(combined.Then([](std::vector<Result<int>> results) {
+    ASSERT_EQ(2, results.size());
+    ASSERT_EQ(1, *results[0]);
+    ASSERT_EQ(2, *results[1]);
+  }));
+
+  // Finish in reverse order, results should still be delivered in proper order
+  AssertNotFinished(combined);
+  f2.MarkFinished(2);
+  AssertNotFinished(combined);
+  f1.MarkFinished(1);
+  AssertSuccessful(combined);
+}
+
+TEST(FutureAllTest, Failure) {
+  auto f1 = Future<int>::Make();
+  auto f2 = Future<int>::Make();
+  auto f3 = Future<int>::Make();
+  std::vector<Future<int>> futures = {f1, f2, f3};
+  auto combined = arrow::All(futures);
+
+  ARROW_UNUSED(combined.Then([](std::vector<Result<int>> results) {

Review comment:
       Same here.

##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -832,9 +832,203 @@ TEST(FutureCompletionTest, FutureVoid) {
   }
 }
 
+TEST(FutureAllTest, Simple) {
+  auto f1 = Future<int>::Make();
+  auto f2 = Future<int>::Make();
+  std::vector<Future<int>> futures = {f1, f2};
+  auto combined = arrow::All(futures);
+
+  ARROW_UNUSED(combined.Then([](std::vector<Result<int>> results) {
+    ASSERT_EQ(2, results.size());
+    ASSERT_EQ(1, *results[0]);
+    ASSERT_EQ(2, *results[1]);
+  }));
+
+  // Finish in reverse order, results should still be delivered in proper order
+  AssertNotFinished(combined);
+  f2.MarkFinished(2);
+  AssertNotFinished(combined);
+  f1.MarkFinished(1);
+  AssertSuccessful(combined);
+}
+
+TEST(FutureAllTest, Failure) {
+  auto f1 = Future<int>::Make();
+  auto f2 = Future<int>::Make();
+  auto f3 = Future<int>::Make();
+  std::vector<Future<int>> futures = {f1, f2, f3};
+  auto combined = arrow::All(futures);
+
+  ARROW_UNUSED(combined.Then([](std::vector<Result<int>> results) {
+    ASSERT_EQ(3, results.size());
+    ASSERT_EQ(1, *results[0]);
+    ASSERT_EQ(Status::IOError("XYZ"), results[1].status());
+    ASSERT_EQ(3, *results[2]);
+  }));
+
+  f1.MarkFinished(1);
+  f2.MarkFinished(Status::IOError("XYZ"));
+  f3.MarkFinished(3);
+
+  AssertFinished(combined);
+}
+
+TEST(FutureLoopTest, Sync) {
+  struct {
+    int i = 0;
+    Future<int> Get() { return Future<int>::MakeFinished(i++); }
+  } IntSource;
+
+  bool do_fail = false;
+  std::vector<int> ints;
+  auto loop_body = [&] {
+    return IntSource.Get().Then([&](int i) -> Result<ControlFlow<int>> {
+      if (do_fail && i == 3) {
+        return Status::IOError("xxx");
+      }
+
+      if (i == 5) {
+        int sum = 0;
+        for (int i : ints) sum += i;
+        return Break(sum);
+      }
+
+      ints.push_back(i);
+      return Continue();
+    });
+  };
+
+  {
+    auto sum_fut = Loop(loop_body);
+    AssertSuccessful(sum_fut);
+
+    ASSERT_OK_AND_ASSIGN(auto sum, sum_fut.result());
+    ASSERT_EQ(sum, 0 + 1 + 2 + 3 + 4);
+  }
+
+  {
+    do_fail = true;
+    IntSource.i = 0;
+    auto sum_fut = Loop(loop_body);
+    AssertFailed(sum_fut);
+    ASSERT_RAISES(IOError, sum_fut.result());
+  }
+}
+
+TEST(FutureLoopTest, EmptyBreakValue) {
+  Future<> none_fut =
+      Loop([&] { return Future<>::MakeFinished().Then([&](...) { return Break(); }); });
+  AssertSuccessful(none_fut);
+}
+
+TEST(FutureLoopTest, MoveOnlyBreakValue) {
+  Future<MoveOnlyDataType> one_fut = Loop([&] {
+    return Future<int>::MakeFinished(1).Then(
+        [&](int i) { return Break(MoveOnlyDataType(i)); });
+  });
+  AssertSuccessful(one_fut);
+  ASSERT_OK_AND_ASSIGN(auto one, std::move(one_fut).result());
+  ASSERT_EQ(one, 1);
+}
+
+TEST(FutureLoopTest, StackOverflow) {
+  // Looping over futures is normally a rather recursive task.  If the futures complete
+  // synchronously (because they are already finished) it could lead to a stack overflow
+  // if care is not taken.
+  int counter = 0;
+  auto loop_body = [&counter]() -> Future<ControlFlow<int>> {
+    while (counter < 1000000) {
+      counter++;
+      return Future<ControlFlow<int>>::MakeFinished(Continue());
+    }
+    return Future<ControlFlow<int>>::MakeFinished(Break(-1));
+  };
+  auto loop_fut = Loop(loop_body);
+  ASSERT_TRUE(loop_fut.Wait(0.1));
+}
+
+TEST(FutureLoopTest, AllowsBreakFutToBeDiscarded) {
+  int counter = 0;
+  auto loop_body = [&counter]() -> Future<ControlFlow<int>> {
+    while (counter < 10) {
+      counter++;
+      return Future<ControlFlow<int>>::MakeFinished(Continue());
+    }
+    return Future<ControlFlow<int>>::MakeFinished(Break(-1));
+  };
+  auto loop_fut = Loop(loop_body).Then([](...) { return Status::OK(); });
+  ASSERT_TRUE(loop_fut.Wait(0.1));
+}
+
+TEST(FutureLoopTest, EmptyLoop) {
+  auto loop_body = []() -> Future<ControlFlow<int>> {
+    return Future<ControlFlow<int>>::MakeFinished(Break(0));
+  };
+  auto loop_fut = Loop(loop_body);
+  ASSERT_TRUE(loop_fut.Wait(0.1));
+  if (loop_fut.is_finished()) {

Review comment:
       Why the `if`?

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -580,4 +615,74 @@ inline std::vector<int> WaitForAny(const std::vector<Future<T>*>& futures,
   return waiter->MoveFinishedFutures();
 }
 
+template <typename T = detail::Empty>
+struct ControlFlow {
+  using BreakValueType = T;
+
+  bool IsBreak() const { return break_value_.has_value(); }
+
+  static Result<BreakValueType> MoveBreakValue(const ControlFlow& cf) {
+    return std::move(*cf.break_value_);
+  }
+
+  mutable util::optional<BreakValueType> break_value_;
+};
+
+struct Continue {
+  template <typename T>
+  operator ControlFlow<T>() && {  // NOLINT explicit
+    return {};
+  }
+};
+
+template <typename T = detail::Empty>
+ControlFlow<T> Break(T break_value = {}) {
+  return ControlFlow<T>{std::move(break_value)};
+}
+
+template <typename Iterate,
+          typename Control = typename detail::result_of_t<Iterate()>::ValueType,
+          typename BreakValueType = typename Control::BreakValueType>
+Future<BreakValueType> Loop(Iterate iterate) {
+  auto break_fut = Future<BreakValueType>::Make();
+
+  struct Callback {
+    bool CheckForTermination(const Result<Control>& maybe_control) {
+      if (!maybe_control.ok() || maybe_control->IsBreak()) {
+        Result<BreakValueType> maybe_break = maybe_control.Map(Control::MoveBreakValue);
+        break_fut.MarkFinished(std::move(maybe_break));
+        return true;
+      }
+      return false;
+    }
+
+    void operator()(const Result<Control>& maybe_control) && {
+      if (CheckForTermination(maybe_control)) return;
+
+      auto control_fut = iterate();
+      while (control_fut.is_finished()) {
+        // There's no need to AddCallback on a finished future; we can CheckForTermination
+        // now. This also avoids recursion and potential stack overflow.

Review comment:
       But note the future may finish between here and the `AddCallback()` call below. So while this reduces the risk of stack overflow, it doesn't eliminate it.
   
   Is there a way to avoid such fragility?
   

##########
File path: cpp/src/arrow/util/iterator.h
##########
@@ -428,10 +547,16 @@ struct ReadaheadIteratorPromise : ReadaheadPromise {
 
   explicit ReadaheadIteratorPromise(Iterator<T>* it) : it_(it) {}
 
-  void Call() override {
+  bool Call() override {
     assert(!called_);
     out_ = it_->Next();
     called_ = true;
+    return out_ == IterationTraits<T>::End();
+  }
+
+  void End() override {
+    // No need to do anything for the synchronous case.  No one is waiting on this
+    // called_ = true;

Review comment:
       Why is this commented out?

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -580,4 +615,74 @@ inline std::vector<int> WaitForAny(const std::vector<Future<T>*>& futures,
   return waiter->MoveFinishedFutures();
 }
 
+template <typename T = detail::Empty>
+struct ControlFlow {
+  using BreakValueType = T;
+
+  bool IsBreak() const { return break_value_.has_value(); }
+
+  static Result<BreakValueType> MoveBreakValue(const ControlFlow& cf) {
+    return std::move(*cf.break_value_);
+  }
+
+  mutable util::optional<BreakValueType> break_value_;
+};
+
+struct Continue {
+  template <typename T>
+  operator ControlFlow<T>() && {  // NOLINT explicit
+    return {};
+  }
+};
+
+template <typename T = detail::Empty>
+ControlFlow<T> Break(T break_value = {}) {
+  return ControlFlow<T>{std::move(break_value)};
+}
+
+template <typename Iterate,
+          typename Control = typename detail::result_of_t<Iterate()>::ValueType,
+          typename BreakValueType = typename Control::BreakValueType>

Review comment:
       Hmm... can't `ControlFlow<T>` simply be a `util::optional<T>`?

##########
File path: cpp/src/arrow/util/iterator.h
##########
@@ -186,6 +187,122 @@ class Iterator : public util::EqualityComparable<Iterator<T>> {
   Result<T> (*next_)(void*) = NULLPTR;
 };
 
+template <typename T>
+struct TransformFlow {
+  using YieldValueType = T;
+
+  TransformFlow(YieldValueType value, bool ready_for_next)
+      : finished_(false),
+        ready_for_next_(ready_for_next),
+        yield_value_(std::move(value)) {}
+  TransformFlow(bool finished, bool ready_for_next)
+      : finished_(finished), ready_for_next_(ready_for_next), yield_value_() {}
+
+  bool HasValue() const { return yield_value_.has_value(); }
+  bool Finished() const { return finished_; }
+  bool ReadyForNext() const { return ready_for_next_; }
+  T Value() const { return *yield_value_; }
+
+  bool finished_ = false;
+  bool ready_for_next_ = false;
+  util::optional<YieldValueType> yield_value_;
+};
+
+struct TransformFinish {
+  template <typename T>
+  operator TransformFlow<T>() && {  // NOLINT explicit
+    return TransformFlow<T>(true, true);
+  }
+};
+
+struct TransformSkip {
+  template <typename T>
+  operator TransformFlow<T>() && {  // NOLINT explicit
+    return TransformFlow<T>(false, true);
+  }
+};
+
+template <typename T>
+TransformFlow<T> TransformYield(T value = {}, bool ready_for_next = true) {
+  return TransformFlow<T>(std::move(value), ready_for_next);
+}
+
+template <typename T, typename V>
+using Transformer = std::function<Result<TransformFlow<V>>(T)>;
+
+template <typename T, typename V>
+class TransformIterator {
+ public:
+  explicit TransformIterator(Iterator<T> it, Transformer<T, V> transformer)
+      : it_(std::move(it)),
+        transformer_(std::move(transformer)),
+        last_value_(),
+        finished_() {}
+
+  // Calls the transform function on the current value.  Can return in several ways
+  // * If the next value is requested (e.g. skip) it will return an empty optional
+  // * If an invalid status is encountered that will be returned
+  // * If finished it will return IterationTraits<V>::End()
+  // * If a value is returned by the transformer that will be returned
+  Result<util::optional<V>> Pump() {

Review comment:
       Is this public?

##########
File path: cpp/src/arrow/util/task_group.h
##########
@@ -63,6 +63,20 @@ class ARROW_EXPORT TaskGroup : public std::enable_shared_from_this<TaskGroup> {
   /// task (or subgroup).
   virtual Status Finish() = 0;
 
+  /// Returns a future that will complete the first time all tasks are finished.
+  /// This should be called only after all top level tasks
+  /// have been added to the task group.
+  ///
+  /// If you are using a TaskGroup asyncrhonously there are a few considerations to keep

Review comment:
       "asynchronously"

##########
File path: cpp/src/arrow/util/iterator_test.cc
##########
@@ -214,6 +255,270 @@ TEST(TestVectorIterator, RangeForLoop) {
   ASSERT_EQ(ints_it, ints.end());
 }
 
+template <typename T>
+Transformer<T, T> MakeFirstN(int n) {
+  int remaining = n;
+  return [remaining](T next) mutable -> Result<TransformFlow<T>> {
+    if (remaining > 0) {
+      remaining--;
+      return TransformYield(next);
+    }
+    return TransformFinish();
+  };
+}
+
+TEST(TestIteratorTransform, Truncating) {
+  auto original = VectorIt({1, 2, 3});
+  auto truncated = MakeTransformedIterator(std::move(original), MakeFirstN<TestInt>(2));
+  AssertIteratorMatch({1, 2}, std::move(truncated));
+}
+
+TEST(TestIteratorTransform, TestPointer) {
+  auto original = VectorIt<std::shared_ptr<int>>(
+      {std::make_shared<int>(1), std::make_shared<int>(2), std::make_shared<int>(3)});
+  auto truncated =
+      MakeTransformedIterator(std::move(original), MakeFirstN<std::shared_ptr<int>>(2));
+  ASSERT_OK_AND_ASSIGN(auto result, truncated.ToVector());
+  ASSERT_EQ(2, result.size());
+}
+
+TEST(TestIteratorTransform, TruncatingShort) {
+  // Tests the failsafe case where we never call Finish
+  auto original = VectorIt({1});
+  auto truncated = MakeTransformedIterator<TestInt, TestInt>(std::move(original),
+                                                             MakeFirstN<TestInt>(2));
+  AssertIteratorMatch({1}, std::move(truncated));
+}
+
+TEST(TestAsyncUtil, Background) {
+  std::vector<TestInt> expected = {1, 2, 3};
+  auto background = BackgroundAsyncVectorIt(expected);
+  auto future = CollectAsyncGenerator(background);
+  ASSERT_FALSE(future.is_finished());
+  future.Wait();
+  ASSERT_TRUE(future.is_finished());
+  ASSERT_EQ(expected, *future.result());
+}
+
+struct SlowEmptyIterator {
+  Result<TestInt> Next() {
+    if (called_) {
+      return Status::Invalid("Should not have been called twice");
+    }
+    SleepFor(0.1);
+    return IterationTraits<TestInt>::End();
+  }
+
+ private:
+  bool called_ = false;
+};
+
+TEST(TestAsyncUtil, BackgroundRepeatEnd) {
+  // Ensure that the background iterator properly fulfills the asyncgenerator contract
+  // and can be called after it ends.
+  auto iterator = Iterator<TestInt>(SlowEmptyIterator());
+  ASSERT_OK_AND_ASSIGN(
+      auto background_iter,
+      MakeBackgroundIterator(std::move(iterator), internal::GetCpuThreadPool()));
+
+  auto one = background_iter();
+  auto two = background_iter();
+
+  ASSERT_TRUE(one.Wait(0.5));
+
+  if (one.is_finished()) {

Review comment:
       Why `if`?

##########
File path: cpp/src/arrow/util/iterator.h
##########
@@ -186,6 +187,122 @@ class Iterator : public util::EqualityComparable<Iterator<T>> {
   Result<T> (*next_)(void*) = NULLPTR;
 };
 
+template <typename T>
+struct TransformFlow {
+  using YieldValueType = T;
+
+  TransformFlow(YieldValueType value, bool ready_for_next)
+      : finished_(false),
+        ready_for_next_(ready_for_next),
+        yield_value_(std::move(value)) {}
+  TransformFlow(bool finished, bool ready_for_next)
+      : finished_(finished), ready_for_next_(ready_for_next), yield_value_() {}
+
+  bool HasValue() const { return yield_value_.has_value(); }
+  bool Finished() const { return finished_; }
+  bool ReadyForNext() const { return ready_for_next_; }
+  T Value() const { return *yield_value_; }
+
+  bool finished_ = false;
+  bool ready_for_next_ = false;
+  util::optional<YieldValueType> yield_value_;
+};
+
+struct TransformFinish {
+  template <typename T>
+  operator TransformFlow<T>() && {  // NOLINT explicit
+    return TransformFlow<T>(true, true);
+  }
+};
+
+struct TransformSkip {
+  template <typename T>
+  operator TransformFlow<T>() && {  // NOLINT explicit
+    return TransformFlow<T>(false, true);
+  }
+};
+
+template <typename T>
+TransformFlow<T> TransformYield(T value = {}, bool ready_for_next = true) {
+  return TransformFlow<T>(std::move(value), ready_for_next);
+}
+
+template <typename T, typename V>
+using Transformer = std::function<Result<TransformFlow<V>>(T)>;
+
+template <typename T, typename V>
+class TransformIterator {
+ public:
+  explicit TransformIterator(Iterator<T> it, Transformer<T, V> transformer)
+      : it_(std::move(it)),
+        transformer_(std::move(transformer)),
+        last_value_(),
+        finished_() {}
+
+  // Calls the transform function on the current value.  Can return in several ways
+  // * If the next value is requested (e.g. skip) it will return an empty optional
+  // * If an invalid status is encountered that will be returned
+  // * If finished it will return IterationTraits<V>::End()
+  // * If a value is returned by the transformer that will be returned
+  Result<util::optional<V>> Pump() {
+    if (!finished_ && last_value_.has_value()) {
+      ARROW_ASSIGN_OR_RAISE(TransformFlow<V> next, transformer_(*last_value_));
+      if (next.ReadyForNext()) {
+        if (*last_value_ == IterationTraits<T>::End()) {
+          finished_ = true;
+        }
+        last_value_.reset();
+      }
+      if (next.Finished()) {
+        finished_ = true;
+      }
+      if (next.HasValue()) {
+        return next.Value();
+      }
+    }
+    if (finished_) {
+      return IterationTraits<V>::End();
+    }
+    return util::nullopt;
+  }
+
+  Result<V> Next() {
+    while (!finished_) {
+      ARROW_ASSIGN_OR_RAISE(util::optional<V> next, Pump());
+      if (next.has_value()) {
+        return *next;
+      }
+      ARROW_ASSIGN_OR_RAISE(last_value_, it_.Next());
+    }
+    return IterationTraits<V>::End();
+  }
+
+ private:
+  Iterator<T> it_;
+  Transformer<T, V> transformer_;
+  util::optional<T> last_value_;
+  bool finished_ = false;
+};
+
+/// \brief Transforms an iterator according to a transformer, returning a new Iterator.
+///
+/// The transformer will be called on each element of the source iterator and for each
+/// call it can yield a value, skip, or finish the iteration.  When yielding a value the
+/// transformer can choose to consume the source item (the default, ready_for_next = true)
+/// or to keep it and it will be called again on the same value.

Review comment:
       Why would the transformer be called again on the same value? Is there some non-toy use case?

##########
File path: cpp/src/arrow/util/iterator_test.cc
##########
@@ -214,6 +255,270 @@ TEST(TestVectorIterator, RangeForLoop) {
   ASSERT_EQ(ints_it, ints.end());
 }
 
+template <typename T>
+Transformer<T, T> MakeFirstN(int n) {
+  int remaining = n;
+  return [remaining](T next) mutable -> Result<TransformFlow<T>> {
+    if (remaining > 0) {
+      remaining--;
+      return TransformYield(next);
+    }
+    return TransformFinish();
+  };
+}
+
+TEST(TestIteratorTransform, Truncating) {
+  auto original = VectorIt({1, 2, 3});
+  auto truncated = MakeTransformedIterator(std::move(original), MakeFirstN<TestInt>(2));
+  AssertIteratorMatch({1, 2}, std::move(truncated));
+}
+
+TEST(TestIteratorTransform, TestPointer) {
+  auto original = VectorIt<std::shared_ptr<int>>(
+      {std::make_shared<int>(1), std::make_shared<int>(2), std::make_shared<int>(3)});
+  auto truncated =
+      MakeTransformedIterator(std::move(original), MakeFirstN<std::shared_ptr<int>>(2));
+  ASSERT_OK_AND_ASSIGN(auto result, truncated.ToVector());
+  ASSERT_EQ(2, result.size());
+}
+
+TEST(TestIteratorTransform, TruncatingShort) {
+  // Tests the failsafe case where we never call Finish
+  auto original = VectorIt({1});
+  auto truncated = MakeTransformedIterator<TestInt, TestInt>(std::move(original),
+                                                             MakeFirstN<TestInt>(2));
+  AssertIteratorMatch({1}, std::move(truncated));
+}
+
+TEST(TestAsyncUtil, Background) {
+  std::vector<TestInt> expected = {1, 2, 3};
+  auto background = BackgroundAsyncVectorIt(expected);
+  auto future = CollectAsyncGenerator(background);
+  ASSERT_FALSE(future.is_finished());
+  future.Wait();
+  ASSERT_TRUE(future.is_finished());
+  ASSERT_EQ(expected, *future.result());
+}
+
+struct SlowEmptyIterator {
+  Result<TestInt> Next() {
+    if (called_) {
+      return Status::Invalid("Should not have been called twice");
+    }
+    SleepFor(0.1);
+    return IterationTraits<TestInt>::End();
+  }
+
+ private:
+  bool called_ = false;
+};
+
+TEST(TestAsyncUtil, BackgroundRepeatEnd) {
+  // Ensure that the background iterator properly fulfills the asyncgenerator contract
+  // and can be called after it ends.
+  auto iterator = Iterator<TestInt>(SlowEmptyIterator());
+  ASSERT_OK_AND_ASSIGN(
+      auto background_iter,
+      MakeBackgroundIterator(std::move(iterator), internal::GetCpuThreadPool()));
+
+  auto one = background_iter();
+  auto two = background_iter();
+
+  ASSERT_TRUE(one.Wait(0.5));
+
+  if (one.is_finished()) {
+    ASSERT_EQ(IterationTraits<TestInt>::End(), *one.result());
+  }
+
+  ASSERT_TRUE(two.Wait(0.5));
+  ASSERT_TRUE(two.is_finished());
+  if (two.is_finished()) {
+    ASSERT_EQ(IterationTraits<TestInt>::End(), *two.result());
+  }
+}
+
+TEST(TestAsyncUtil, SynchronousFinish) {
+  AsyncGenerator<TestInt> generator = []() {
+    return Future<TestInt>::MakeFinished(IterationTraits<TestInt>::End());
+  };
+  Transformer<TestInt, TestInt> skip_all = [](TestInt value) { return TransformSkip(); };
+  auto transformed = TransformAsyncGenerator(generator, skip_all);
+  auto future = CollectAsyncGenerator(transformed);
+  ASSERT_TRUE(future.is_finished());
+  ASSERT_OK_AND_ASSIGN(auto actual, future.result());
+  ASSERT_EQ(std::vector<TestInt>(), actual);
+}
+
+TEST(TestAsyncUtil, CompleteBackgroundStressTest) {
+  auto expected = RangeVector(100);
+  std::vector<Future<std::vector<TestInt>>> futures;
+  for (unsigned int i = 0; i < 100; i++) {
+    auto background = BackgroundAsyncVectorIt(expected);
+    futures.push_back(CollectAsyncGenerator(background));
+  }
+  auto combined = All(futures);
+  combined.Wait(2);
+  if (combined.is_finished()) {
+    ASSERT_OK_AND_ASSIGN(auto completed_vectors, combined.result());
+    for (auto&& vector : completed_vectors) {
+      ASSERT_EQ(vector, expected);
+    }
+  } else {
+    FAIL() << "After 2 seconds all background iterators had not finished collecting";
+  }
+}
+
+TEST(TestAsyncUtil, StackOverflow) {
+  int counter = 0;
+  AsyncGenerator<TestInt> generator = [&counter]() {
+    if (counter < 1000000) {
+      return Future<TestInt>::MakeFinished(counter++);
+    } else {
+      return Future<TestInt>::MakeFinished(IterationTraits<TestInt>::End());
+    }
+  };
+  Transformer<TestInt, TestInt> discard =
+      [](TestInt next) -> Result<TransformFlow<TestInt>> { return TransformSkip(); };
+  auto transformed = TransformAsyncGenerator(generator, discard);
+  auto collected_future = CollectAsyncGenerator(transformed);
+  ASSERT_TRUE(collected_future.Wait(5));
+  if (collected_future.is_finished()) {
+    ASSERT_OK_AND_ASSIGN(auto collected, collected_future.result());
+    ASSERT_EQ(0, collected.size());
+  }
+}
+
+TEST(TestAsyncUtil, Visit) {
+  auto generator = AsyncVectorIt({1, 2, 3});
+  unsigned int sum = 0;
+  auto sum_future = VisitAsyncGenerator<TestInt>(generator, [&sum](TestInt item) {
+    sum += item.value;
+    return Status::OK();
+  });
+  // Should be superfluous
+  sum_future.Wait();
+  ASSERT_EQ(6, sum);
+}
+
+TEST(TestAsyncUtil, Collect) {
+  std::vector<TestInt> expected = {1, 2, 3};
+  auto generator = AsyncVectorIt(expected);
+  auto collected = CollectAsyncGenerator(generator);
+  ASSERT_EQ(expected, *collected.result());
+}
+
+template <typename T>
+Transformer<T, T> MakeRepeatN(int repeat_count) {
+  int current_repeat = 0;
+  return [repeat_count, current_repeat](T next) mutable -> Result<TransformFlow<T>> {
+    current_repeat++;
+    bool ready_for_next = false;
+    if (current_repeat == repeat_count) {
+      current_repeat = 0;
+      ready_for_next = true;
+    }
+    return TransformYield(next, ready_for_next);
+  };
+}
+
+TEST(TestIteratorTransform, Repeating) {

Review comment:
       Can you take care to group the tests semantically?
   I would expect all `TestIteratorTransform`s to be grouped together.
   Also it would be nice to have async iterator tests grouped separately from regular iterator tests.
   Also, basic primitives (e.g. `CollectAsyncGenerator`) should generally be tested before more complex ones.
   
   Without any care for test ordering, test files become write-only.

##########
File path: cpp/src/arrow/util/iterator_test.cc
##########
@@ -214,6 +255,270 @@ TEST(TestVectorIterator, RangeForLoop) {
   ASSERT_EQ(ints_it, ints.end());
 }
 
+template <typename T>
+Transformer<T, T> MakeFirstN(int n) {
+  int remaining = n;
+  return [remaining](T next) mutable -> Result<TransformFlow<T>> {
+    if (remaining > 0) {
+      remaining--;
+      return TransformYield(next);
+    }
+    return TransformFinish();
+  };
+}
+
+TEST(TestIteratorTransform, Truncating) {
+  auto original = VectorIt({1, 2, 3});
+  auto truncated = MakeTransformedIterator(std::move(original), MakeFirstN<TestInt>(2));
+  AssertIteratorMatch({1, 2}, std::move(truncated));
+}
+
+TEST(TestIteratorTransform, TestPointer) {
+  auto original = VectorIt<std::shared_ptr<int>>(
+      {std::make_shared<int>(1), std::make_shared<int>(2), std::make_shared<int>(3)});
+  auto truncated =
+      MakeTransformedIterator(std::move(original), MakeFirstN<std::shared_ptr<int>>(2));
+  ASSERT_OK_AND_ASSIGN(auto result, truncated.ToVector());
+  ASSERT_EQ(2, result.size());
+}
+
+TEST(TestIteratorTransform, TruncatingShort) {
+  // Tests the failsafe case where we never call Finish
+  auto original = VectorIt({1});
+  auto truncated = MakeTransformedIterator<TestInt, TestInt>(std::move(original),
+                                                             MakeFirstN<TestInt>(2));
+  AssertIteratorMatch({1}, std::move(truncated));
+}
+
+TEST(TestAsyncUtil, Background) {
+  std::vector<TestInt> expected = {1, 2, 3};
+  auto background = BackgroundAsyncVectorIt(expected);
+  auto future = CollectAsyncGenerator(background);
+  ASSERT_FALSE(future.is_finished());
+  future.Wait();
+  ASSERT_TRUE(future.is_finished());
+  ASSERT_EQ(expected, *future.result());
+}
+
+struct SlowEmptyIterator {
+  Result<TestInt> Next() {
+    if (called_) {
+      return Status::Invalid("Should not have been called twice");
+    }
+    SleepFor(0.1);
+    return IterationTraits<TestInt>::End();
+  }
+
+ private:
+  bool called_ = false;
+};
+
+TEST(TestAsyncUtil, BackgroundRepeatEnd) {
+  // Ensure that the background iterator properly fulfills the asyncgenerator contract
+  // and can be called after it ends.
+  auto iterator = Iterator<TestInt>(SlowEmptyIterator());
+  ASSERT_OK_AND_ASSIGN(
+      auto background_iter,
+      MakeBackgroundIterator(std::move(iterator), internal::GetCpuThreadPool()));
+
+  auto one = background_iter();
+  auto two = background_iter();
+
+  ASSERT_TRUE(one.Wait(0.5));
+
+  if (one.is_finished()) {
+    ASSERT_EQ(IterationTraits<TestInt>::End(), *one.result());
+  }
+
+  ASSERT_TRUE(two.Wait(0.5));
+  ASSERT_TRUE(two.is_finished());
+  if (two.is_finished()) {
+    ASSERT_EQ(IterationTraits<TestInt>::End(), *two.result());
+  }
+}
+
+TEST(TestAsyncUtil, SynchronousFinish) {
+  AsyncGenerator<TestInt> generator = []() {
+    return Future<TestInt>::MakeFinished(IterationTraits<TestInt>::End());
+  };
+  Transformer<TestInt, TestInt> skip_all = [](TestInt value) { return TransformSkip(); };
+  auto transformed = TransformAsyncGenerator(generator, skip_all);
+  auto future = CollectAsyncGenerator(transformed);
+  ASSERT_TRUE(future.is_finished());
+  ASSERT_OK_AND_ASSIGN(auto actual, future.result());
+  ASSERT_EQ(std::vector<TestInt>(), actual);
+}
+
+TEST(TestAsyncUtil, CompleteBackgroundStressTest) {
+  auto expected = RangeVector(100);
+  std::vector<Future<std::vector<TestInt>>> futures;
+  for (unsigned int i = 0; i < 100; i++) {
+    auto background = BackgroundAsyncVectorIt(expected);
+    futures.push_back(CollectAsyncGenerator(background));
+  }
+  auto combined = All(futures);
+  combined.Wait(2);
+  if (combined.is_finished()) {
+    ASSERT_OK_AND_ASSIGN(auto completed_vectors, combined.result());
+    for (auto&& vector : completed_vectors) {
+      ASSERT_EQ(vector, expected);

Review comment:
       Hmm... unless I'm mistaken, `vector` should be a `Result<std::vector<TestInt>>`? How can it be compared with a `std::vector<TestInt>`?

##########
File path: cpp/src/arrow/util/iterator_test.cc
##########
@@ -214,6 +255,270 @@ TEST(TestVectorIterator, RangeForLoop) {
   ASSERT_EQ(ints_it, ints.end());
 }
 
+template <typename T>
+Transformer<T, T> MakeFirstN(int n) {
+  int remaining = n;
+  return [remaining](T next) mutable -> Result<TransformFlow<T>> {
+    if (remaining > 0) {
+      remaining--;
+      return TransformYield(next);
+    }
+    return TransformFinish();
+  };
+}
+
+TEST(TestIteratorTransform, Truncating) {
+  auto original = VectorIt({1, 2, 3});
+  auto truncated = MakeTransformedIterator(std::move(original), MakeFirstN<TestInt>(2));
+  AssertIteratorMatch({1, 2}, std::move(truncated));
+}
+
+TEST(TestIteratorTransform, TestPointer) {
+  auto original = VectorIt<std::shared_ptr<int>>(
+      {std::make_shared<int>(1), std::make_shared<int>(2), std::make_shared<int>(3)});
+  auto truncated =
+      MakeTransformedIterator(std::move(original), MakeFirstN<std::shared_ptr<int>>(2));
+  ASSERT_OK_AND_ASSIGN(auto result, truncated.ToVector());
+  ASSERT_EQ(2, result.size());
+}
+
+TEST(TestIteratorTransform, TruncatingShort) {
+  // Tests the failsafe case where we never call Finish
+  auto original = VectorIt({1});
+  auto truncated = MakeTransformedIterator<TestInt, TestInt>(std::move(original),
+                                                             MakeFirstN<TestInt>(2));
+  AssertIteratorMatch({1}, std::move(truncated));
+}
+
+TEST(TestAsyncUtil, Background) {
+  std::vector<TestInt> expected = {1, 2, 3};
+  auto background = BackgroundAsyncVectorIt(expected);
+  auto future = CollectAsyncGenerator(background);
+  ASSERT_FALSE(future.is_finished());
+  future.Wait();
+  ASSERT_TRUE(future.is_finished());
+  ASSERT_EQ(expected, *future.result());
+}
+
+struct SlowEmptyIterator {
+  Result<TestInt> Next() {
+    if (called_) {
+      return Status::Invalid("Should not have been called twice");
+    }
+    SleepFor(0.1);
+    return IterationTraits<TestInt>::End();
+  }
+
+ private:
+  bool called_ = false;
+};
+
+TEST(TestAsyncUtil, BackgroundRepeatEnd) {
+  // Ensure that the background iterator properly fulfills the asyncgenerator contract
+  // and can be called after it ends.
+  auto iterator = Iterator<TestInt>(SlowEmptyIterator());
+  ASSERT_OK_AND_ASSIGN(
+      auto background_iter,
+      MakeBackgroundIterator(std::move(iterator), internal::GetCpuThreadPool()));
+
+  auto one = background_iter();
+  auto two = background_iter();
+
+  ASSERT_TRUE(one.Wait(0.5));
+
+  if (one.is_finished()) {
+    ASSERT_EQ(IterationTraits<TestInt>::End(), *one.result());
+  }
+
+  ASSERT_TRUE(two.Wait(0.5));
+  ASSERT_TRUE(two.is_finished());
+  if (two.is_finished()) {
+    ASSERT_EQ(IterationTraits<TestInt>::End(), *two.result());
+  }
+}
+
+TEST(TestAsyncUtil, SynchronousFinish) {
+  AsyncGenerator<TestInt> generator = []() {
+    return Future<TestInt>::MakeFinished(IterationTraits<TestInt>::End());
+  };
+  Transformer<TestInt, TestInt> skip_all = [](TestInt value) { return TransformSkip(); };
+  auto transformed = TransformAsyncGenerator(generator, skip_all);
+  auto future = CollectAsyncGenerator(transformed);
+  ASSERT_TRUE(future.is_finished());
+  ASSERT_OK_AND_ASSIGN(auto actual, future.result());
+  ASSERT_EQ(std::vector<TestInt>(), actual);
+}
+
+TEST(TestAsyncUtil, CompleteBackgroundStressTest) {
+  auto expected = RangeVector(100);
+  std::vector<Future<std::vector<TestInt>>> futures;
+  for (unsigned int i = 0; i < 100; i++) {
+    auto background = BackgroundAsyncVectorIt(expected);
+    futures.push_back(CollectAsyncGenerator(background));
+  }
+  auto combined = All(futures);
+  combined.Wait(2);
+  if (combined.is_finished()) {
+    ASSERT_OK_AND_ASSIGN(auto completed_vectors, combined.result());
+    for (auto&& vector : completed_vectors) {
+      ASSERT_EQ(vector, expected);
+    }
+  } else {
+    FAIL() << "After 2 seconds all background iterators had not finished collecting";
+  }
+}
+
+TEST(TestAsyncUtil, StackOverflow) {
+  int counter = 0;
+  AsyncGenerator<TestInt> generator = [&counter]() {
+    if (counter < 1000000) {
+      return Future<TestInt>::MakeFinished(counter++);
+    } else {
+      return Future<TestInt>::MakeFinished(IterationTraits<TestInt>::End());
+    }
+  };
+  Transformer<TestInt, TestInt> discard =
+      [](TestInt next) -> Result<TransformFlow<TestInt>> { return TransformSkip(); };
+  auto transformed = TransformAsyncGenerator(generator, discard);
+  auto collected_future = CollectAsyncGenerator(transformed);
+  ASSERT_TRUE(collected_future.Wait(5));
+  if (collected_future.is_finished()) {
+    ASSERT_OK_AND_ASSIGN(auto collected, collected_future.result());
+    ASSERT_EQ(0, collected.size());
+  }
+}
+
+TEST(TestAsyncUtil, Visit) {
+  auto generator = AsyncVectorIt({1, 2, 3});
+  unsigned int sum = 0;
+  auto sum_future = VisitAsyncGenerator<TestInt>(generator, [&sum](TestInt item) {
+    sum += item.value;
+    return Status::OK();
+  });
+  // Should be superfluous
+  sum_future.Wait();
+  ASSERT_EQ(6, sum);
+}
+
+TEST(TestAsyncUtil, Collect) {
+  std::vector<TestInt> expected = {1, 2, 3};
+  auto generator = AsyncVectorIt(expected);
+  auto collected = CollectAsyncGenerator(generator);
+  ASSERT_EQ(expected, *collected.result());
+}
+
+template <typename T>
+Transformer<T, T> MakeRepeatN(int repeat_count) {
+  int current_repeat = 0;
+  return [repeat_count, current_repeat](T next) mutable -> Result<TransformFlow<T>> {
+    current_repeat++;
+    bool ready_for_next = false;
+    if (current_repeat == repeat_count) {
+      current_repeat = 0;
+      ready_for_next = true;
+    }
+    return TransformYield(next, ready_for_next);
+  };
+}
+
+TEST(TestIteratorTransform, Repeating) {
+  auto original = VectorIt({1, 2, 3});
+  auto repeated = MakeTransformedIterator<TestInt, TestInt>(std::move(original),
+                                                            MakeRepeatN<TestInt>(2));
+  AssertIteratorMatch({1, 1, 2, 2, 3, 3}, std::move(repeated));
+}
+
+template <typename T>
+Transformer<T, T> MakeFilter(std::function<bool(T&)> filter) {
+  return [filter](T next) -> Result<TransformFlow<T>> {
+    if (filter(next)) {
+      return TransformYield(next);
+    } else {
+      return TransformSkip();
+    }
+  };
+}
+
+template <typename T>
+Transformer<T, T> MakeAbortOnSecond() {
+  int counter = 0;
+  return [counter](T next) mutable -> Result<TransformFlow<T>> {
+    if (counter++ == 1) {
+      return Status::Invalid("X");
+    }
+    return TransformYield(next);
+  };
+}
+
+TEST(TestIteratorTransform, SkipSome) {
+  // Exercises TransformSkip
+  auto original = VectorIt({1, 2, 3});
+  auto filter = MakeFilter<TestInt>([](TestInt& t) { return t.value != 2; });
+  auto filtered = MakeTransformedIterator(std::move(original), filter);
+  AssertIteratorMatch({1, 3}, std::move(filtered));
+}
+
+TEST(TestIteratorTransform, SkipAll) {
+  // Exercises TransformSkip
+  auto original = VectorIt({1, 2, 3});
+  auto filter = MakeFilter<TestInt>([](TestInt& t) { return false; });
+  auto filtered = MakeTransformedIterator(std::move(original), filter);
+  AssertIteratorMatch({}, std::move(filtered));
+}
+
+TEST(TestIteratorTransform, Abort) {
+  auto original = VectorIt({1, 2, 3});
+  auto transformed =
+      MakeTransformedIterator(std::move(original), MakeAbortOnSecond<TestInt>());
+  ASSERT_OK(transformed.Next());
+  ASSERT_RAISES(Invalid, transformed.Next());
+}
+
+TEST(TestAsyncIteratorTransform, SkipSome) {
+  auto original = AsyncVectorIt({1, 2, 3});
+  auto filter = MakeFilter<TestInt>([](TestInt& t) { return t.value != 2; });
+  auto filtered = TransformAsyncGenerator(std::move(original), filter);
+  AssertAsyncGeneratorMatch({1, 3}, std::move(filtered));
+}
+
+TEST(TestAsyncUtil, ReadaheadFailed) {
+  auto source = []() -> Future<TestInt> {
+    return Future<TestInt>::MakeFinished(Status::Invalid("X"));
+  };
+  auto readahead = AddReadahead<TestInt>(source, 10);
+  auto next = readahead();
+  ASSERT_EQ(Status::Invalid("X"), next.status());

Review comment:
       Should the next ones also fail? With the same error / different one?

##########
File path: cpp/src/arrow/util/iterator_test.cc
##########
@@ -214,6 +255,270 @@ TEST(TestVectorIterator, RangeForLoop) {
   ASSERT_EQ(ints_it, ints.end());
 }
 
+template <typename T>
+Transformer<T, T> MakeFirstN(int n) {
+  int remaining = n;
+  return [remaining](T next) mutable -> Result<TransformFlow<T>> {
+    if (remaining > 0) {
+      remaining--;
+      return TransformYield(next);
+    }
+    return TransformFinish();
+  };
+}
+
+TEST(TestIteratorTransform, Truncating) {
+  auto original = VectorIt({1, 2, 3});
+  auto truncated = MakeTransformedIterator(std::move(original), MakeFirstN<TestInt>(2));
+  AssertIteratorMatch({1, 2}, std::move(truncated));
+}
+
+TEST(TestIteratorTransform, TestPointer) {
+  auto original = VectorIt<std::shared_ptr<int>>(
+      {std::make_shared<int>(1), std::make_shared<int>(2), std::make_shared<int>(3)});
+  auto truncated =
+      MakeTransformedIterator(std::move(original), MakeFirstN<std::shared_ptr<int>>(2));
+  ASSERT_OK_AND_ASSIGN(auto result, truncated.ToVector());
+  ASSERT_EQ(2, result.size());
+}
+
+TEST(TestIteratorTransform, TruncatingShort) {
+  // Tests the failsafe case where we never call Finish
+  auto original = VectorIt({1});
+  auto truncated = MakeTransformedIterator<TestInt, TestInt>(std::move(original),
+                                                             MakeFirstN<TestInt>(2));
+  AssertIteratorMatch({1}, std::move(truncated));
+}
+
+TEST(TestAsyncUtil, Background) {
+  std::vector<TestInt> expected = {1, 2, 3};
+  auto background = BackgroundAsyncVectorIt(expected);
+  auto future = CollectAsyncGenerator(background);
+  ASSERT_FALSE(future.is_finished());
+  future.Wait();
+  ASSERT_TRUE(future.is_finished());
+  ASSERT_EQ(expected, *future.result());
+}
+
+struct SlowEmptyIterator {
+  Result<TestInt> Next() {
+    if (called_) {
+      return Status::Invalid("Should not have been called twice");
+    }
+    SleepFor(0.1);
+    return IterationTraits<TestInt>::End();
+  }
+
+ private:
+  bool called_ = false;
+};
+
+TEST(TestAsyncUtil, BackgroundRepeatEnd) {
+  // Ensure that the background iterator properly fulfills the asyncgenerator contract
+  // and can be called after it ends.
+  auto iterator = Iterator<TestInt>(SlowEmptyIterator());
+  ASSERT_OK_AND_ASSIGN(
+      auto background_iter,
+      MakeBackgroundIterator(std::move(iterator), internal::GetCpuThreadPool()));
+
+  auto one = background_iter();
+  auto two = background_iter();
+
+  ASSERT_TRUE(one.Wait(0.5));
+
+  if (one.is_finished()) {
+    ASSERT_EQ(IterationTraits<TestInt>::End(), *one.result());
+  }
+
+  ASSERT_TRUE(two.Wait(0.5));
+  ASSERT_TRUE(two.is_finished());
+  if (two.is_finished()) {
+    ASSERT_EQ(IterationTraits<TestInt>::End(), *two.result());
+  }
+}
+
+TEST(TestAsyncUtil, SynchronousFinish) {
+  AsyncGenerator<TestInt> generator = []() {
+    return Future<TestInt>::MakeFinished(IterationTraits<TestInt>::End());
+  };
+  Transformer<TestInt, TestInt> skip_all = [](TestInt value) { return TransformSkip(); };
+  auto transformed = TransformAsyncGenerator(generator, skip_all);
+  auto future = CollectAsyncGenerator(transformed);
+  ASSERT_TRUE(future.is_finished());
+  ASSERT_OK_AND_ASSIGN(auto actual, future.result());
+  ASSERT_EQ(std::vector<TestInt>(), actual);
+}
+
+TEST(TestAsyncUtil, CompleteBackgroundStressTest) {
+  auto expected = RangeVector(100);
+  std::vector<Future<std::vector<TestInt>>> futures;
+  for (unsigned int i = 0; i < 100; i++) {
+    auto background = BackgroundAsyncVectorIt(expected);
+    futures.push_back(CollectAsyncGenerator(background));
+  }
+  auto combined = All(futures);
+  combined.Wait(2);
+  if (combined.is_finished()) {
+    ASSERT_OK_AND_ASSIGN(auto completed_vectors, combined.result());
+    for (auto&& vector : completed_vectors) {
+      ASSERT_EQ(vector, expected);
+    }
+  } else {
+    FAIL() << "After 2 seconds all background iterators had not finished collecting";
+  }
+}
+
+TEST(TestAsyncUtil, StackOverflow) {
+  int counter = 0;
+  AsyncGenerator<TestInt> generator = [&counter]() {
+    if (counter < 1000000) {
+      return Future<TestInt>::MakeFinished(counter++);
+    } else {
+      return Future<TestInt>::MakeFinished(IterationTraits<TestInt>::End());
+    }
+  };
+  Transformer<TestInt, TestInt> discard =
+      [](TestInt next) -> Result<TransformFlow<TestInt>> { return TransformSkip(); };
+  auto transformed = TransformAsyncGenerator(generator, discard);
+  auto collected_future = CollectAsyncGenerator(transformed);
+  ASSERT_TRUE(collected_future.Wait(5));
+  if (collected_future.is_finished()) {
+    ASSERT_OK_AND_ASSIGN(auto collected, collected_future.result());
+    ASSERT_EQ(0, collected.size());
+  }
+}
+
+TEST(TestAsyncUtil, Visit) {
+  auto generator = AsyncVectorIt({1, 2, 3});
+  unsigned int sum = 0;
+  auto sum_future = VisitAsyncGenerator<TestInt>(generator, [&sum](TestInt item) {
+    sum += item.value;
+    return Status::OK();
+  });
+  // Should be superfluous
+  sum_future.Wait();
+  ASSERT_EQ(6, sum);
+}
+
+TEST(TestAsyncUtil, Collect) {
+  std::vector<TestInt> expected = {1, 2, 3};
+  auto generator = AsyncVectorIt(expected);
+  auto collected = CollectAsyncGenerator(generator);
+  ASSERT_EQ(expected, *collected.result());
+}
+
+template <typename T>
+Transformer<T, T> MakeRepeatN(int repeat_count) {
+  int current_repeat = 0;
+  return [repeat_count, current_repeat](T next) mutable -> Result<TransformFlow<T>> {
+    current_repeat++;
+    bool ready_for_next = false;
+    if (current_repeat == repeat_count) {
+      current_repeat = 0;
+      ready_for_next = true;
+    }
+    return TransformYield(next, ready_for_next);
+  };
+}
+
+TEST(TestIteratorTransform, Repeating) {
+  auto original = VectorIt({1, 2, 3});
+  auto repeated = MakeTransformedIterator<TestInt, TestInt>(std::move(original),
+                                                            MakeRepeatN<TestInt>(2));
+  AssertIteratorMatch({1, 1, 2, 2, 3, 3}, std::move(repeated));
+}
+
+template <typename T>
+Transformer<T, T> MakeFilter(std::function<bool(T&)> filter) {
+  return [filter](T next) -> Result<TransformFlow<T>> {
+    if (filter(next)) {
+      return TransformYield(next);
+    } else {
+      return TransformSkip();
+    }
+  };
+}
+
+template <typename T>
+Transformer<T, T> MakeAbortOnSecond() {
+  int counter = 0;
+  return [counter](T next) mutable -> Result<TransformFlow<T>> {
+    if (counter++ == 1) {
+      return Status::Invalid("X");
+    }
+    return TransformYield(next);
+  };
+}
+
+TEST(TestIteratorTransform, SkipSome) {
+  // Exercises TransformSkip
+  auto original = VectorIt({1, 2, 3});
+  auto filter = MakeFilter<TestInt>([](TestInt& t) { return t.value != 2; });
+  auto filtered = MakeTransformedIterator(std::move(original), filter);
+  AssertIteratorMatch({1, 3}, std::move(filtered));
+}
+
+TEST(TestIteratorTransform, SkipAll) {
+  // Exercises TransformSkip
+  auto original = VectorIt({1, 2, 3});
+  auto filter = MakeFilter<TestInt>([](TestInt& t) { return false; });
+  auto filtered = MakeTransformedIterator(std::move(original), filter);
+  AssertIteratorMatch({}, std::move(filtered));
+}
+
+TEST(TestIteratorTransform, Abort) {
+  auto original = VectorIt({1, 2, 3});
+  auto transformed =
+      MakeTransformedIterator(std::move(original), MakeAbortOnSecond<TestInt>());
+  ASSERT_OK(transformed.Next());
+  ASSERT_RAISES(Invalid, transformed.Next());
+}

Review comment:
       Should the third one still succeed?

##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -87,6 +87,21 @@ class ARROW_EXPORT Executor {
     return SpawnReal(hints, std::forward<Function>(func));
   }
 
+  template <typename T>
+  Future<T> Transfer(Future<T> future) {

Review comment:
       Hmm... so the only thing that will be spawned on the executor (presumably a thread pool) is a `MarkFinished` call?
   I suppose the intent is so that `Then()` calls are also made on the executor?
   
   In any case, please add a comment or docstring describing this and the intent.

##########
File path: cpp/src/arrow/util/iterator_test.cc
##########
@@ -214,6 +255,270 @@ TEST(TestVectorIterator, RangeForLoop) {
   ASSERT_EQ(ints_it, ints.end());
 }
 
+template <typename T>
+Transformer<T, T> MakeFirstN(int n) {
+  int remaining = n;
+  return [remaining](T next) mutable -> Result<TransformFlow<T>> {
+    if (remaining > 0) {
+      remaining--;
+      return TransformYield(next);
+    }
+    return TransformFinish();
+  };
+}
+
+TEST(TestIteratorTransform, Truncating) {
+  auto original = VectorIt({1, 2, 3});
+  auto truncated = MakeTransformedIterator(std::move(original), MakeFirstN<TestInt>(2));
+  AssertIteratorMatch({1, 2}, std::move(truncated));
+}
+
+TEST(TestIteratorTransform, TestPointer) {
+  auto original = VectorIt<std::shared_ptr<int>>(
+      {std::make_shared<int>(1), std::make_shared<int>(2), std::make_shared<int>(3)});
+  auto truncated =
+      MakeTransformedIterator(std::move(original), MakeFirstN<std::shared_ptr<int>>(2));
+  ASSERT_OK_AND_ASSIGN(auto result, truncated.ToVector());
+  ASSERT_EQ(2, result.size());
+}
+
+TEST(TestIteratorTransform, TruncatingShort) {
+  // Tests the failsafe case where we never call Finish
+  auto original = VectorIt({1});
+  auto truncated = MakeTransformedIterator<TestInt, TestInt>(std::move(original),
+                                                             MakeFirstN<TestInt>(2));
+  AssertIteratorMatch({1}, std::move(truncated));
+}
+
+TEST(TestAsyncUtil, Background) {
+  std::vector<TestInt> expected = {1, 2, 3};
+  auto background = BackgroundAsyncVectorIt(expected);
+  auto future = CollectAsyncGenerator(background);
+  ASSERT_FALSE(future.is_finished());
+  future.Wait();
+  ASSERT_TRUE(future.is_finished());
+  ASSERT_EQ(expected, *future.result());
+}
+
+struct SlowEmptyIterator {
+  Result<TestInt> Next() {
+    if (called_) {
+      return Status::Invalid("Should not have been called twice");
+    }
+    SleepFor(0.1);
+    return IterationTraits<TestInt>::End();
+  }
+
+ private:
+  bool called_ = false;
+};
+
+TEST(TestAsyncUtil, BackgroundRepeatEnd) {
+  // Ensure that the background iterator properly fulfills the asyncgenerator contract
+  // and can be called after it ends.
+  auto iterator = Iterator<TestInt>(SlowEmptyIterator());
+  ASSERT_OK_AND_ASSIGN(
+      auto background_iter,
+      MakeBackgroundIterator(std::move(iterator), internal::GetCpuThreadPool()));
+
+  auto one = background_iter();
+  auto two = background_iter();
+
+  ASSERT_TRUE(one.Wait(0.5));
+
+  if (one.is_finished()) {
+    ASSERT_EQ(IterationTraits<TestInt>::End(), *one.result());
+  }
+
+  ASSERT_TRUE(two.Wait(0.5));
+  ASSERT_TRUE(two.is_finished());
+  if (two.is_finished()) {
+    ASSERT_EQ(IterationTraits<TestInt>::End(), *two.result());
+  }
+}
+
+TEST(TestAsyncUtil, SynchronousFinish) {
+  AsyncGenerator<TestInt> generator = []() {
+    return Future<TestInt>::MakeFinished(IterationTraits<TestInt>::End());
+  };
+  Transformer<TestInt, TestInt> skip_all = [](TestInt value) { return TransformSkip(); };
+  auto transformed = TransformAsyncGenerator(generator, skip_all);
+  auto future = CollectAsyncGenerator(transformed);
+  ASSERT_TRUE(future.is_finished());
+  ASSERT_OK_AND_ASSIGN(auto actual, future.result());
+  ASSERT_EQ(std::vector<TestInt>(), actual);
+}
+
+TEST(TestAsyncUtil, CompleteBackgroundStressTest) {
+  auto expected = RangeVector(100);
+  std::vector<Future<std::vector<TestInt>>> futures;
+  for (unsigned int i = 0; i < 100; i++) {
+    auto background = BackgroundAsyncVectorIt(expected);
+    futures.push_back(CollectAsyncGenerator(background));
+  }
+  auto combined = All(futures);
+  combined.Wait(2);
+  if (combined.is_finished()) {
+    ASSERT_OK_AND_ASSIGN(auto completed_vectors, combined.result());
+    for (auto&& vector : completed_vectors) {
+      ASSERT_EQ(vector, expected);
+    }
+  } else {
+    FAIL() << "After 2 seconds all background iterators had not finished collecting";
+  }
+}
+
+TEST(TestAsyncUtil, StackOverflow) {
+  int counter = 0;
+  AsyncGenerator<TestInt> generator = [&counter]() {
+    if (counter < 1000000) {
+      return Future<TestInt>::MakeFinished(counter++);
+    } else {
+      return Future<TestInt>::MakeFinished(IterationTraits<TestInt>::End());
+    }
+  };
+  Transformer<TestInt, TestInt> discard =
+      [](TestInt next) -> Result<TransformFlow<TestInt>> { return TransformSkip(); };
+  auto transformed = TransformAsyncGenerator(generator, discard);
+  auto collected_future = CollectAsyncGenerator(transformed);
+  ASSERT_TRUE(collected_future.Wait(5));
+  if (collected_future.is_finished()) {
+    ASSERT_OK_AND_ASSIGN(auto collected, collected_future.result());
+    ASSERT_EQ(0, collected.size());
+  }
+}
+
+TEST(TestAsyncUtil, Visit) {
+  auto generator = AsyncVectorIt({1, 2, 3});
+  unsigned int sum = 0;
+  auto sum_future = VisitAsyncGenerator<TestInt>(generator, [&sum](TestInt item) {
+    sum += item.value;
+    return Status::OK();
+  });
+  // Should be superfluous

Review comment:
       Then replace it with `ASSERT_TRUE(sum_future.is_finished())`?

##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -87,6 +87,21 @@ class ARROW_EXPORT Executor {
     return SpawnReal(hints, std::forward<Function>(func));
   }
 
+  template <typename T>
+  Future<T> Transfer(Future<T> future) {
+    auto transferred = Future<T>::Make();
+    future.AddCallback([this, transferred](const Result<T>& result) mutable {
+      Result<T> result_copy(result);
+      auto spawn_status = Spawn([transferred, result_copy]() mutable {
+        transferred.MarkFinished(result_copy);

Review comment:
       And this is a third copy?

##########
File path: cpp/src/arrow/util/task_group.cc
##########
@@ -135,6 +149,18 @@ class ThreadedTaskGroup : public TaskGroup {
       // before cv.notify_one() has returned
       std::unique_lock<std::mutex> lock(mutex_);
       cv_.notify_one();
+      if (completion_future_.has_value()) {
+        // MarkFinished could be slow.  We don't want to call it while we are holding
+        // the lock.
+        // TODO: If optional is thread safe then we can skip this locking entirely
+        auto future = *completion_future_;
+        auto finished = completion_future_->is_finished();
+        auto status = status_;

Review comment:
       `const auto&`?

##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -87,6 +87,21 @@ class ARROW_EXPORT Executor {
     return SpawnReal(hints, std::forward<Function>(func));
   }
 
+  template <typename T>
+  Future<T> Transfer(Future<T> future) {
+    auto transferred = Future<T>::Make();
+    future.AddCallback([this, transferred](const Result<T>& result) mutable {
+      Result<T> result_copy(result);
+      auto spawn_status = Spawn([transferred, result_copy]() mutable {

Review comment:
       This is a second copy.

##########
File path: cpp/src/arrow/util/task_group.cc
##########
@@ -135,6 +149,18 @@ class ThreadedTaskGroup : public TaskGroup {
       // before cv.notify_one() has returned
       std::unique_lock<std::mutex> lock(mutex_);
       cv_.notify_one();
+      if (completion_future_.has_value()) {
+        // MarkFinished could be slow.  We don't want to call it while we are holding
+        // the lock.
+        // TODO: If optional is thread safe then we can skip this locking entirely

Review comment:
       Well, the locking is necessary anyway, so I'm not sure the TODO is very useful.

##########
File path: cpp/src/arrow/util/iterator_test.cc
##########
@@ -16,6 +16,7 @@
 // under the License.
 
 #include "arrow/util/iterator.h"
+#include "arrow/util/async_iterator.h"

Review comment:
       Nit: keep includes sorted in lexicographic order.

##########
File path: cpp/src/arrow/util/async_iterator.h
##########
@@ -0,0 +1,295 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <queue>
+
+#include "arrow/util/functional.h"
+#include "arrow/util/future.h"
+#include "arrow/util/iterator.h"
+#include "arrow/util/optional.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+
+template <typename T>
+using AsyncGenerator = std::function<Future<T>()>;
+
+/// Iterates through a generator of futures, visiting the result of each one and
+/// returning a future that completes when all have been visited
+template <typename T>
+Future<> VisitAsyncGenerator(AsyncGenerator<T> generator,
+                             std::function<Status(T)> visitor) {
+  auto loop_body = [generator, visitor] {
+    auto next = generator();
+    return next.Then([visitor](const T& result) -> Result<ControlFlow<detail::Empty>> {
+      if (result == IterationTraits<T>::End()) {
+        return Break(detail::Empty());
+      } else {
+        auto visited = visitor(result);
+        if (visited.ok()) {
+          return Continue();
+        } else {
+          return visited;
+        }
+      }
+    });
+  };
+  return Loop(loop_body);
+}
+
+template <typename T>
+Future<std::vector<T>> CollectAsyncGenerator(AsyncGenerator<T> generator) {
+  auto vec = std::make_shared<std::vector<T>>();
+  auto loop_body = [generator, vec] {
+    auto next = generator();
+    return next.Then([vec](const T& result) -> Result<ControlFlow<std::vector<T>>> {
+      if (result == IterationTraits<T>::End()) {
+        return Break(*vec);
+      } else {
+        vec->push_back(result);
+        return Continue();
+      }
+    });
+  };
+  return Loop(loop_body);
+}
+
+template <typename T, typename V>
+class TransformingGenerator {
+ public:
+  explicit TransformingGenerator(AsyncGenerator<T> generator,
+                                 Transformer<T, V> transformer)
+      : finished_(), last_value_(), generator_(generator), transformer_(transformer) {}
+
+  // See comment on TransformingIterator::Pump
+  Result<util::optional<V>> Pump() {
+    if (!finished_ && last_value_.has_value()) {
+      ARROW_ASSIGN_OR_RAISE(TransformFlow<V> next, transformer_(*last_value_));
+      if (next.ReadyForNext()) {
+        if (*last_value_ == IterationTraits<T>::End()) {
+          finished_ = true;
+        }
+        last_value_.reset();
+      }
+      if (next.Finished()) {
+        finished_ = true;
+      }
+      if (next.HasValue()) {
+        return next.Value();
+      }
+    }
+    if (finished_) {
+      return IterationTraits<V>::End();
+    }
+    return util::nullopt;
+  }
+
+  Future<V> operator()() {
+    while (true) {
+      auto maybe_next_result = Pump();
+      if (!maybe_next_result.ok()) {
+        return Future<V>::MakeFinished(maybe_next_result.status());
+      }
+      auto maybe_next = maybe_next_result.ValueUnsafe();
+      if (maybe_next.has_value()) {
+        return Future<V>::MakeFinished(*maybe_next);
+      }
+
+      auto next_fut = generator_();
+      // If finished already, process results immediately inside the loop to avoid stack
+      // overflow
+      if (next_fut.is_finished()) {
+        auto next_result = next_fut.result();
+        if (next_result.ok()) {
+          last_value_ = *next_result;
+        } else {
+          return Future<V>::MakeFinished(next_result.status());
+        }
+        // Otherwise, if not finished immediately, add callback to process results
+      } else {
+        return next_fut.Then([this](const Result<T>& next_result) {
+          if (next_result.ok()) {
+            last_value_ = *next_result;
+            return (*this)();
+          } else {
+            return Future<V>::MakeFinished(next_result.status());
+          }
+        });
+      }
+    }
+  }
+
+ protected:
+  bool finished_;
+  util::optional<T> last_value_;
+  AsyncGenerator<T> generator_;
+  Transformer<T, V> transformer_;
+};
+
+template <typename T>
+static std::function<void(const Result<T>&)> MakeCallback(

Review comment:
       This seems unused?

##########
File path: cpp/src/arrow/util/async_iterator.h
##########
@@ -0,0 +1,295 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <queue>
+
+#include "arrow/util/functional.h"
+#include "arrow/util/future.h"
+#include "arrow/util/iterator.h"
+#include "arrow/util/optional.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+
+template <typename T>
+using AsyncGenerator = std::function<Future<T>()>;
+
+/// Iterates through a generator of futures, visiting the result of each one and
+/// returning a future that completes when all have been visited
+template <typename T>
+Future<> VisitAsyncGenerator(AsyncGenerator<T> generator,
+                             std::function<Status(T)> visitor) {
+  auto loop_body = [generator, visitor] {
+    auto next = generator();
+    return next.Then([visitor](const T& result) -> Result<ControlFlow<detail::Empty>> {
+      if (result == IterationTraits<T>::End()) {
+        return Break(detail::Empty());
+      } else {
+        auto visited = visitor(result);
+        if (visited.ok()) {
+          return Continue();
+        } else {
+          return visited;
+        }
+      }
+    });
+  };
+  return Loop(loop_body);
+}
+
+template <typename T>
+Future<std::vector<T>> CollectAsyncGenerator(AsyncGenerator<T> generator) {
+  auto vec = std::make_shared<std::vector<T>>();
+  auto loop_body = [generator, vec] {
+    auto next = generator();
+    return next.Then([vec](const T& result) -> Result<ControlFlow<std::vector<T>>> {
+      if (result == IterationTraits<T>::End()) {
+        return Break(*vec);
+      } else {
+        vec->push_back(result);
+        return Continue();
+      }
+    });
+  };
+  return Loop(loop_body);
+}
+
+template <typename T, typename V>
+class TransformingGenerator {
+ public:
+  explicit TransformingGenerator(AsyncGenerator<T> generator,
+                                 Transformer<T, V> transformer)
+      : finished_(), last_value_(), generator_(generator), transformer_(transformer) {}
+
+  // See comment on TransformingIterator::Pump
+  Result<util::optional<V>> Pump() {
+    if (!finished_ && last_value_.has_value()) {
+      ARROW_ASSIGN_OR_RAISE(TransformFlow<V> next, transformer_(*last_value_));
+      if (next.ReadyForNext()) {
+        if (*last_value_ == IterationTraits<T>::End()) {
+          finished_ = true;
+        }
+        last_value_.reset();
+      }
+      if (next.Finished()) {
+        finished_ = true;
+      }
+      if (next.HasValue()) {
+        return next.Value();
+      }
+    }
+    if (finished_) {
+      return IterationTraits<V>::End();
+    }
+    return util::nullopt;
+  }
+
+  Future<V> operator()() {
+    while (true) {
+      auto maybe_next_result = Pump();
+      if (!maybe_next_result.ok()) {
+        return Future<V>::MakeFinished(maybe_next_result.status());
+      }
+      auto maybe_next = maybe_next_result.ValueUnsafe();
+      if (maybe_next.has_value()) {
+        return Future<V>::MakeFinished(*maybe_next);
+      }
+
+      auto next_fut = generator_();
+      // If finished already, process results immediately inside the loop to avoid stack
+      // overflow
+      if (next_fut.is_finished()) {
+        auto next_result = next_fut.result();
+        if (next_result.ok()) {
+          last_value_ = *next_result;

Review comment:
       Use `*std::move(next_result)` here so the value is moved into `last_value_` instead of being copied.

##########
File path: cpp/src/arrow/util/async_iterator.h
##########
@@ -0,0 +1,295 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <queue>
+
+#include "arrow/util/functional.h"
+#include "arrow/util/future.h"
+#include "arrow/util/iterator.h"
+#include "arrow/util/optional.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+
+template <typename T>
+using AsyncGenerator = std::function<Future<T>()>;
+
+/// Iterates through a generator of futures, visiting the result of each one and
+/// returning a future that completes when all have been visited
+template <typename T>
+Future<> VisitAsyncGenerator(AsyncGenerator<T> generator,
+                             std::function<Status(T)> visitor) {
+  auto loop_body = [generator, visitor] {
+    auto next = generator();
+    return next.Then([visitor](const T& result) -> Result<ControlFlow<detail::Empty>> {
+      if (result == IterationTraits<T>::End()) {
+        return Break(detail::Empty());
+      } else {
+        auto visited = visitor(result);
+        if (visited.ok()) {
+          return Continue();
+        } else {
+          return visited;
+        }
+      }
+    });
+  };
+  return Loop(loop_body);
+}
+
+template <typename T>
+Future<std::vector<T>> CollectAsyncGenerator(AsyncGenerator<T> generator) {
+  auto vec = std::make_shared<std::vector<T>>();
+  auto loop_body = [generator, vec] {
+    auto next = generator();
+    return next.Then([vec](const T& result) -> Result<ControlFlow<std::vector<T>>> {
+      if (result == IterationTraits<T>::End()) {
+        return Break(*vec);
+      } else {
+        vec->push_back(result);
+        return Continue();
+      }
+    });
+  };
+  return Loop(loop_body);
+}
+
+template <typename T, typename V>
+class TransformingGenerator {
+ public:
+  explicit TransformingGenerator(AsyncGenerator<T> generator,
+                                 Transformer<T, V> transformer)
+      : finished_(), last_value_(), generator_(generator), transformer_(transformer) {}

Review comment:
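       Use `std::move(generator)` and `std::move(transformer)` in the member initializers: both parameters are taken by value, so without the move each one is copied a second time.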

##########
File path: cpp/src/arrow/util/async_iterator.h
##########
@@ -0,0 +1,295 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <queue>
+
+#include "arrow/util/functional.h"
+#include "arrow/util/future.h"
+#include "arrow/util/iterator.h"
+#include "arrow/util/optional.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+
+template <typename T>
+using AsyncGenerator = std::function<Future<T>()>;
+
+/// Iterates through a generator of futures, visiting the result of each one and
+/// returning a future that completes when all have been visited
+template <typename T>
+Future<> VisitAsyncGenerator(AsyncGenerator<T> generator,
+                             std::function<Status(T)> visitor) {
+  auto loop_body = [generator, visitor] {
+    auto next = generator();
+    return next.Then([visitor](const T& result) -> Result<ControlFlow<detail::Empty>> {
+      if (result == IterationTraits<T>::End()) {
+        return Break(detail::Empty());
+      } else {
+        auto visited = visitor(result);
+        if (visited.ok()) {
+          return Continue();
+        } else {
+          return visited;
+        }
+      }
+    });
+  };
+  return Loop(loop_body);
+}
+
+template <typename T>
+Future<std::vector<T>> CollectAsyncGenerator(AsyncGenerator<T> generator) {
+  auto vec = std::make_shared<std::vector<T>>();
+  auto loop_body = [generator, vec] {
+    auto next = generator();
+    return next.Then([vec](const T& result) -> Result<ControlFlow<std::vector<T>>> {
+      if (result == IterationTraits<T>::End()) {
+        return Break(*vec);
+      } else {
+        vec->push_back(result);
+        return Continue();
+      }
+    });
+  };
+  return Loop(loop_body);
+}
+
+template <typename T, typename V>
+class TransformingGenerator {
+ public:
+  explicit TransformingGenerator(AsyncGenerator<T> generator,
+                                 Transformer<T, V> transformer)
+      : finished_(), last_value_(), generator_(generator), transformer_(transformer) {}
+
+  // See comment on TransformingIterator::Pump
+  Result<util::optional<V>> Pump() {
+    if (!finished_ && last_value_.has_value()) {
+      ARROW_ASSIGN_OR_RAISE(TransformFlow<V> next, transformer_(*last_value_));
+      if (next.ReadyForNext()) {
+        if (*last_value_ == IterationTraits<T>::End()) {
+          finished_ = true;
+        }
+        last_value_.reset();
+      }
+      if (next.Finished()) {
+        finished_ = true;
+      }
+      if (next.HasValue()) {
+        return next.Value();
+      }
+    }
+    if (finished_) {
+      return IterationTraits<V>::End();
+    }
+    return util::nullopt;
+  }
+
+  Future<V> operator()() {
+    while (true) {
+      auto maybe_next_result = Pump();
+      if (!maybe_next_result.ok()) {
+        return Future<V>::MakeFinished(maybe_next_result.status());
+      }
+      auto maybe_next = maybe_next_result.ValueUnsafe();

Review comment:
       Could this be `std::move(maybe_next_result).ValueUnsafe()`, to avoid copying the optional value out of the result?

##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -87,6 +87,21 @@ class ARROW_EXPORT Executor {
     return SpawnReal(hints, std::forward<Function>(func));
   }
 
+  template <typename T>
+  Future<T> Transfer(Future<T> future) {
+    auto transferred = Future<T>::Make();
+    future.AddCallback([this, transferred](const Result<T>& result) mutable {
+      Result<T> result_copy(result);

Review comment:
       This is a first copy.

##########
File path: cpp/src/arrow/util/async_iterator.h
##########
@@ -0,0 +1,295 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <queue>
+
+#include "arrow/util/functional.h"
+#include "arrow/util/future.h"
+#include "arrow/util/iterator.h"
+#include "arrow/util/optional.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+
+template <typename T>
+using AsyncGenerator = std::function<Future<T>()>;
+
+/// Iterates through a generator of futures, visiting the result of each one and
+/// returning a future that completes when all have been visited
+template <typename T>
+Future<> VisitAsyncGenerator(AsyncGenerator<T> generator,
+                             std::function<Status(T)> visitor) {
+  auto loop_body = [generator, visitor] {
+    auto next = generator();
+    return next.Then([visitor](const T& result) -> Result<ControlFlow<detail::Empty>> {
+      if (result == IterationTraits<T>::End()) {
+        return Break(detail::Empty());
+      } else {
+        auto visited = visitor(result);
+        if (visited.ok()) {
+          return Continue();
+        } else {
+          return visited;
+        }
+      }
+    });
+  };
+  return Loop(loop_body);
+}
+
+template <typename T>
+Future<std::vector<T>> CollectAsyncGenerator(AsyncGenerator<T> generator) {
+  auto vec = std::make_shared<std::vector<T>>();
+  auto loop_body = [generator, vec] {
+    auto next = generator();
+    return next.Then([vec](const T& result) -> Result<ControlFlow<std::vector<T>>> {
+      if (result == IterationTraits<T>::End()) {
+        return Break(*vec);
+      } else {
+        vec->push_back(result);
+        return Continue();
+      }
+    });
+  };
+  return Loop(loop_body);
+}
+
+template <typename T, typename V>
+class TransformingGenerator {
+ public:
+  explicit TransformingGenerator(AsyncGenerator<T> generator,
+                                 Transformer<T, V> transformer)
+      : finished_(), last_value_(), generator_(generator), transformer_(transformer) {}
+
+  // See comment on TransformingIterator::Pump
+  Result<util::optional<V>> Pump() {
+    if (!finished_ && last_value_.has_value()) {
+      ARROW_ASSIGN_OR_RAISE(TransformFlow<V> next, transformer_(*last_value_));
+      if (next.ReadyForNext()) {
+        if (*last_value_ == IterationTraits<T>::End()) {
+          finished_ = true;
+        }
+        last_value_.reset();
+      }
+      if (next.Finished()) {
+        finished_ = true;
+      }
+      if (next.HasValue()) {
+        return next.Value();
+      }
+    }
+    if (finished_) {
+      return IterationTraits<V>::End();
+    }
+    return util::nullopt;
+  }
+
+  Future<V> operator()() {
+    while (true) {
+      auto maybe_next_result = Pump();
+      if (!maybe_next_result.ok()) {
+        return Future<V>::MakeFinished(maybe_next_result.status());
+      }
+      auto maybe_next = maybe_next_result.ValueUnsafe();
+      if (maybe_next.has_value()) {
+        return Future<V>::MakeFinished(*maybe_next);
+      }
+
+      auto next_fut = generator_();
+      // If finished already, process results immediately inside the loop to avoid stack
+      // overflow
+      if (next_fut.is_finished()) {
+        auto next_result = next_fut.result();
+        if (next_result.ok()) {
+          last_value_ = *next_result;
+        } else {
+          return Future<V>::MakeFinished(next_result.status());
+        }
+        // Otherwise, if not finished immediately, add callback to process results
+      } else {
+        return next_fut.Then([this](const Result<T>& next_result) {
+          if (next_result.ok()) {
+            last_value_ = *next_result;
+            return (*this)();
+          } else {
+            return Future<V>::MakeFinished(next_result.status());
+          }
+        });
+      }
+    }
+  }
+
+ protected:
+  bool finished_;
+  util::optional<T> last_value_;
+  AsyncGenerator<T> generator_;
+  Transformer<T, V> transformer_;
+};
+
+template <typename T>
+static std::function<void(const Result<T>&)> MakeCallback(
+    std::shared_ptr<bool> finished) {
+  return [finished](const Result<T>& next_result) {
+    if (!next_result.ok()) {
+      *finished = true;
+    } else {
+      auto next = *next_result;
+      *finished = (next == IterationTraits<T>::End());
+    }
+  };
+}
+
+template <typename T>
+AsyncGenerator<T> AddReadahead(AsyncGenerator<T> source_generator, int max_readahead) {
+  // Using a shared_ptr instead of a lambda capture here because it's possible that
+  // the inner mark_finished_if_done outlives the outer lambda
+  auto finished = std::make_shared<bool>(false);
+  auto mark_finished_if_done = [finished](const Result<T>& next_result) {
+    if (!next_result.ok()) {
+      *finished = true;
+    } else {
+      auto next = *next_result;
+      *finished = (next == IterationTraits<T>::End());
+    }
+  };
+
+  std::queue<Future<T>> readahead_queue;
+  return [=]() mutable -> Future<T> {
+    if (readahead_queue.empty()) {
+      // This is the first request, let's pump the underlying queue

Review comment:
       Why not pump as soon as `AddReadahead` is called? It would be more consistent with `ReadaheadIterator`.

##########
File path: cpp/src/arrow/util/async_iterator.h
##########
@@ -0,0 +1,295 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <queue>
+
+#include "arrow/util/functional.h"
+#include "arrow/util/future.h"
+#include "arrow/util/iterator.h"
+#include "arrow/util/optional.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+
+template <typename T>
+using AsyncGenerator = std::function<Future<T>()>;
+
+/// Iterates through a generator of futures, visiting the result of each one and
+/// returning a future that completes when all have been visited
+template <typename T>
+Future<> VisitAsyncGenerator(AsyncGenerator<T> generator,
+                             std::function<Status(T)> visitor) {
+  auto loop_body = [generator, visitor] {
+    auto next = generator();
+    return next.Then([visitor](const T& result) -> Result<ControlFlow<detail::Empty>> {
+      if (result == IterationTraits<T>::End()) {
+        return Break(detail::Empty());
+      } else {
+        auto visited = visitor(result);
+        if (visited.ok()) {
+          return Continue();
+        } else {
+          return visited;
+        }
+      }
+    });
+  };
+  return Loop(loop_body);
+}
+
+template <typename T>
+Future<std::vector<T>> CollectAsyncGenerator(AsyncGenerator<T> generator) {
+  auto vec = std::make_shared<std::vector<T>>();
+  auto loop_body = [generator, vec] {

Review comment:
       This capture will make a copy of `generator`.

##########
File path: cpp/src/arrow/util/async_iterator.h
##########
@@ -0,0 +1,295 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <queue>
+
+#include "arrow/util/functional.h"
+#include "arrow/util/future.h"
+#include "arrow/util/iterator.h"
+#include "arrow/util/optional.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+
+template <typename T>
+using AsyncGenerator = std::function<Future<T>()>;
+
+/// Iterates through a generator of futures, visiting the result of each one and
+/// returning a future that completes when all have been visited
+template <typename T>
+Future<> VisitAsyncGenerator(AsyncGenerator<T> generator,
+                             std::function<Status(T)> visitor) {
+  auto loop_body = [generator, visitor] {
+    auto next = generator();
+    return next.Then([visitor](const T& result) -> Result<ControlFlow<detail::Empty>> {
+      if (result == IterationTraits<T>::End()) {
+        return Break(detail::Empty());
+      } else {
+        auto visited = visitor(result);
+        if (visited.ok()) {
+          return Continue();
+        } else {
+          return visited;
+        }
+      }
+    });
+  };
+  return Loop(loop_body);
+}
+
+template <typename T>
+Future<std::vector<T>> CollectAsyncGenerator(AsyncGenerator<T> generator) {
+  auto vec = std::make_shared<std::vector<T>>();

Review comment:
       Instead of this, use an anonymous callable object?

##########
File path: cpp/src/arrow/util/async_iterator.h
##########
@@ -0,0 +1,295 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <queue>
+
+#include "arrow/util/functional.h"
+#include "arrow/util/future.h"
+#include "arrow/util/iterator.h"
+#include "arrow/util/optional.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+
+template <typename T>
+using AsyncGenerator = std::function<Future<T>()>;
+
+/// Iterates through a generator of futures, visiting the result of each one and
+/// returning a future that completes when all have been visited
+template <typename T>
+Future<> VisitAsyncGenerator(AsyncGenerator<T> generator,
+                             std::function<Status(T)> visitor) {
+  auto loop_body = [generator, visitor] {
+    auto next = generator();
+    return next.Then([visitor](const T& result) -> Result<ControlFlow<detail::Empty>> {
+      if (result == IterationTraits<T>::End()) {
+        return Break(detail::Empty());
+      } else {
+        auto visited = visitor(result);
+        if (visited.ok()) {
+          return Continue();
+        } else {
+          return visited;
+        }
+      }
+    });
+  };
+  return Loop(loop_body);
+}
+
+template <typename T>
+Future<std::vector<T>> CollectAsyncGenerator(AsyncGenerator<T> generator) {
+  auto vec = std::make_shared<std::vector<T>>();
+  auto loop_body = [generator, vec] {
+    auto next = generator();
+    return next.Then([vec](const T& result) -> Result<ControlFlow<std::vector<T>>> {
+      if (result == IterationTraits<T>::End()) {
+        return Break(*vec);
+      } else {
+        vec->push_back(result);
+        return Continue();
+      }
+    });
+  };
+  return Loop(loop_body);
+}
+
+template <typename T, typename V>
+class TransformingGenerator {
+ public:
+  explicit TransformingGenerator(AsyncGenerator<T> generator,
+                                 Transformer<T, V> transformer)
+      : finished_(), last_value_(), generator_(generator), transformer_(transformer) {}
+
+  // See comment on TransformingIterator::Pump
+  Result<util::optional<V>> Pump() {
+    if (!finished_ && last_value_.has_value()) {
+      ARROW_ASSIGN_OR_RAISE(TransformFlow<V> next, transformer_(*last_value_));
+      if (next.ReadyForNext()) {
+        if (*last_value_ == IterationTraits<T>::End()) {
+          finished_ = true;
+        }
+        last_value_.reset();
+      }
+      if (next.Finished()) {
+        finished_ = true;
+      }
+      if (next.HasValue()) {
+        return next.Value();
+      }
+    }
+    if (finished_) {
+      return IterationTraits<V>::End();
+    }
+    return util::nullopt;
+  }
+
+  Future<V> operator()() {
+    while (true) {
+      auto maybe_next_result = Pump();
+      if (!maybe_next_result.ok()) {
+        return Future<V>::MakeFinished(maybe_next_result.status());
+      }
+      auto maybe_next = maybe_next_result.ValueUnsafe();
+      if (maybe_next.has_value()) {
+        return Future<V>::MakeFinished(*maybe_next);

Review comment:
       Could this be `*std::move(maybe_next)`, to avoid copying the value when creating the finished future?

##########
File path: cpp/src/arrow/util/async_iterator.h
##########
@@ -0,0 +1,295 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <queue>
+
+#include "arrow/util/functional.h"
+#include "arrow/util/future.h"
+#include "arrow/util/iterator.h"
+#include "arrow/util/optional.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+
+template <typename T>
+using AsyncGenerator = std::function<Future<T>()>;
+
+/// Iterates through a generator of futures, visiting the result of each one and
+/// returning a future that completes when all have been visited
+template <typename T>
+Future<> VisitAsyncGenerator(AsyncGenerator<T> generator,
+                             std::function<Status(T)> visitor) {
+  auto loop_body = [generator, visitor] {
+    auto next = generator();
+    return next.Then([visitor](const T& result) -> Result<ControlFlow<detail::Empty>> {
+      if (result == IterationTraits<T>::End()) {
+        return Break(detail::Empty());
+      } else {
+        auto visited = visitor(result);
+        if (visited.ok()) {
+          return Continue();
+        } else {
+          return visited;
+        }
+      }
+    });
+  };
+  return Loop(loop_body);
+}
+
+template <typename T>
+Future<std::vector<T>> CollectAsyncGenerator(AsyncGenerator<T> generator) {
+  auto vec = std::make_shared<std::vector<T>>();
+  auto loop_body = [generator, vec] {
+    auto next = generator();
+    return next.Then([vec](const T& result) -> Result<ControlFlow<std::vector<T>>> {
+      if (result == IterationTraits<T>::End()) {
+        return Break(*vec);
+      } else {
+        vec->push_back(result);
+        return Continue();
+      }
+    });
+  };
+  return Loop(loop_body);
+}
+
+template <typename T, typename V>
+class TransformingGenerator {
+ public:
+  explicit TransformingGenerator(AsyncGenerator<T> generator,
+                                 Transformer<T, V> transformer)
+      : finished_(), last_value_(), generator_(generator), transformer_(transformer) {}
+
+  // See comment on TransformingIterator::Pump
+  Result<util::optional<V>> Pump() {
+    if (!finished_ && last_value_.has_value()) {
+      ARROW_ASSIGN_OR_RAISE(TransformFlow<V> next, transformer_(*last_value_));
+      if (next.ReadyForNext()) {
+        if (*last_value_ == IterationTraits<T>::End()) {
+          finished_ = true;
+        }
+        last_value_.reset();
+      }
+      if (next.Finished()) {
+        finished_ = true;
+      }
+      if (next.HasValue()) {
+        return next.Value();
+      }
+    }
+    if (finished_) {
+      return IterationTraits<V>::End();
+    }
+    return util::nullopt;
+  }
+
+  Future<V> operator()() {
+    while (true) {
+      auto maybe_next_result = Pump();
+      if (!maybe_next_result.ok()) {
+        return Future<V>::MakeFinished(maybe_next_result.status());
+      }
+      auto maybe_next = maybe_next_result.ValueUnsafe();
+      if (maybe_next.has_value()) {
+        return Future<V>::MakeFinished(*maybe_next);
+      }
+
+      auto next_fut = generator_();
+      // If finished already, process results immediately inside the loop to avoid stack
+      // overflow
+      if (next_fut.is_finished()) {
+        auto next_result = next_fut.result();
+        if (next_result.ok()) {
+          last_value_ = *next_result;
+        } else {
+          return Future<V>::MakeFinished(next_result.status());
+        }
+        // Otherwise, if not finished immediately, add callback to process results
+      } else {
+        return next_fut.Then([this](const Result<T>& next_result) {
+          if (next_result.ok()) {
+            last_value_ = *next_result;
+            return (*this)();
+          } else {
+            return Future<V>::MakeFinished(next_result.status());
+          }
+        });
+      }
+    }
+  }
+
+ protected:
+  bool finished_;
+  util::optional<T> last_value_;
+  AsyncGenerator<T> generator_;
+  Transformer<T, V> transformer_;
+};
+
+template <typename T>
+static std::function<void(const Result<T>&)> MakeCallback(
+    std::shared_ptr<bool> finished) {
+  return [finished](const Result<T>& next_result) {
+    if (!next_result.ok()) {
+      *finished = true;
+    } else {
+      auto next = *next_result;
+      *finished = (next == IterationTraits<T>::End());
+    }
+  };
+}
+
+template <typename T>
+AsyncGenerator<T> AddReadahead(AsyncGenerator<T> source_generator, int max_readahead) {
+  // Using a shared_ptr instead of a lambda capture here because it's possible that
+  // the inner mark_finished_if_done outlives the outer lambda
+  auto finished = std::make_shared<bool>(false);
+  auto mark_finished_if_done = [finished](const Result<T>& next_result) {
+    if (!next_result.ok()) {
+      *finished = true;
+    } else {
+      auto next = *next_result;

Review comment:
       Use `const auto&` here to avoid copying the value just to compare it against the end marker.

##########
File path: cpp/src/arrow/util/async_iterator.h
##########
@@ -0,0 +1,295 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <queue>
+
+#include "arrow/util/functional.h"
+#include "arrow/util/future.h"
+#include "arrow/util/iterator.h"
+#include "arrow/util/optional.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+
+template <typename T>
+using AsyncGenerator = std::function<Future<T>()>;
+
+/// Iterates through a generator of futures, visiting the result of each one and
+/// returning a future that completes when all have been visited
+template <typename T>
+Future<> VisitAsyncGenerator(AsyncGenerator<T> generator,
+                             std::function<Status(T)> visitor) {
+  auto loop_body = [generator, visitor] {
+    auto next = generator();
+    return next.Then([visitor](const T& result) -> Result<ControlFlow<detail::Empty>> {
+      if (result == IterationTraits<T>::End()) {
+        return Break(detail::Empty());
+      } else {
+        auto visited = visitor(result);
+        if (visited.ok()) {
+          return Continue();
+        } else {
+          return visited;
+        }
+      }
+    });
+  };
+  return Loop(loop_body);
+}
+
+template <typename T>
+Future<std::vector<T>> CollectAsyncGenerator(AsyncGenerator<T> generator) {
+  auto vec = std::make_shared<std::vector<T>>();
+  auto loop_body = [generator, vec] {
+    auto next = generator();
+    return next.Then([vec](const T& result) -> Result<ControlFlow<std::vector<T>>> {
+      if (result == IterationTraits<T>::End()) {
+        return Break(*vec);
+      } else {
+        vec->push_back(result);
+        return Continue();
+      }
+    });
+  };
+  return Loop(loop_body);
+}
+
+template <typename T, typename V>
+class TransformingGenerator {
+ public:
+  explicit TransformingGenerator(AsyncGenerator<T> generator,
+                                 Transformer<T, V> transformer)
+      : finished_(), last_value_(), generator_(generator), transformer_(transformer) {}
+
+  // See comment on TransformingIterator::Pump
+  Result<util::optional<V>> Pump() {
+    if (!finished_ && last_value_.has_value()) {
+      ARROW_ASSIGN_OR_RAISE(TransformFlow<V> next, transformer_(*last_value_));
+      if (next.ReadyForNext()) {
+        if (*last_value_ == IterationTraits<T>::End()) {
+          finished_ = true;
+        }
+        last_value_.reset();
+      }
+      if (next.Finished()) {
+        finished_ = true;
+      }
+      if (next.HasValue()) {
+        return next.Value();
+      }
+    }
+    if (finished_) {
+      return IterationTraits<V>::End();
+    }
+    return util::nullopt;
+  }
+
+  Future<V> operator()() {
+    while (true) {
+      auto maybe_next_result = Pump();
+      if (!maybe_next_result.ok()) {
+        return Future<V>::MakeFinished(maybe_next_result.status());
+      }
+      auto maybe_next = maybe_next_result.ValueUnsafe();
+      if (maybe_next.has_value()) {
+        return Future<V>::MakeFinished(*maybe_next);
+      }
+
+      auto next_fut = generator_();
+      // If finished already, process results immediately inside the loop to avoid stack
+      // overflow
+      if (next_fut.is_finished()) {
+        auto next_result = next_fut.result();
+        if (next_result.ok()) {
+          last_value_ = *next_result;
+        } else {
+          return Future<V>::MakeFinished(next_result.status());
+        }
+        // Otherwise, if not finished immediately, add callback to process results
+      } else {
+        return next_fut.Then([this](const Result<T>& next_result) {
+          if (next_result.ok()) {
+            last_value_ = *next_result;
+            return (*this)();
+          } else {
+            return Future<V>::MakeFinished(next_result.status());
+          }
+        });
+      }
+    }
+  }
+
+ protected:
+  bool finished_;
+  util::optional<T> last_value_;
+  AsyncGenerator<T> generator_;
+  Transformer<T, V> transformer_;
+};
+
+template <typename T>
+static std::function<void(const Result<T>&)> MakeCallback(
+    std::shared_ptr<bool> finished) {
+  return [finished](const Result<T>& next_result) {
+    if (!next_result.ok()) {
+      *finished = true;
+    } else {
+      auto next = *next_result;
+      *finished = (next == IterationTraits<T>::End());
+    }
+  };
+}
+
+template <typename T>
+AsyncGenerator<T> AddReadahead(AsyncGenerator<T> source_generator, int max_readahead) {
+  // Using a shared_ptr instead of a lambda capture here because it's possible that
+  // the inner mark_finished_if_done outlives the outer lambda
+  auto finished = std::make_shared<bool>(false);
+  auto mark_finished_if_done = [finished](const Result<T>& next_result) {
+    if (!next_result.ok()) {
+      *finished = true;
+    } else {
+      auto next = *next_result;
+      *finished = (next == IterationTraits<T>::End());
+    }
+  };
+
+  std::queue<Future<T>> readahead_queue;
+  return [=]() mutable -> Future<T> {
+    if (readahead_queue.empty()) {
+      // This is the first request, let's pump the underlying queue
+      for (int i = 0; i < max_readahead; i++) {
+        auto next = source_generator();
+        next.AddCallback(mark_finished_if_done);
+        readahead_queue.push(std::move(next));
+      }
+    }
+    // Pop one and add one
+    auto result = readahead_queue.front();
+    readahead_queue.pop();
+    if (*finished) {
+      readahead_queue.push(Future<T>::MakeFinished(IterationTraits<T>::End()));
+    } else {
+      auto back_of_queue = source_generator();
+      back_of_queue.AddCallback(mark_finished_if_done);
+      readahead_queue.push(std::move(back_of_queue));
+    }
+    return result;
+  };
+}
+
+/// \brief Transforms an async generator using a transformer function returning a new
+/// AsyncGenerator
+///
+/// The transform function here behaves exactly the same as the transform function in
+/// MakeTransformedIterator and you can safely use the same transform function to
+/// transform both synchronous and asynchronous streams.
+template <typename T, typename V>
+AsyncGenerator<V> TransformAsyncGenerator(AsyncGenerator<T> generator,
+                                          Transformer<T, V> transformer) {
+  return TransformingGenerator<T, V>(generator, transformer);
+}
+
+namespace detail {
+
+template <typename T>
+struct BackgroundIteratorPromise : ReadaheadPromise {
+  ~BackgroundIteratorPromise() override {}
+
+  explicit BackgroundIteratorPromise(Iterator<T>* it) : it_(it) {}
+
+  bool Call() override {
+    auto next = it_->Next();
+    auto finished = next == IterationTraits<T>::End();
+    out_.MarkFinished(std::move(next));
+    return finished;
+  }
+
+  void End() override { out_.MarkFinished(IterationTraits<T>::End()); }
+
+  Iterator<T>* it_;
+  Future<T> out_ = Future<T>::Make();
+};
+
+}  // namespace detail
+
+/// \brief Async generator that iterates on an underlying iterator in a
+/// separate thread.
+template <typename T>
+class BackgroundIterator {

Review comment:
       It's a bit confusing that it's named `BackgroundIterator` even though it's not an iterator... should it be `BackgroundGenerator`?

##########
File path: cpp/src/arrow/util/async_iterator.h
##########
@@ -0,0 +1,295 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <queue>
+
+#include "arrow/util/functional.h"
+#include "arrow/util/future.h"
+#include "arrow/util/iterator.h"
+#include "arrow/util/optional.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+
+template <typename T>
+using AsyncGenerator = std::function<Future<T>()>;
+
+/// Iterates through a generator of futures, visiting the result of each one and
+/// returning a future that completes when all have been visited
+template <typename T>
+Future<> VisitAsyncGenerator(AsyncGenerator<T> generator,
+                             std::function<Status(T)> visitor) {
+  auto loop_body = [generator, visitor] {
+    auto next = generator();
+    return next.Then([visitor](const T& result) -> Result<ControlFlow<detail::Empty>> {
+      if (result == IterationTraits<T>::End()) {
+        return Break(detail::Empty());
+      } else {
+        auto visited = visitor(result);
+        if (visited.ok()) {
+          return Continue();
+        } else {
+          return visited;
+        }
+      }
+    });
+  };
+  return Loop(loop_body);
+}
+
+template <typename T>
+Future<std::vector<T>> CollectAsyncGenerator(AsyncGenerator<T> generator) {
+  auto vec = std::make_shared<std::vector<T>>();
+  auto loop_body = [generator, vec] {
+    auto next = generator();
+    return next.Then([vec](const T& result) -> Result<ControlFlow<std::vector<T>>> {
+      if (result == IterationTraits<T>::End()) {
+        return Break(*vec);
+      } else {
+        vec->push_back(result);
+        return Continue();
+      }
+    });
+  };
+  return Loop(loop_body);
+}
+
+template <typename T, typename V>
+class TransformingGenerator {
+ public:
+  explicit TransformingGenerator(AsyncGenerator<T> generator,
+                                 Transformer<T, V> transformer)
+      : finished_(), last_value_(), generator_(generator), transformer_(transformer) {}
+
+  // See comment on TransformingIterator::Pump
+  Result<util::optional<V>> Pump() {
+    if (!finished_ && last_value_.has_value()) {
+      ARROW_ASSIGN_OR_RAISE(TransformFlow<V> next, transformer_(*last_value_));
+      if (next.ReadyForNext()) {
+        if (*last_value_ == IterationTraits<T>::End()) {
+          finished_ = true;
+        }
+        last_value_.reset();
+      }
+      if (next.Finished()) {
+        finished_ = true;
+      }
+      if (next.HasValue()) {
+        return next.Value();
+      }
+    }
+    if (finished_) {
+      return IterationTraits<V>::End();
+    }
+    return util::nullopt;
+  }
+
+  Future<V> operator()() {
+    while (true) {
+      auto maybe_next_result = Pump();
+      if (!maybe_next_result.ok()) {
+        return Future<V>::MakeFinished(maybe_next_result.status());
+      }
+      auto maybe_next = maybe_next_result.ValueUnsafe();
+      if (maybe_next.has_value()) {
+        return Future<V>::MakeFinished(*maybe_next);
+      }
+
+      auto next_fut = generator_();
+      // If finished already, process results immediately inside the loop to avoid stack
+      // overflow
+      if (next_fut.is_finished()) {
+        auto next_result = next_fut.result();
+        if (next_result.ok()) {
+          last_value_ = *next_result;
+        } else {
+          return Future<V>::MakeFinished(next_result.status());
+        }
+        // Otherwise, if not finished immediately, add callback to process results
+      } else {
+        return next_fut.Then([this](const Result<T>& next_result) {
+          if (next_result.ok()) {
+            last_value_ = *next_result;
+            return (*this)();
+          } else {
+            return Future<V>::MakeFinished(next_result.status());
+          }
+        });
+      }
+    }
+  }
+
+ protected:
+  bool finished_;
+  util::optional<T> last_value_;
+  AsyncGenerator<T> generator_;
+  Transformer<T, V> transformer_;
+};
+
+template <typename T>
+static std::function<void(const Result<T>&)> MakeCallback(
+    std::shared_ptr<bool> finished) {
+  return [finished](const Result<T>& next_result) {
+    if (!next_result.ok()) {
+      *finished = true;
+    } else {
+      auto next = *next_result;
+      *finished = (next == IterationTraits<T>::End());
+    }
+  };
+}
+
+template <typename T>
+AsyncGenerator<T> AddReadahead(AsyncGenerator<T> source_generator, int max_readahead) {
+  // Using a shared_ptr instead of a lambda capture here because it's possible that
+  // the inner mark_finished_if_done outlives the outer lambda
+  auto finished = std::make_shared<bool>(false);
+  auto mark_finished_if_done = [finished](const Result<T>& next_result) {
+    if (!next_result.ok()) {
+      *finished = true;
+    } else {
+      auto next = *next_result;
+      *finished = (next == IterationTraits<T>::End());
+    }
+  };
+
+  std::queue<Future<T>> readahead_queue;
+  return [=]() mutable -> Future<T> {
+    if (readahead_queue.empty()) {
+      // This is the first request, let's pump the underlying queue
+      for (int i = 0; i < max_readahead; i++) {
+        auto next = source_generator();
+        next.AddCallback(mark_finished_if_done);
+        readahead_queue.push(std::move(next));
+      }
+    }
+    // Pop one and add one
+    auto result = readahead_queue.front();
+    readahead_queue.pop();
+    if (*finished) {
+      readahead_queue.push(Future<T>::MakeFinished(IterationTraits<T>::End()));
+    } else {
+      auto back_of_queue = source_generator();
+      back_of_queue.AddCallback(mark_finished_if_done);
+      readahead_queue.push(std::move(back_of_queue));
+    }
+    return result;
+  };
+}
+
+/// \brief Transforms an async generator using a transformer function returning a new
+/// AsyncGenerator
+///
+/// The transform function here behaves exactly the same as the transform function in
+/// MakeTransformedIterator and you can safely use the same transform function to
+/// transform both synchronous and asynchronous streams.
+template <typename T, typename V>
+AsyncGenerator<V> TransformAsyncGenerator(AsyncGenerator<T> generator,
+                                          Transformer<T, V> transformer) {
+  return TransformingGenerator<T, V>(generator, transformer);
+}
+
+namespace detail {
+
+template <typename T>
+struct BackgroundIteratorPromise : ReadaheadPromise {
+  ~BackgroundIteratorPromise() override {}
+
+  explicit BackgroundIteratorPromise(Iterator<T>* it) : it_(it) {}
+
+  bool Call() override {
+    auto next = it_->Next();
+    auto finished = next == IterationTraits<T>::End();
+    out_.MarkFinished(std::move(next));
+    return finished;
+  }
+
+  void End() override { out_.MarkFinished(IterationTraits<T>::End()); }
+
+  Iterator<T>* it_;
+  Future<T> out_ = Future<T>::Make();
+};
+
+}  // namespace detail
+
+/// \brief Async generator that iterates on an underlying iterator in a
+/// separate thread.
+template <typename T>
+class BackgroundIterator {
+  using PromiseType = typename detail::BackgroundIteratorPromise<T>;
+
+ public:
+  explicit BackgroundIterator(Iterator<T> it, internal::Executor* executor)
+      : it_(new Iterator<T>(std::move(it))),
+        queue_(new detail::ReadaheadQueue(0)),
+        executor_(executor),
+        done_() {}
+
+  ~BackgroundIterator() {
+    if (queue_) {
+      // Make sure the queue doesn't call any promises after this object
+      // is destroyed.
+      queue_->EnsureShutdownOrDie();
+    }
+  }
+
+  ARROW_DEFAULT_MOVE_AND_ASSIGN(BackgroundIterator);
+  ARROW_DISALLOW_COPY_AND_ASSIGN(BackgroundIterator);
+
+  Future<T> operator()() {
+    if (done_) {
+      return Future<T>::MakeFinished(IterationTraits<T>::End());
+    }
+    auto promise = std::unique_ptr<PromiseType>(new PromiseType{it_.get()});
+    auto result = Future<T>(promise->out_);
+    // TODO: Need a futuristic version of ARROW_RETURN_NOT_OK
+    auto append_status = queue_->Append(
+        static_cast<std::unique_ptr<detail::ReadaheadPromise>>(std::move(promise)));
+    if (!append_status.ok()) {
+      return Future<T>::MakeFinished(append_status);
+    }
+
+    result.AddCallback([this](const Result<T>& result) {
+      if (!result.ok() || result.ValueUnsafe() == IterationTraits<T>::End()) {
+        done_ = true;
+      }
+    });
+
+    return executor_->Transfer(result);

Review comment:
       Unless I'm misreading this, it seems only the trivial callback above is spawned on the executor?

##########
File path: cpp/src/arrow/util/async_iterator.h
##########
@@ -0,0 +1,295 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <queue>
+
+#include "arrow/util/functional.h"
+#include "arrow/util/future.h"
+#include "arrow/util/iterator.h"
+#include "arrow/util/optional.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+
+template <typename T>
+using AsyncGenerator = std::function<Future<T>()>;
+
+/// Iterates through a generator of futures, visiting the result of each one and
+/// returning a future that completes when all have been visited
+template <typename T>
+Future<> VisitAsyncGenerator(AsyncGenerator<T> generator,
+                             std::function<Status(T)> visitor) {
+  auto loop_body = [generator, visitor] {
+    auto next = generator();
+    return next.Then([visitor](const T& result) -> Result<ControlFlow<detail::Empty>> {
+      if (result == IterationTraits<T>::End()) {
+        return Break(detail::Empty());
+      } else {
+        auto visited = visitor(result);
+        if (visited.ok()) {
+          return Continue();
+        } else {
+          return visited;
+        }
+      }
+    });
+  };
+  return Loop(loop_body);
+}
+
+template <typename T>
+Future<std::vector<T>> CollectAsyncGenerator(AsyncGenerator<T> generator) {
+  auto vec = std::make_shared<std::vector<T>>();
+  auto loop_body = [generator, vec] {
+    auto next = generator();
+    return next.Then([vec](const T& result) -> Result<ControlFlow<std::vector<T>>> {
+      if (result == IterationTraits<T>::End()) {
+        return Break(*vec);
+      } else {
+        vec->push_back(result);
+        return Continue();
+      }
+    });
+  };
+  return Loop(loop_body);
+}
+
+template <typename T, typename V>
+class TransformingGenerator {
+ public:
+  explicit TransformingGenerator(AsyncGenerator<T> generator,
+                                 Transformer<T, V> transformer)
+      : finished_(), last_value_(), generator_(generator), transformer_(transformer) {}
+
+  // See comment on TransformingIterator::Pump
+  Result<util::optional<V>> Pump() {
+    if (!finished_ && last_value_.has_value()) {
+      ARROW_ASSIGN_OR_RAISE(TransformFlow<V> next, transformer_(*last_value_));
+      if (next.ReadyForNext()) {
+        if (*last_value_ == IterationTraits<T>::End()) {
+          finished_ = true;
+        }
+        last_value_.reset();
+      }
+      if (next.Finished()) {
+        finished_ = true;
+      }
+      if (next.HasValue()) {
+        return next.Value();
+      }
+    }
+    if (finished_) {
+      return IterationTraits<V>::End();
+    }
+    return util::nullopt;
+  }
+
+  Future<V> operator()() {
+    while (true) {
+      auto maybe_next_result = Pump();
+      if (!maybe_next_result.ok()) {
+        return Future<V>::MakeFinished(maybe_next_result.status());
+      }
+      auto maybe_next = maybe_next_result.ValueUnsafe();
+      if (maybe_next.has_value()) {
+        return Future<V>::MakeFinished(*maybe_next);
+      }
+
+      auto next_fut = generator_();
+      // If finished already, process results immediately inside the loop to avoid stack
+      // overflow
+      if (next_fut.is_finished()) {
+        auto next_result = next_fut.result();
+        if (next_result.ok()) {
+          last_value_ = *next_result;
+        } else {
+          return Future<V>::MakeFinished(next_result.status());
+        }
+        // Otherwise, if not finished immediately, add callback to process results
+      } else {
+        return next_fut.Then([this](const Result<T>& next_result) {
+          if (next_result.ok()) {
+            last_value_ = *next_result;
+            return (*this)();
+          } else {
+            return Future<V>::MakeFinished(next_result.status());
+          }
+        });
+      }
+    }
+  }
+
+ protected:
+  bool finished_;
+  util::optional<T> last_value_;
+  AsyncGenerator<T> generator_;
+  Transformer<T, V> transformer_;
+};
+
+template <typename T>
+static std::function<void(const Result<T>&)> MakeCallback(
+    std::shared_ptr<bool> finished) {
+  return [finished](const Result<T>& next_result) {
+    if (!next_result.ok()) {
+      *finished = true;
+    } else {
+      auto next = *next_result;
+      *finished = (next == IterationTraits<T>::End());
+    }
+  };
+}
+
+template <typename T>
+AsyncGenerator<T> AddReadahead(AsyncGenerator<T> source_generator, int max_readahead) {
+  // Using a shared_ptr instead of a lambda capture here because it's possible that
+  // the inner mark_finished_if_done outlives the outer lambda
+  auto finished = std::make_shared<bool>(false);
+  auto mark_finished_if_done = [finished](const Result<T>& next_result) {
+    if (!next_result.ok()) {
+      *finished = true;
+    } else {
+      auto next = *next_result;
+      *finished = (next == IterationTraits<T>::End());
+    }
+  };
+
+  std::queue<Future<T>> readahead_queue;
+  return [=]() mutable -> Future<T> {
+    if (readahead_queue.empty()) {
+      // This is the first request, let's pump the underlying queue
+      for (int i = 0; i < max_readahead; i++) {
+        auto next = source_generator();
+        next.AddCallback(mark_finished_if_done);
+        readahead_queue.push(std::move(next));
+      }
+    }
+    // Pop one and add one
+    auto result = readahead_queue.front();
+    readahead_queue.pop();
+    if (*finished) {
+      readahead_queue.push(Future<T>::MakeFinished(IterationTraits<T>::End()));
+    } else {
+      auto back_of_queue = source_generator();
+      back_of_queue.AddCallback(mark_finished_if_done);
+      readahead_queue.push(std::move(back_of_queue));
+    }
+    return result;
+  };
+}
+
+/// \brief Transforms an async generator using a transformer function returning a new
+/// AsyncGenerator
+///
+/// The transform function here behaves exactly the same as the transform function in
+/// MakeTransformedIterator and you can safely use the same transform function to
+/// transform both synchronous and asynchronous streams.
+template <typename T, typename V>
+AsyncGenerator<V> TransformAsyncGenerator(AsyncGenerator<T> generator,
+                                          Transformer<T, V> transformer) {
+  return TransformingGenerator<T, V>(generator, transformer);
+}
+
+namespace detail {
+
+template <typename T>
+struct BackgroundIteratorPromise : ReadaheadPromise {
+  ~BackgroundIteratorPromise() override {}
+
+  explicit BackgroundIteratorPromise(Iterator<T>* it) : it_(it) {}
+
+  bool Call() override {
+    auto next = it_->Next();
+    auto finished = next == IterationTraits<T>::End();
+    out_.MarkFinished(std::move(next));
+    return finished;
+  }
+
+  void End() override { out_.MarkFinished(IterationTraits<T>::End()); }
+
+  Iterator<T>* it_;
+  Future<T> out_ = Future<T>::Make();
+};
+
+}  // namespace detail
+
+/// \brief Async generator that iterates on an underlying iterator in a
+/// separate thread.

Review comment:
       Should this say "in an executor" rather than "in a separate thread"?
   
   Can you document whether the underlying iterator may be called from several threads at once?

##########
File path: cpp/src/arrow/util/async_iterator.h
##########
@@ -0,0 +1,295 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <queue>
+
+#include "arrow/util/functional.h"
+#include "arrow/util/future.h"
+#include "arrow/util/iterator.h"
+#include "arrow/util/optional.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+
+template <typename T>
+using AsyncGenerator = std::function<Future<T>()>;
+
+/// Iterates through a generator of futures, visiting the result of each one and
+/// returning a future that completes when all have been visited
+template <typename T>
+Future<> VisitAsyncGenerator(AsyncGenerator<T> generator,
+                             std::function<Status(T)> visitor) {
+  auto loop_body = [generator, visitor] {
+    auto next = generator();
+    return next.Then([visitor](const T& result) -> Result<ControlFlow<detail::Empty>> {
+      if (result == IterationTraits<T>::End()) {
+        return Break(detail::Empty());
+      } else {
+        auto visited = visitor(result);
+        if (visited.ok()) {
+          return Continue();
+        } else {
+          return visited;
+        }
+      }
+    });
+  };
+  return Loop(loop_body);
+}
+
+template <typename T>
+Future<std::vector<T>> CollectAsyncGenerator(AsyncGenerator<T> generator) {
+  auto vec = std::make_shared<std::vector<T>>();
+  auto loop_body = [generator, vec] {
+    auto next = generator();
+    return next.Then([vec](const T& result) -> Result<ControlFlow<std::vector<T>>> {
+      if (result == IterationTraits<T>::End()) {
+        return Break(*vec);
+      } else {
+        vec->push_back(result);
+        return Continue();
+      }
+    });
+  };
+  return Loop(loop_body);
+}
+
+template <typename T, typename V>
+class TransformingGenerator {
+ public:
+  explicit TransformingGenerator(AsyncGenerator<T> generator,
+                                 Transformer<T, V> transformer)
+      : finished_(), last_value_(), generator_(generator), transformer_(transformer) {}
+
+  // See comment on TransformingIterator::Pump
+  Result<util::optional<V>> Pump() {
+    if (!finished_ && last_value_.has_value()) {
+      ARROW_ASSIGN_OR_RAISE(TransformFlow<V> next, transformer_(*last_value_));
+      if (next.ReadyForNext()) {
+        if (*last_value_ == IterationTraits<T>::End()) {
+          finished_ = true;
+        }
+        last_value_.reset();
+      }
+      if (next.Finished()) {
+        finished_ = true;
+      }
+      if (next.HasValue()) {
+        return next.Value();
+      }
+    }
+    if (finished_) {
+      return IterationTraits<V>::End();
+    }
+    return util::nullopt;
+  }
+
+  Future<V> operator()() {
+    while (true) {
+      auto maybe_next_result = Pump();
+      if (!maybe_next_result.ok()) {
+        return Future<V>::MakeFinished(maybe_next_result.status());
+      }
+      auto maybe_next = maybe_next_result.ValueUnsafe();
+      if (maybe_next.has_value()) {
+        return Future<V>::MakeFinished(*maybe_next);
+      }
+
+      auto next_fut = generator_();
+      // If finished already, process results immediately inside the loop to avoid stack
+      // overflow
+      if (next_fut.is_finished()) {
+        auto next_result = next_fut.result();
+        if (next_result.ok()) {
+          last_value_ = *next_result;
+        } else {
+          return Future<V>::MakeFinished(next_result.status());
+        }
+        // Otherwise, if not finished immediately, add callback to process results
+      } else {
+        return next_fut.Then([this](const Result<T>& next_result) {
+          if (next_result.ok()) {
+            last_value_ = *next_result;
+            return (*this)();
+          } else {
+            return Future<V>::MakeFinished(next_result.status());
+          }
+        });
+      }
+    }
+  }
+
+ protected:
+  bool finished_;
+  util::optional<T> last_value_;
+  AsyncGenerator<T> generator_;
+  Transformer<T, V> transformer_;
+};
+
+template <typename T>
+static std::function<void(const Result<T>&)> MakeCallback(
+    std::shared_ptr<bool> finished) {
+  return [finished](const Result<T>& next_result) {
+    if (!next_result.ok()) {
+      *finished = true;
+    } else {
+      auto next = *next_result;
+      *finished = (next == IterationTraits<T>::End());
+    }
+  };
+}
+
+template <typename T>
+AsyncGenerator<T> AddReadahead(AsyncGenerator<T> source_generator, int max_readahead) {
+  // Using a shared_ptr instead of a lambda capture here because it's possible that
+  // the inner mark_finished_if_done outlives the outer lambda
+  auto finished = std::make_shared<bool>(false);
+  auto mark_finished_if_done = [finished](const Result<T>& next_result) {
+    if (!next_result.ok()) {
+      *finished = true;
+    } else {
+      auto next = *next_result;
+      *finished = (next == IterationTraits<T>::End());
+    }
+  };
+
+  std::queue<Future<T>> readahead_queue;
+  return [=]() mutable -> Future<T> {

Review comment:
       Instead of having the closure state spread around as (copied) local variables, perhaps create a dedicated callable object?

##########
File path: cpp/src/arrow/util/async_iterator.h
##########
@@ -0,0 +1,295 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <queue>
+
+#include "arrow/util/functional.h"
+#include "arrow/util/future.h"
+#include "arrow/util/iterator.h"
+#include "arrow/util/optional.h"
+#include "arrow/util/thread_pool.h"
+
+namespace arrow {
+
+template <typename T>
+using AsyncGenerator = std::function<Future<T>()>;
+
+/// Iterates through a generator of futures, visiting the result of each one and
+/// returning a future that completes when all have been visited
+template <typename T>
+Future<> VisitAsyncGenerator(AsyncGenerator<T> generator,
+                             std::function<Status(T)> visitor) {
+  auto loop_body = [generator, visitor] {
+    auto next = generator();
+    return next.Then([visitor](const T& result) -> Result<ControlFlow<detail::Empty>> {
+      if (result == IterationTraits<T>::End()) {
+        return Break(detail::Empty());
+      } else {
+        auto visited = visitor(result);
+        if (visited.ok()) {
+          return Continue();
+        } else {
+          return visited;
+        }
+      }
+    });
+  };
+  return Loop(loop_body);
+}
+
+template <typename T>
+Future<std::vector<T>> CollectAsyncGenerator(AsyncGenerator<T> generator) {
+  auto vec = std::make_shared<std::vector<T>>();
+  auto loop_body = [generator, vec] {
+    auto next = generator();
+    return next.Then([vec](const T& result) -> Result<ControlFlow<std::vector<T>>> {
+      if (result == IterationTraits<T>::End()) {
+        return Break(*vec);
+      } else {
+        vec->push_back(result);
+        return Continue();
+      }
+    });
+  };
+  return Loop(loop_body);
+}
+
+template <typename T, typename V>
+class TransformingGenerator {
+ public:
+  explicit TransformingGenerator(AsyncGenerator<T> generator,
+                                 Transformer<T, V> transformer)
+      : finished_(), last_value_(), generator_(generator), transformer_(transformer) {}
+
+  // See comment on TransformingIterator::Pump
+  Result<util::optional<V>> Pump() {
+    if (!finished_ && last_value_.has_value()) {
+      ARROW_ASSIGN_OR_RAISE(TransformFlow<V> next, transformer_(*last_value_));
+      if (next.ReadyForNext()) {
+        if (*last_value_ == IterationTraits<T>::End()) {
+          finished_ = true;
+        }
+        last_value_.reset();
+      }
+      if (next.Finished()) {
+        finished_ = true;
+      }
+      if (next.HasValue()) {
+        return next.Value();
+      }
+    }
+    if (finished_) {
+      return IterationTraits<V>::End();
+    }
+    return util::nullopt;
+  }
+
+  Future<V> operator()() {
+    while (true) {
+      auto maybe_next_result = Pump();
+      if (!maybe_next_result.ok()) {
+        return Future<V>::MakeFinished(maybe_next_result.status());
+      }
+      auto maybe_next = maybe_next_result.ValueUnsafe();
+      if (maybe_next.has_value()) {
+        return Future<V>::MakeFinished(*maybe_next);
+      }
+
+      auto next_fut = generator_();
+      // If finished already, process results immediately inside the loop to avoid stack
+      // overflow
+      if (next_fut.is_finished()) {
+        auto next_result = next_fut.result();
+        if (next_result.ok()) {
+          last_value_ = *next_result;
+        } else {
+          return Future<V>::MakeFinished(next_result.status());
+        }
+        // Otherwise, if not finished immediately, add callback to process results
+      } else {
+        return next_fut.Then([this](const Result<T>& next_result) {
+          if (next_result.ok()) {
+            last_value_ = *next_result;
+            return (*this)();
+          } else {
+            return Future<V>::MakeFinished(next_result.status());
+          }
+        });
+      }
+    }
+  }
+
+ protected:
+  bool finished_;
+  util::optional<T> last_value_;
+  AsyncGenerator<T> generator_;
+  Transformer<T, V> transformer_;
+};
+
+template <typename T>
+static std::function<void(const Result<T>&)> MakeCallback(
+    std::shared_ptr<bool> finished) {
+  return [finished](const Result<T>& next_result) {
+    if (!next_result.ok()) {
+      *finished = true;
+    } else {
+      auto next = *next_result;
+      *finished = (next == IterationTraits<T>::End());
+    }
+  };
+}
+
+template <typename T>
+AsyncGenerator<T> AddReadahead(AsyncGenerator<T> source_generator, int max_readahead) {
+  // Using a shared_ptr instead of a lambda capture here because it's possible that
+  // the inner mark_finished_if_done outlives the outer lambda
+  auto finished = std::make_shared<bool>(false);
+  auto mark_finished_if_done = [finished](const Result<T>& next_result) {
+    if (!next_result.ok()) {
+      *finished = true;
+    } else {
+      auto next = *next_result;
+      *finished = (next == IterationTraits<T>::End());
+    }
+  };
+
+  std::queue<Future<T>> readahead_queue;
+  return [=]() mutable -> Future<T> {
+    if (readahead_queue.empty()) {
+      // This is the first request, let's pump the underlying queue
+      for (int i = 0; i < max_readahead; i++) {
+        auto next = source_generator();
+        next.AddCallback(mark_finished_if_done);
+        readahead_queue.push(std::move(next));
+      }
+    }
+    // Pop one and add one
+    auto result = readahead_queue.front();
+    readahead_queue.pop();
+    if (*finished) {
+      readahead_queue.push(Future<T>::MakeFinished(IterationTraits<T>::End()));
+    } else {
+      auto back_of_queue = source_generator();
+      back_of_queue.AddCallback(mark_finished_if_done);

Review comment:
       Why use `AddCallback` when this can simply be done synchronously above? This will add overhead.

##########
File path: cpp/src/arrow/util/task_group_test.cc
##########
@@ -244,6 +245,65 @@ void TestNoCopyTask(std::shared_ptr<TaskGroup> task_group) {
   ASSERT_EQ(0, *counter);
 }
 
+void TestFinishNotSticky(std::function<std::shared_ptr<TaskGroup>()> factory) {
+  // If a task is added that runs very quickly it might decrement the task counter back
+  // down to 0 and mark the completion future as complete before all tasks are added.
+  // The "finished future" of the task group could get stuck to complete.
+  const int NTASKS = 100;
+  for (int i = 0; i < NTASKS; ++i) {
+    auto task_group = factory();
+    // Add a task and let it complete
+    task_group->Append([] { return Status::OK(); });
+    // Wait a little bit, if the task group was going to lock the finish hopefully it
+    // would do so here while we wait
+    SleepFor(1e-2);
+
+    // Add a new task that will still be running
+    std::atomic<bool> ready(false);
+    std::mutex m;
+    std::condition_variable cv;
+    task_group->Append([&m, &cv, &ready] {
+      std::unique_lock<std::mutex> lk(m);
+      cv.wait(lk, [&ready] { return ready.load(); });
+      return Status::OK();
+    });
+
+    // Ensure task group not finished already
+    auto finished = task_group->FinishAsync();
+    ASSERT_FALSE(finished.is_finished());
+
+    std::unique_lock<std::mutex> lk(m);
+    ready = true;
+    lk.unlock();
+    cv.notify_one();
+
+    ASSERT_TRUE(finished.Wait(1));

Review comment:
       Should we check `status()` here as well (and below)?

##########
File path: cpp/src/arrow/csv/reader.cc
##########
@@ -837,20 +885,121 @@ class ThreadedTableReader : public BaseTableReader {
   }
 
  protected:
-  ThreadPool* thread_pool_;
+  Executor* thread_pool_;
+  Iterator<std::shared_ptr<Buffer>> buffer_iterator_;
+};
+
+class AsyncThreadedTableReader
+    : public BaseTableReader,
+      public std::enable_shared_from_this<AsyncThreadedTableReader> {
+ public:
+  using BaseTableReader::BaseTableReader;
+
+  AsyncThreadedTableReader(MemoryPool* pool, std::shared_ptr<io::InputStream> input,
+                           const ReadOptions& read_options,
+                           const ParseOptions& parse_options,
+                           const ConvertOptions& convert_options, Executor* thread_pool)
+      : BaseTableReader(pool, input, read_options, parse_options, convert_options),
+        thread_pool_(thread_pool) {}
+
+  ~AsyncThreadedTableReader() override {
+    if (task_group_) {
+      // In case of error, make sure all pending tasks are finished before
+      // we start destroying BaseTableReader members
+      ARROW_UNUSED(task_group_->Finish());
+    }
+  }
+
+  Status Init() override {
+    ARROW_ASSIGN_OR_RAISE(auto istream_it,
+                          io::MakeInputStreamIterator(input_, read_options_.block_size));
+
+    ARROW_ASSIGN_OR_RAISE(auto bg_it,
+                          MakeBackgroundIterator(std::move(istream_it), thread_pool_));
+
+    int32_t block_queue_size = thread_pool_->GetCapacity();
+    auto rh_it = AddReadahead(bg_it, block_queue_size);
+    buffer_generator_ = CSVBufferIterator::MakeAsync(std::move(rh_it));
+    return Status::OK();
+  }
+
+  Result<std::shared_ptr<Table>> Read() override { return ReadAsync().result(); }
+
+  Future<std::shared_ptr<Table>> ReadAsync() override {
+    task_group_ = internal::TaskGroup::MakeThreaded(thread_pool_);
+
+    auto self = shared_from_this();
+    return ProcessFirstBuffer().Then([self](const std::shared_ptr<Buffer> first_buffer) {
+      auto block_generator = ThreadedBlockReader::MakeAsyncIterator(
+          self->buffer_generator_, MakeChunker(self->parse_options_),
+          std::move(first_buffer));
+
+      std::function<Status(util::optional<CSVBlock>)> block_visitor =
+          [self](util::optional<CSVBlock> maybe_block) -> Status {
+        DCHECK(!maybe_block->consume_bytes);

Review comment:
       So `maybe_block` always has a value...?

##########
File path: cpp/src/arrow/util/task_group_test.cc
##########
@@ -292,5 +352,25 @@ TEST(ThreadedTaskGroup, StressFailingTaskGroupLifetime) {
       [&] { return TaskGroup::MakeThreaded(thread_pool.get()); });
 }
 
+TEST(ThreadedTaskGroup, FinishNotSticky) {
+  std::shared_ptr<ThreadPool> thread_pool;
+  ASSERT_OK_AND_ASSIGN(thread_pool, ThreadPool::Make(16));
+
+  TestFinishNotSticky([&] { return TaskGroup::MakeThreaded(thread_pool.get()); });
+}
+
+TEST(ThreadedTaskGroup, FinishNeverStarted) {
+  std::shared_ptr<ThreadPool> thread_pool;
+  ASSERT_OK_AND_ASSIGN(thread_pool, ThreadPool::Make(4));
+  TestFinishNeverStarted(TaskGroup::MakeThreaded(thread_pool.get()));
+}
+
+TEST(ThreadedTaskGroup, FinishAlreadyCompleted) {
+  std::shared_ptr<ThreadPool> thread_pool;
+  ASSERT_OK_AND_ASSIGN(thread_pool, ThreadPool::Make(16));
+
+  TestFinishAlreadyCompleted([&] { return TaskGroup::MakeThreaded(thread_pool.get()); });
+}

Review comment:
       Shouldn't you similarly call those tests for `SerialTaskGroup`?

##########
File path: cpp/src/arrow/util/iterator_test.cc
##########
@@ -214,6 +255,270 @@ TEST(TestVectorIterator, RangeForLoop) {
   ASSERT_EQ(ints_it, ints.end());
 }
 
+template <typename T>
+Transformer<T, T> MakeFirstN(int n) {
+  int remaining = n;
+  return [remaining](T next) mutable -> Result<TransformFlow<T>> {
+    if (remaining > 0) {
+      remaining--;
+      return TransformYield(next);
+    }
+    return TransformFinish();
+  };
+}
+
+TEST(TestIteratorTransform, Truncating) {
+  auto original = VectorIt({1, 2, 3});
+  auto truncated = MakeTransformedIterator(std::move(original), MakeFirstN<TestInt>(2));
+  AssertIteratorMatch({1, 2}, std::move(truncated));
+}
+
+TEST(TestIteratorTransform, TestPointer) {
+  auto original = VectorIt<std::shared_ptr<int>>(
+      {std::make_shared<int>(1), std::make_shared<int>(2), std::make_shared<int>(3)});
+  auto truncated =
+      MakeTransformedIterator(std::move(original), MakeFirstN<std::shared_ptr<int>>(2));
+  ASSERT_OK_AND_ASSIGN(auto result, truncated.ToVector());
+  ASSERT_EQ(2, result.size());
+}
+
+TEST(TestIteratorTransform, TruncatingShort) {
+  // Tests the failsafe case where we never call Finish
+  auto original = VectorIt({1});
+  auto truncated = MakeTransformedIterator<TestInt, TestInt>(std::move(original),
+                                                             MakeFirstN<TestInt>(2));
+  AssertIteratorMatch({1}, std::move(truncated));
+}
+
+TEST(TestAsyncUtil, Background) {
+  std::vector<TestInt> expected = {1, 2, 3};
+  auto background = BackgroundAsyncVectorIt(expected);
+  auto future = CollectAsyncGenerator(background);
+  ASSERT_FALSE(future.is_finished());
+  future.Wait();
+  ASSERT_TRUE(future.is_finished());
+  ASSERT_EQ(expected, *future.result());
+}
+
+struct SlowEmptyIterator {
+  Result<TestInt> Next() {
+    if (called_) {
+      return Status::Invalid("Should not have been called twice");
+    }
+    SleepFor(0.1);
+    return IterationTraits<TestInt>::End();
+  }
+
+ private:
+  bool called_ = false;
+};
+
+TEST(TestAsyncUtil, BackgroundRepeatEnd) {
+  // Ensure that the background iterator properly fulfills the asyncgenerator contract
+  // and can be called after it ends.
+  auto iterator = Iterator<TestInt>(SlowEmptyIterator());
+  ASSERT_OK_AND_ASSIGN(
+      auto background_iter,
+      MakeBackgroundIterator(std::move(iterator), internal::GetCpuThreadPool()));
+
+  auto one = background_iter();
+  auto two = background_iter();
+
+  ASSERT_TRUE(one.Wait(0.5));
+
+  if (one.is_finished()) {
+    ASSERT_EQ(IterationTraits<TestInt>::End(), *one.result());
+  }
+
+  ASSERT_TRUE(two.Wait(0.5));
+  ASSERT_TRUE(two.is_finished());
+  if (two.is_finished()) {
+    ASSERT_EQ(IterationTraits<TestInt>::End(), *two.result());
+  }
+}
+
+TEST(TestAsyncUtil, SynchronousFinish) {
+  AsyncGenerator<TestInt> generator = []() {
+    return Future<TestInt>::MakeFinished(IterationTraits<TestInt>::End());
+  };
+  Transformer<TestInt, TestInt> skip_all = [](TestInt value) { return TransformSkip(); };
+  auto transformed = TransformAsyncGenerator(generator, skip_all);
+  auto future = CollectAsyncGenerator(transformed);
+  ASSERT_TRUE(future.is_finished());
+  ASSERT_OK_AND_ASSIGN(auto actual, future.result());
+  ASSERT_EQ(std::vector<TestInt>(), actual);
+}
+
+TEST(TestAsyncUtil, CompleteBackgroundStressTest) {
+  auto expected = RangeVector(100);
+  std::vector<Future<std::vector<TestInt>>> futures;
+  for (unsigned int i = 0; i < 100; i++) {
+    auto background = BackgroundAsyncVectorIt(expected);
+    futures.push_back(CollectAsyncGenerator(background));
+  }
+  auto combined = All(futures);
+  combined.Wait(2);
+  if (combined.is_finished()) {
+    ASSERT_OK_AND_ASSIGN(auto completed_vectors, combined.result());
+    for (auto&& vector : completed_vectors) {
+      ASSERT_EQ(vector, expected);
+    }
+  } else {
+    FAIL() << "After 2 seconds all background iterators had not finished collecting";
+  }
+}
+
+TEST(TestAsyncUtil, StackOverflow) {
+  int counter = 0;
+  AsyncGenerator<TestInt> generator = [&counter]() {
+    if (counter < 1000000) {
+      return Future<TestInt>::MakeFinished(counter++);
+    } else {
+      return Future<TestInt>::MakeFinished(IterationTraits<TestInt>::End());
+    }
+  };
+  Transformer<TestInt, TestInt> discard =
+      [](TestInt next) -> Result<TransformFlow<TestInt>> { return TransformSkip(); };
+  auto transformed = TransformAsyncGenerator(generator, discard);
+  auto collected_future = CollectAsyncGenerator(transformed);
+  ASSERT_TRUE(collected_future.Wait(5));
+  if (collected_future.is_finished()) {
+    ASSERT_OK_AND_ASSIGN(auto collected, collected_future.result());
+    ASSERT_EQ(0, collected.size());
+  }
+}
+
+TEST(TestAsyncUtil, Visit) {
+  auto generator = AsyncVectorIt({1, 2, 3});
+  unsigned int sum = 0;
+  auto sum_future = VisitAsyncGenerator<TestInt>(generator, [&sum](TestInt item) {
+    sum += item.value;
+    return Status::OK();
+  });
+  // Should be superfluous
+  sum_future.Wait();
+  ASSERT_EQ(6, sum);
+}
+
+TEST(TestAsyncUtil, Collect) {
+  std::vector<TestInt> expected = {1, 2, 3};
+  auto generator = AsyncVectorIt(expected);
+  auto collected = CollectAsyncGenerator(generator);
+  ASSERT_EQ(expected, *collected.result());
+}
+
+template <typename T>
+Transformer<T, T> MakeRepeatN(int repeat_count) {
+  int current_repeat = 0;
+  return [repeat_count, current_repeat](T next) mutable -> Result<TransformFlow<T>> {
+    current_repeat++;
+    bool ready_for_next = false;
+    if (current_repeat == repeat_count) {
+      current_repeat = 0;
+      ready_for_next = true;
+    }
+    return TransformYield(next, ready_for_next);
+  };
+}
+
+TEST(TestIteratorTransform, Repeating) {
+  auto original = VectorIt({1, 2, 3});
+  auto repeated = MakeTransformedIterator<TestInt, TestInt>(std::move(original),
+                                                            MakeRepeatN<TestInt>(2));
+  AssertIteratorMatch({1, 1, 2, 2, 3, 3}, std::move(repeated));
+}
+
+template <typename T>
+Transformer<T, T> MakeFilter(std::function<bool(T&)> filter) {
+  return [filter](T next) -> Result<TransformFlow<T>> {
+    if (filter(next)) {
+      return TransformYield(next);
+    } else {
+      return TransformSkip();
+    }
+  };
+}
+
+template <typename T>
+Transformer<T, T> MakeAbortOnSecond() {
+  int counter = 0;
+  return [counter](T next) mutable -> Result<TransformFlow<T>> {
+    if (counter++ == 1) {
+      return Status::Invalid("X");
+    }
+    return TransformYield(next);
+  };
+}
+
+TEST(TestIteratorTransform, SkipSome) {
+  // Exercises TransformSkip
+  auto original = VectorIt({1, 2, 3});
+  auto filter = MakeFilter<TestInt>([](TestInt& t) { return t.value != 2; });
+  auto filtered = MakeTransformedIterator(std::move(original), filter);
+  AssertIteratorMatch({1, 3}, std::move(filtered));
+}
+
+TEST(TestIteratorTransform, SkipAll) {
+  // Exercises TransformSkip
+  auto original = VectorIt({1, 2, 3});
+  auto filter = MakeFilter<TestInt>([](TestInt& t) { return false; });
+  auto filtered = MakeTransformedIterator(std::move(original), filter);
+  AssertIteratorMatch({}, std::move(filtered));
+}
+
+TEST(TestIteratorTransform, Abort) {
+  auto original = VectorIt({1, 2, 3});
+  auto transformed =
+      MakeTransformedIterator(std::move(original), MakeAbortOnSecond<TestInt>());
+  ASSERT_OK(transformed.Next());
+  ASSERT_RAISES(Invalid, transformed.Next());
+}
+
+TEST(TestAsyncIteratorTransform, SkipSome) {
+  auto original = AsyncVectorIt({1, 2, 3});
+  auto filter = MakeFilter<TestInt>([](TestInt& t) { return t.value != 2; });
+  auto filtered = TransformAsyncGenerator(std::move(original), filter);
+  AssertAsyncGeneratorMatch({1, 3}, std::move(filtered));
+}
+
+TEST(TestAsyncUtil, ReadaheadFailed) {
+  auto source = []() -> Future<TestInt> {
+    return Future<TestInt>::MakeFinished(Status::Invalid("X"));
+  };
+  auto readahead = AddReadahead<TestInt>(source, 10);
+  auto next = readahead();
+  ASSERT_EQ(Status::Invalid("X"), next.status());
+}
+
+TEST(TestAsyncUtil, Readahead) {
+  int num_delivered = 0;
+  auto source = [&num_delivered]() {
+    if (num_delivered < 5) {
+      return Future<TestInt>::MakeFinished(num_delivered++);
+    } else {
+      return Future<TestInt>::MakeFinished(IterationTraits<TestInt>::End());
+    }
+  };
+  auto readahead = AddReadahead<TestInt>(source, 10);
+  // Should not pump until first item requested
+  ASSERT_EQ(0, num_delivered);
+
+  auto first = readahead();
+  // At this point the pumping should have happened
+  ASSERT_EQ(5, num_delivered);
+  ASSERT_EQ(0, first.result()->value);

Review comment:
       `ASSERT_OK_AND_EQ`?

##########
File path: cpp/src/arrow/csv/reader.cc
##########
@@ -150,20 +154,21 @@ struct CSVBlock {
   std::function<Status(int64_t)> consume_bytes;
 };
 
+// This is an unfortunate side-effect of using optional<T> as the iterator in the
+// CSVBlock iterator.  We need to be able to compare with
+// IterationTraits<optional<T>>::End() and empty optionals will always compare true but
+// the optional copmarator won't compile if the underlying type isn't comparable

Review comment:
       Typo: this should be "comparator".

##########
File path: cpp/src/arrow/csv/reader.cc
##########
@@ -220,14 +238,36 @@ class ThreadedBlockReader : public BlockReader {
  public:
   using BlockReader::BlockReader;
 
-  Result<arrow::util::optional<CSVBlock>> Next() {
+  static Iterator<util::optional<CSVBlock>> MakeIterator(
+      Iterator<std::shared_ptr<Buffer>> buffer_iterator, std::unique_ptr<Chunker> chunker,
+      std::shared_ptr<Buffer> first_buffer) {
+    auto block_reader =
+        std::make_shared<ThreadedBlockReader>(std::move(chunker), first_buffer);
+    // Wrap shared pointer in callable
+    Transformer<std::shared_ptr<Buffer>, util::optional<CSVBlock>> block_reader_fn =
+        [block_reader](std::shared_ptr<Buffer> next) { return (*block_reader)(next); };
+    return MakeTransformedIterator(std::move(buffer_iterator), block_reader_fn);
+  }
+
+  static AsyncGenerator<util::optional<CSVBlock>> MakeAsyncIterator(
+      AsyncGenerator<std::shared_ptr<Buffer>> buffer_generator,
+      std::unique_ptr<Chunker> chunker, std::shared_ptr<Buffer> first_buffer) {
+    auto block_reader =
+        std::make_shared<ThreadedBlockReader>(std::move(chunker), first_buffer);
+    // Wrap shared pointer in callable
+    Transformer<std::shared_ptr<Buffer>, util::optional<CSVBlock>> block_reader_fn =
+        [block_reader](std::shared_ptr<Buffer> next) { return (*block_reader)(next); };
+    return TransformAsyncGenerator(buffer_generator, block_reader_fn);

Review comment:
       `std::move(buffer_generator)`?

##########
File path: cpp/src/arrow/util/iterator_test.cc
##########
@@ -214,6 +255,270 @@ TEST(TestVectorIterator, RangeForLoop) {
   ASSERT_EQ(ints_it, ints.end());
 }
 
+template <typename T>
+Transformer<T, T> MakeFirstN(int n) {
+  int remaining = n;
+  return [remaining](T next) mutable -> Result<TransformFlow<T>> {
+    if (remaining > 0) {
+      remaining--;
+      return TransformYield(next);
+    }
+    return TransformFinish();
+  };
+}
+
+TEST(TestIteratorTransform, Truncating) {
+  auto original = VectorIt({1, 2, 3});
+  auto truncated = MakeTransformedIterator(std::move(original), MakeFirstN<TestInt>(2));
+  AssertIteratorMatch({1, 2}, std::move(truncated));
+}
+
+TEST(TestIteratorTransform, TestPointer) {
+  auto original = VectorIt<std::shared_ptr<int>>(
+      {std::make_shared<int>(1), std::make_shared<int>(2), std::make_shared<int>(3)});
+  auto truncated =
+      MakeTransformedIterator(std::move(original), MakeFirstN<std::shared_ptr<int>>(2));
+  ASSERT_OK_AND_ASSIGN(auto result, truncated.ToVector());
+  ASSERT_EQ(2, result.size());
+}
+
+TEST(TestIteratorTransform, TruncatingShort) {
+  // Tests the failsafe case where we never call Finish
+  auto original = VectorIt({1});
+  auto truncated = MakeTransformedIterator<TestInt, TestInt>(std::move(original),
+                                                             MakeFirstN<TestInt>(2));
+  AssertIteratorMatch({1}, std::move(truncated));
+}
+
+TEST(TestAsyncUtil, Background) {
+  std::vector<TestInt> expected = {1, 2, 3};
+  auto background = BackgroundAsyncVectorIt(expected);
+  auto future = CollectAsyncGenerator(background);
+  ASSERT_FALSE(future.is_finished());
+  future.Wait();
+  ASSERT_TRUE(future.is_finished());
+  ASSERT_EQ(expected, *future.result());
+}
+
+struct SlowEmptyIterator {
+  Result<TestInt> Next() {
+    if (called_) {
+      return Status::Invalid("Should not have been called twice");
+    }
+    SleepFor(0.1);
+    return IterationTraits<TestInt>::End();
+  }
+
+ private:
+  bool called_ = false;
+};
+
+TEST(TestAsyncUtil, BackgroundRepeatEnd) {
+  // Ensure that the background iterator properly fulfills the asyncgenerator contract
+  // and can be called after it ends.
+  auto iterator = Iterator<TestInt>(SlowEmptyIterator());
+  ASSERT_OK_AND_ASSIGN(
+      auto background_iter,
+      MakeBackgroundIterator(std::move(iterator), internal::GetCpuThreadPool()));
+
+  auto one = background_iter();
+  auto two = background_iter();
+
+  ASSERT_TRUE(one.Wait(0.5));
+
+  if (one.is_finished()) {
+    ASSERT_EQ(IterationTraits<TestInt>::End(), *one.result());
+  }
+
+  ASSERT_TRUE(two.Wait(0.5));
+  ASSERT_TRUE(two.is_finished());
+  if (two.is_finished()) {
+    ASSERT_EQ(IterationTraits<TestInt>::End(), *two.result());
+  }
+}
+
+TEST(TestAsyncUtil, SynchronousFinish) {
+  AsyncGenerator<TestInt> generator = []() {
+    return Future<TestInt>::MakeFinished(IterationTraits<TestInt>::End());
+  };
+  Transformer<TestInt, TestInt> skip_all = [](TestInt value) { return TransformSkip(); };
+  auto transformed = TransformAsyncGenerator(generator, skip_all);
+  auto future = CollectAsyncGenerator(transformed);
+  ASSERT_TRUE(future.is_finished());
+  ASSERT_OK_AND_ASSIGN(auto actual, future.result());
+  ASSERT_EQ(std::vector<TestInt>(), actual);
+}
+
+TEST(TestAsyncUtil, CompleteBackgroundStressTest) {
+  auto expected = RangeVector(100);
+  std::vector<Future<std::vector<TestInt>>> futures;
+  for (unsigned int i = 0; i < 100; i++) {
+    auto background = BackgroundAsyncVectorIt(expected);
+    futures.push_back(CollectAsyncGenerator(background));
+  }
+  auto combined = All(futures);
+  combined.Wait(2);
+  if (combined.is_finished()) {
+    ASSERT_OK_AND_ASSIGN(auto completed_vectors, combined.result());
+    for (auto&& vector : completed_vectors) {
+      ASSERT_EQ(vector, expected);
+    }
+  } else {
+    FAIL() << "After 2 seconds all background iterators had not finished collecting";
+  }
+}
+
+TEST(TestAsyncUtil, StackOverflow) {
+  int counter = 0;
+  AsyncGenerator<TestInt> generator = [&counter]() {
+    if (counter < 1000000) {
+      return Future<TestInt>::MakeFinished(counter++);
+    } else {
+      return Future<TestInt>::MakeFinished(IterationTraits<TestInt>::End());
+    }
+  };
+  Transformer<TestInt, TestInt> discard =
+      [](TestInt next) -> Result<TransformFlow<TestInt>> { return TransformSkip(); };
+  auto transformed = TransformAsyncGenerator(generator, discard);
+  auto collected_future = CollectAsyncGenerator(transformed);
+  ASSERT_TRUE(collected_future.Wait(5));
+  if (collected_future.is_finished()) {
+    ASSERT_OK_AND_ASSIGN(auto collected, collected_future.result());
+    ASSERT_EQ(0, collected.size());
+  }
+}
+
+TEST(TestAsyncUtil, Visit) {
+  auto generator = AsyncVectorIt({1, 2, 3});
+  unsigned int sum = 0;
+  auto sum_future = VisitAsyncGenerator<TestInt>(generator, [&sum](TestInt item) {
+    sum += item.value;
+    return Status::OK();
+  });
+  // Should be superfluous
+  sum_future.Wait();
+  ASSERT_EQ(6, sum);
+}
+
+TEST(TestAsyncUtil, Collect) {
+  std::vector<TestInt> expected = {1, 2, 3};
+  auto generator = AsyncVectorIt(expected);
+  auto collected = CollectAsyncGenerator(generator);
+  ASSERT_EQ(expected, *collected.result());

Review comment:
       `ASSERT_OK_AND_EQ`?

##########
File path: cpp/src/arrow/csv/reader.cc
##########
@@ -177,14 +182,26 @@ class SerialBlockReader : public BlockReader {
  public:
   using BlockReader::BlockReader;
 
-  Result<arrow::util::optional<CSVBlock>> Next() {
+  static Iterator<util::optional<CSVBlock>> MakeIterator(
+      Iterator<std::shared_ptr<Buffer>> buffer_iterator, std::unique_ptr<Chunker> chunker,
+      std::shared_ptr<Buffer> first_buffer) {
+    auto block_reader =
+        std::make_shared<SerialBlockReader>(std::move(chunker), first_buffer);
+    // Wrap shared pointer in callable
+    Transformer<std::shared_ptr<Buffer>, util::optional<CSVBlock>> block_reader_fn =
+        [block_reader](std::shared_ptr<Buffer> buf) {
+          return (*block_reader)(std::move(buf));
+        };
+    return MakeTransformedIterator(std::move(buffer_iterator), block_reader_fn);
+  }
+
+  Result<TransformFlow<util::optional<CSVBlock>>> operator()(

Review comment:
       Hmm... why are we still returning `util::optional` if we never yield a value-less result?

##########
File path: cpp/src/arrow/util/iterator.cc
##########
@@ -119,14 +123,30 @@ class ReadaheadQueue::Impl : public std::enable_shared_from_this<ReadaheadQueue:
   void DoWork() {
     std::unique_lock<std::mutex> lock(mutex_);
     while (!please_shutdown_) {
-      while (static_cast<int64_t>(done_.size()) < max_readahead_ && todo_.size() > 0) {
+      while (todo_.size() > 0 &&
+             ((max_readahead_ <= 0) ||
+              (static_cast<int64_t>(done_.size()) < max_readahead_))) {
         auto promise = std::move(todo_.front());
         todo_.pop_front();
         lock.unlock();
-        promise->Call();
+        if (promise->Call()) {
+          // If the call finished then we should purge the remaining TODO items, marking
+          // them finished
+          lock.lock();
+          std::deque<std::unique_ptr<ReadaheadPromise>> to_clear(std::move(todo_));
+          // While the async iterator doesn't use todo_ anymore after it hits a finish the
+          // sync iterator might still due to timing so leave it valid
+          todo_.clear();
+          lock.unlock();
+          for (auto&& promise : to_clear) {
+            promise->End();
+          }
+        }
         lock.lock();
-        done_.push_back(std::move(promise));
-        work_done_.notify_one();
+        if (max_readahead_ > 0) {
+          done_.push_back(std::move(promise));
+          work_done_.notify_one();
+        }

Review comment:
       This is frankly weird. It seems the logic here is conflating "unbounded readahead" and "is an async generator".
   
   More generally, the fact that two different control flows (sync vs async) seem to be handled in the same implementation makes things very confusing and difficult to follow. Can you find a way to separate the two cases?
   

##########
File path: cpp/src/arrow/util/iterator_test.cc
##########
@@ -214,6 +255,270 @@ TEST(TestVectorIterator, RangeForLoop) {
   ASSERT_EQ(ints_it, ints.end());
 }
 
+template <typename T>
+Transformer<T, T> MakeFirstN(int n) {
+  int remaining = n;
+  return [remaining](T next) mutable -> Result<TransformFlow<T>> {
+    if (remaining > 0) {
+      remaining--;
+      return TransformYield(next);
+    }
+    return TransformFinish();
+  };
+}
+
+TEST(TestIteratorTransform, Truncating) {

Review comment:
       (same for async generator transform tests)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org