You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/04/15 17:37:33 UTC

[GitHub] [arrow] pitrou commented on a change in pull request #10055: ARROW-12367: [C++] Stop producing when PushGenerator was destroyed

pitrou commented on a change in pull request #10055:
URL: https://github.com/apache/arrow/pull/10055#discussion_r614267822



##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -766,52 +766,77 @@ class PushGenerator {
   /// Producer API for PushGenerator
   class Producer {
    public:
-    explicit Producer(std::shared_ptr<State> state) : state_(std::move(state)) {}
+    explicit Producer(const std::shared_ptr<State> state) : weak_state_(state) {}
 
-    /// Push a value on the queue
-    void Push(Result<T> result) {
-      auto lock = state_->mutex.Lock();
-      if (state_->finished) {
+    /// \brief Push a value on the queue
+    ///
+    /// True is returned if the value was pushed, false if the generator is
+    /// already closed or destroyed.  If the latter, it is recommended to stop
+    /// producing any further values.
+    bool Push(Result<T> result) {
+      auto state = weak_state_.lock();
+      if (!state) {
+        // Generator was destroyed
+        return false;
+      }
+      auto lock = state->mutex.Lock();
+      if (state->finished) {
         // Closed early
-        return;
+        return false;
       }
-      if (state_->consumer_fut.has_value()) {
-        auto fut = std::move(state_->consumer_fut.value());
-        state_->consumer_fut.reset();
+      if (state->consumer_fut.has_value()) {
+        auto fut = std::move(state->consumer_fut.value());
+        state->consumer_fut.reset();
         lock.Unlock();  // unlock before potentially invoking a callback
         fut.MarkFinished(std::move(result));
-        return;
+      } else {
+        state->result_q.push_back(std::move(result));
       }
-      state_->result_q.push_back(std::move(result));
+      return true;
     }
 
     /// \brief Tell the consumer we have finished producing
     ///
     /// It is allowed to call this and later call Push() again ("early close").
     /// In this case, calls to Push() after the queue is closed are silently
     /// ignored.  This can help implementing non-trivial cancellation cases.
-    void Close() {
-      auto lock = state_->mutex.Lock();
-      if (state_->finished) {
+    ///
+    /// True is returned on success, false if the generator is already closed
+    /// or destroyed.
+    bool Close() {
+      auto state = weak_state_.lock();
+      if (!state) {
+        // Generator was destroyed
+        return false;
+      }
+      auto lock = state->mutex.Lock();
+      if (state->finished) {
         // Already closed
-        return;
+        return false;
       }
-      state_->finished = true;
-      if (state_->consumer_fut.has_value()) {
-        auto fut = std::move(state_->consumer_fut.value());
-        state_->consumer_fut.reset();
+      state->finished = true;
+      if (state->consumer_fut.has_value()) {
+        auto fut = std::move(state->consumer_fut.value());
+        state->consumer_fut.reset();
         lock.Unlock();  // unlock before potentially invoking a callback
         fut.MarkFinished(IterationTraits<T>::End());
       }
+      return true;
     }
 
+    /// Return whether the generator was closed or destroyed.
     bool is_closed() const {
-      auto lock = state_->mutex.Lock();
-      return state_->finished;
+      auto state = weak_state_.lock();
+      if (!state) {
+        // Generator was destroyed
+        return true;
+      }
+      auto lock = state->mutex.Lock();

Review comment:
       It looks like reading an atomic bool is free on x86, but not on ARM64: https://godbolt.org/z/fjh91hE1c




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org