You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/03/28 13:35:14 UTC

[GitHub] [arrow] pitrou commented on a change in pull request #12712: ARROW-16026: [C++] Add support for the serial executor to expose an async generator as an iterable

pitrou commented on a change in pull request #12712:
URL: https://github.com/apache/arrow/pull/12712#discussion_r836423004



##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -276,25 +277,100 @@ class ARROW_EXPORT SerialExecutor : public Executor {
     return FutureToSync(fut);
   }
 
+  template <typename T>
+  static Iterator<T> RunGeneratorInSerialExecutor(

Review comment:
       Nit, but `SerialExecutor::RunGeneratorInSerialExecutor` seems a bit verbose. Perhaps something like `SerialExecutor::IterateGenerator`?

##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -276,25 +277,100 @@ class ARROW_EXPORT SerialExecutor : public Executor {
     return FutureToSync(fut);
   }
 
+  template <typename T>

Review comment:
       Add a small docstring here?

##########
File path: cpp/src/arrow/testing/async_test_util.h
##########
@@ -20,12 +20,37 @@
 #include <atomic>
 #include <memory>
 
+#include "arrow/testing/gtest_util.h"
 #include "arrow/util/async_generator.h"
 #include "arrow/util/future.h"
 
 namespace arrow {
 namespace util {
 
+template <typename T>
+AsyncGenerator<T> AsyncVectorIt(std::vector<T> v) {
+  return MakeVectorGenerator(std::move(v));
+}
+
+template <typename T>
+AsyncGenerator<T> FailsAt(AsyncGenerator<T> src, int failing_index) {

Review comment:
       Nit: `FailsAt` sounds like a predicate, perhaps `FailAt`?

##########
File path: cpp/src/arrow/util/thread_pool_test.cc
##########
@@ -261,6 +262,140 @@ TEST_P(TestRunSynchronously, PropagatedError) {
 INSTANTIATE_TEST_SUITE_P(TestRunSynchronously, TestRunSynchronously,
                          ::testing::Values(false, true));
 
+TEST(SerialExecutor, AsyncGenerator) {
+  std::vector<TestInt> values{1, 2, 3, 4, 5};
+  auto source = util::SlowdownABit(util::AsyncVectorIt(values));
+  Iterator<TestInt> iter = SerialExecutor::RunGeneratorInSerialExecutor<TestInt>(
+      [&source](Executor* executor) {
+        return MakeMappedGenerator(source, [executor](const TestInt& ti) {
+          return DeferNotOk(executor->Submit([ti] { return ti; }));
+        });
+      });
+  ASSERT_OK_AND_ASSIGN(auto vec, iter.ToVector());
+  ASSERT_EQ(vec, values);
+}
+
+TEST(SerialExecutor, AsyncGeneratorWithFollowUp) {
+  // Sometimes a task will generate follow-up tasks.  These should be run
+  // before the next task is started
+  bool follow_up_ran = false;
+  bool first = true;
+  Iterator<TestInt> iter =
+      SerialExecutor::RunGeneratorInSerialExecutor<TestInt>([&](Executor* executor) {
+        return [=, &first, &follow_up_ran]() -> Future<TestInt> {
+          if (first) {
+            first = false;
+            Future<TestInt> item =
+                DeferNotOk(executor->Submit([] { return TestInt(0); }));
+            RETURN_NOT_OK(executor->Spawn([&] { follow_up_ran = true; }));
+            return item;
+          }
+          return DeferNotOk(executor->Submit([] { return IterationEnd<TestInt>(); }));
+        };
+      });
+  ASSERT_FALSE(follow_up_ran);
+  ASSERT_OK_AND_EQ(TestInt(0), iter.Next());
+  ASSERT_FALSE(follow_up_ran);
+  ASSERT_OK_AND_EQ(IterationEnd<TestInt>(), iter.Next());
+  ASSERT_TRUE(follow_up_ran);
+}
+
+TEST(SerialExecutor, AsyncGeneratorWithAsyncFollowUp) {
+  // Simulates a situation where a user calls into the async generator, tasks (e.g. I/O
+  // readahead tasks) are spawned onto the I/O threadpool, the user gets a result, and
+  // then the I/O readahead tasks are completed while there is no calling thread in the
+  // async generator to hand the task off to (it should be queued up)
+  bool follow_up_ran = false;
+  bool first = true;
+  Executor* captured_executor;
+  Iterator<TestInt> iter =
+      SerialExecutor::RunGeneratorInSerialExecutor<TestInt>([&](Executor* executor) {
+        return [=, &first, &captured_executor]() -> Future<TestInt> {
+          if (first) {
+            captured_executor = executor;
+            first = false;
+            return DeferNotOk(executor->Submit([] {
+              // I/O tasks would be scheduled at this point
+              return TestInt(0);
+            }));
+          }
+          return DeferNotOk(executor->Submit([] { return IterationEnd<TestInt>(); }));
+        };
+      });
+  ASSERT_FALSE(follow_up_ran);
+  ASSERT_OK_AND_EQ(TestInt(0), iter.Next());
+  // I/O task completes and has reference to executor to submit continuation
+  ASSERT_OK(captured_executor->Spawn([&] { follow_up_ran = true; }));
+  // Follow-up task can't run right now because there is no thread in the executor
+  SleepABit();
+  ASSERT_FALSE(follow_up_ran);
+  // Follow-up should run as part of retrieving the next item
+  ASSERT_OK_AND_EQ(IterationEnd<TestInt>(), iter.Next());
+  ASSERT_TRUE(follow_up_ran);
+}
+
+TEST(SerialExecutor, AsyncGeneratorWithCleanup) {
+  // Sometimes a final task might generate follow-up tasks.  Unlike other follow-up

Review comment:
       The "unlike" looks a bit confusing to me: it seems to imply that other follow-up tasks may not be run before the iterator is finished.

##########
File path: cpp/src/arrow/util/thread_pool_test.cc
##########
@@ -261,6 +262,140 @@ TEST_P(TestRunSynchronously, PropagatedError) {
 INSTANTIATE_TEST_SUITE_P(TestRunSynchronously, TestRunSynchronously,
                          ::testing::Values(false, true));
 
+TEST(SerialExecutor, AsyncGenerator) {
+  std::vector<TestInt> values{1, 2, 3, 4, 5};
+  auto source = util::SlowdownABit(util::AsyncVectorIt(values));
+  Iterator<TestInt> iter = SerialExecutor::RunGeneratorInSerialExecutor<TestInt>(
+      [&source](Executor* executor) {
+        return MakeMappedGenerator(source, [executor](const TestInt& ti) {
+          return DeferNotOk(executor->Submit([ti] { return ti; }));
+        });
+      });
+  ASSERT_OK_AND_ASSIGN(auto vec, iter.ToVector());
+  ASSERT_EQ(vec, values);
+}
+
+TEST(SerialExecutor, AsyncGeneratorWithFollowUp) {
+  // Sometimes a task will generate follow-up tasks.  These should be run
+  // before the next task is started
+  bool follow_up_ran = false;
+  bool first = true;
+  Iterator<TestInt> iter =
+      SerialExecutor::RunGeneratorInSerialExecutor<TestInt>([&](Executor* executor) {
+        return [=, &first, &follow_up_ran]() -> Future<TestInt> {
+          if (first) {
+            first = false;
+            Future<TestInt> item =
+                DeferNotOk(executor->Submit([] { return TestInt(0); }));
+            RETURN_NOT_OK(executor->Spawn([&] { follow_up_ran = true; }));
+            return item;
+          }
+          return DeferNotOk(executor->Submit([] { return IterationEnd<TestInt>(); }));
+        };
+      });
+  ASSERT_FALSE(follow_up_ran);
+  ASSERT_OK_AND_EQ(TestInt(0), iter.Next());
+  ASSERT_FALSE(follow_up_ran);
+  ASSERT_OK_AND_EQ(IterationEnd<TestInt>(), iter.Next());
+  ASSERT_TRUE(follow_up_ran);
+}
+
+TEST(SerialExecutor, AsyncGeneratorWithAsyncFollowUp) {
+  // Simulates a situation where a user calls into the async generator, tasks (e.g. I/O
+  // readahead tasks) are spawned onto the I/O threadpool, the user gets a result, and
+  // then the I/O readahead tasks are completed while there is no calling thread in the
+  // async generator to hand the task off to (it should be queued up)
+  bool follow_up_ran = false;
+  bool first = true;
+  Executor* captured_executor;

Review comment:
       ```suggestion
     Executor* captured_executor = nullptr;
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org