You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "westonpace (via GitHub)" <gi...@apache.org> on 2023/06/13 16:01:32 UTC

[GitHub] [arrow] westonpace commented on a diff in pull request #35672: GH-35176: [C++] Add support for disabling threading for emscripten

westonpace commented on code in PR #35672:
URL: https://github.com/apache/arrow/pull/35672#discussion_r1228352814


##########
cpp/src/arrow/testing/gtest_util.cc:
##########
@@ -1036,7 +1052,60 @@ class GatingTask::Impl : public std::enable_shared_from_this<GatingTask::Impl> {
     return unlocked_future_;
   }
 
+  void _WaitForEndOrUnlocked(std::chrono::time_point<std::chrono::steady_clock> end_time,

Review Comment:
   Why the leading underscore?



##########
cpp/src/arrow/util/thread_pool.h:
##########
@@ -194,6 +196,11 @@ class ARROW_EXPORT Executor {
   // Executor. Returns false if this Executor does not support this property.
   virtual bool OwnsThisThread() { return false; }
 
+  // Return true if this is the current executor being called
+  // n.b. this defaults to just calling OwnsThisThread
+  // unless the threadpool is disabled
+  virtual bool IsCurrentExecutor() { return OwnsThisThread(); }

Review Comment:
   I'm having a hard time understanding the difference between these two.  When would I prefer `IsCurrentExecutor` over `OwnsThisThread` (or vice versa)?



##########
cpp/src/arrow/util/thread_pool.cc:
##########
@@ -60,11 +61,33 @@ struct SerialExecutor::State {
   std::thread::id current_thread;
   bool paused{false};
   bool finished{false};
+#ifndef ARROW_ENABLE_THREADING
+  int max_tasks_running{1};
+  int tasks_running{0};
+#endif
 };
 
-SerialExecutor::SerialExecutor() : state_(std::make_shared<State>()) {}
+#ifndef ARROW_ENABLE_THREADING
+// list of all SerialExecutor objects - as we need to run tasks from all pools at once in
+// Run()
+std::unordered_set<SerialExecutor*> SerialExecutor::all_executors;
+SerialExecutor* SerialExecutor::current_executor = NULL;
+SerialExecutor* SerialExecutor::GetCurrentExecutor() { return current_executor; }
+SerialExecutor* SerialExecutor::last_called_executor = NULL;

Review Comment:
   We should prefer static local variables.  E.g. something like...
   
   ```
   struct SerialExecutors {
     std::unordered_set<SerialExecutor*> all_executors;
     SerialExecutor* current_executor = nullptr;
     SerialExecutor* last_called_executor = nullptr;
   };
   
   SerialExecutors& GetSerialExecutors() {
     static SerialExecutors serial_executors;
     return serial_executors;
   }
   ```
   
   Also, we could use a bit more explanation here.  What is the difference between `current_executor` and `last_called_executor`?



##########
cpp/src/arrow/util/thread_pool.cc:
##########
@@ -60,11 +61,33 @@ struct SerialExecutor::State {
   std::thread::id current_thread;
   bool paused{false};
   bool finished{false};
+#ifndef ARROW_ENABLE_THREADING
+  int max_tasks_running{1};
+  int tasks_running{0};
+#endif
 };
 
-SerialExecutor::SerialExecutor() : state_(std::make_shared<State>()) {}
+#ifndef ARROW_ENABLE_THREADING
+// list of all SerialExecutor objects - as we need to run tasks from all pools at once in
+// Run()
+std::unordered_set<SerialExecutor*> SerialExecutor::all_executors;
+SerialExecutor* SerialExecutor::current_executor = NULL;
+SerialExecutor* SerialExecutor::GetCurrentExecutor() { return current_executor; }
+SerialExecutor* SerialExecutor::last_called_executor = NULL;
+
+#endif
+
+SerialExecutor::SerialExecutor() : state_(std::make_shared<State>()) {
+#ifndef ARROW_ENABLE_THREADING
+  all_executors.insert(this);

Review Comment:
   This doesn't seem thread safe.  If `ARROW_ENABLE_THREADING` is false does this mean that we are assuming the user doesn't have any threads of their own?



##########
cpp/src/arrow/util/thread_pool.cc:
##########
@@ -77,6 +100,12 @@ SerialExecutor::~SerialExecutor() {
   }
 }
 
+int SerialExecutor::GetNumTasks() {
+  auto state = state_;
+  return (int)(state_->task_queue.size());

Review Comment:
   Prefer C++ style casts
   ```suggestion
     return static_cast<int>(state_->task_queue.size());
   ```
   



##########
cpp/src/arrow/util/future.cc:
##########
@@ -149,17 +150,39 @@ class ConcreteFutureImpl : public FutureImpl {
   }
 
   void DoWait() {
+#ifdef ARROW_ENABLE_THREADING
     std::unique_lock<std::mutex> lock(mutex_);
 
     cv_.wait(lock, [this] { return IsFutureFinished(state_); });
+#else
+    while (true) {
+      if (IsFutureFinished(state_)) {
+        return;
+      }
+      arrow::internal::SerialExecutor::RunTasksOnAllExecutors(true);
+    }
+#endif
   }
 
   bool DoWait(double seconds) {
+#ifdef ARROW_ENABLE_THREADING
     std::unique_lock<std::mutex> lock(mutex_);
 
     cv_.wait_for(lock, std::chrono::duration<double>(seconds),
                  [this] { return IsFutureFinished(state_); });
     return IsFutureFinished(state_);
+#else
+    auto start = std::chrono::steady_clock::now();
+    std::chrono::duration<double> fsec = std::chrono::duration<double>(seconds);
+    while (std::chrono::steady_clock::now() - start < fsec) {
+      // run one task then check time
+      if (IsFutureFinished(state_)) {
+        return true;
+      }
+      arrow::internal::SerialExecutor::RunTasksOnAllExecutors(true);
+    }
+    return IsFutureFinished(state_);

Review Comment:
   This pattern is repeated a few times.  Could we add a utility method somewhere, like `SafeWait(Func condition, double max_wait_seconds)`?



##########
cpp/src/arrow/util/thread_pool.h:
##########
@@ -393,8 +420,23 @@ class ARROW_EXPORT SerialExecutor : public Executor {
     RunLoop();
     return final_fut;
   }
+
+#ifndef ARROW_ENABLE_THREADING
+  // we have to run tasks from all live executors
+  // during RunLoop if we don't have threading
+  static std::unordered_set<SerialExecutor*> all_executors;

Review Comment:
   If threading is disabled then how is there more than one serial executor?



##########
cpp/src/arrow/util/async_generator_test.cc:
##########
@@ -1486,13 +1490,24 @@ TEST(TestAsyncUtil, ReadaheadFailed) {
   // should all pass
   auto source = [&]() -> Future<TestInt> {
     auto count = counter++;
+#ifdef ARROW_ENABLE_THREADING
     return DeferNotOk(thread_pool->Submit([&, count]() -> Result<TestInt> {
       gating_task->Task()();
       if (count == 0) {
         return Status::Invalid("X");
       }
       return TestInt(count);
     }));
+#else
+    // if threading is disabled, we can't call Task() as we do below because it will
+    // never return and will block everything

Review Comment:
   Or possibly we could use a serial executor instead of the thread pool if threading is disabled.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org