You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/10/06 16:52:29 UTC

[GitHub] [arrow] lidavidm commented on a diff in pull request #14334: ARROW-17927: [C++] Avoid hitting thread limits in stress tests

lidavidm commented on code in PR #14334:
URL: https://github.com/apache/arrow/pull/14334#discussion_r989280595


##########
cpp/src/arrow/testing/gtest_util.cc:
##########
@@ -767,23 +768,122 @@ void BusyWait(double seconds, std::function<bool()> predicate) {
   }
 }
 
-Future<> SleepAsync(double seconds) {
-  auto out = Future<>::Make();
-  std::thread([out, seconds]() mutable {
-    SleepFor(seconds);
-    out.MarkFinished();
-  }).detach();
-  return out;
-}
+namespace {
 
-Future<> SleepABitAsync() {
-  auto out = Future<>::Make();
-  std::thread([out]() mutable {
-    SleepABit();
-    out.MarkFinished();
-  }).detach();
-  return out;
-}
+// A helper class to sleep asynchronously.
+// Uses a single worker thread + priority queue to avoid resource consumption
+// issues when many async sleeps are requested (ARROW-17927).
+struct AsyncSleeper {
+  AsyncSleeper() { state_->worker = std::thread(RunLoop, state_); }
+
+  ~AsyncSleeper() {
+    if (!IsForkedChild()) {
+      // Join thread at shutdown
+      Join();
+    } else {
+      // Mutex and thread destructors can crash in child, just let them leak
+      new (state_.get()) State;
+    }
+  }
+
+  Future<> SleepFor(double seconds) {
+    DCHECK(!IsForkedChild()) << "Async sleep forbidden in forked child";
+
+    auto out = Future<>::Make();
+    const auto deadline = Clock::now() + std::chrono::duration_cast<Clock::duration>(
+                                             std::chrono::duration<double>(seconds));
+    std::unique_lock<std::mutex> lock(state_->mutex);
+    state_->Push(Event{deadline, out});
+    state_->cv.notify_all();
+    return out;
+  }
+
+ private:
+  void Join() {
+    std::unique_lock<std::mutex> lock(state_->mutex);
+    if (state_->worker.joinable()) {
+      state_->please_finish = true;
+      state_->cv.notify_all();
+      lock.unlock();
+      state_->worker.join();
+    }
+  }
+
+  struct State;
+
+  static void RunLoop(std::shared_ptr<State> state) {
+    std::unique_lock<std::mutex> lock(state->mutex);
+    while (!state->please_finish) {
+      if (state->events.empty()) {
+        // Wait for wakeup from Sleep or Join
+        state->cv.wait(lock);
+      } else {
+        // Wait for wakeup from Sleep or Join, or first deadline
+        // (beware that wait_until takes a const-ref to the deadline,
+        //  need to make a local copy to avoid concurrent mutations)
+        const auto deadline = state->events[0].deadline;
+        state->cv.wait_until(lock, deadline);
+      }
+      const auto now = Clock::now();
+      while (!state->please_finish && !state->events.empty() &&
+             now >= state->events[0].deadline) {
+        auto event = state->Pop();
+        lock.unlock();
+        event.future.MarkFinished();
+        lock.lock();
+      }
+    }

Review Comment:
   Do we want to empty the event queue when the loop exits on `please_finish`? (I guess it should already be empty at that point.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org