You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/04/15 15:23:41 UTC

[GitHub] [arrow] pitrou opened a new pull request #10055: ARROW-12367: [C++] Stop producing when PushGenerator was destroyed

pitrou opened a new pull request #10055:
URL: https://github.com/apache/arrow/pull/10055


   When a PushGenerator is lost but its Producer is left dangling, avoid pushing values and inform the caller about it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on pull request #10055: ARROW-12367: [C++] Stop producing when PushGenerator was destroyed

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #10055:
URL: https://github.com/apache/arrow/pull/10055#issuecomment-820516873


   @westonpace Do you want to review this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #10055: ARROW-12367: [C++] Stop producing when PushGenerator was destroyed

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10055:
URL: https://github.com/apache/arrow/pull/10055#discussion_r614257805



##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -766,52 +766,77 @@ class PushGenerator {
   /// Producer API for PushGenerator
   class Producer {
    public:
-    explicit Producer(std::shared_ptr<State> state) : state_(std::move(state)) {}
+    explicit Producer(const std::shared_ptr<State> state) : weak_state_(state) {}
 
-    /// Push a value on the queue
-    void Push(Result<T> result) {
-      auto lock = state_->mutex.Lock();
-      if (state_->finished) {
+    /// \brief Push a value on the queue
+    ///
+    /// True is returned if the value was pushed, false if the generator is
+    /// already closed or destroyed.  If the latter, it is recommended to stop
+    /// producing any further values.
+    bool Push(Result<T> result) {
+      auto state = weak_state_.lock();
+      if (!state) {
+        // Generator was destroyed
+        return false;
+      }
+      auto lock = state->mutex.Lock();
+      if (state->finished) {
         // Closed early
-        return;
+        return false;
       }
-      if (state_->consumer_fut.has_value()) {
-        auto fut = std::move(state_->consumer_fut.value());
-        state_->consumer_fut.reset();
+      if (state->consumer_fut.has_value()) {
+        auto fut = std::move(state->consumer_fut.value());
+        state->consumer_fut.reset();
         lock.Unlock();  // unlock before potentially invoking a callback
         fut.MarkFinished(std::move(result));
-        return;
+      } else {
+        state->result_q.push_back(std::move(result));
       }
-      state_->result_q.push_back(std::move(result));
+      return true;
     }
 
     /// \brief Tell the consumer we have finished producing
     ///
     /// It is allowed to call this and later call Push() again ("early close").
     /// In this case, calls to Push() after the queue is closed are silently
     /// ignored.  This can help implementing non-trivial cancellation cases.
-    void Close() {
-      auto lock = state_->mutex.Lock();
-      if (state_->finished) {
+    ///
+    /// True is returned on success, false if the generator is already closed
+    /// or destroyed.
+    bool Close() {
+      auto state = weak_state_.lock();
+      if (!state) {
+        // Generator was destroyed
+        return false;
+      }
+      auto lock = state->mutex.Lock();
+      if (state->finished) {
         // Already closed
-        return;
+        return false;
       }
-      state_->finished = true;
-      if (state_->consumer_fut.has_value()) {
-        auto fut = std::move(state_->consumer_fut.value());
-        state_->consumer_fut.reset();
+      state->finished = true;
+      if (state->consumer_fut.has_value()) {
+        auto fut = std::move(state->consumer_fut.value());
+        state->consumer_fut.reset();
         lock.Unlock();  // unlock before potentially invoking a callback
         fut.MarkFinished(IterationTraits<T>::End());
       }
+      return true;
     }
 
+    /// Return whether the generator was closed or destroyed.
     bool is_closed() const {
-      auto lock = state_->mutex.Lock();
-      return state_->finished;
+      auto state = weak_state_.lock();
+      if (!state) {
+        // Generator was destroyed
+        return true;
+      }
+      auto lock = state->mutex.Lock();

Review comment:
       Since `is_closed` is already "best effort" you could probably make `finished` an atomic and avoid the lock here but that's a pretty minor concern and could potentially make other paths more expensive.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on pull request #10055: ARROW-12367: [C++] Stop producing when PushGenerator was destroyed

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #10055:
URL: https://github.com/apache/arrow/pull/10055#issuecomment-820621784


   Looks like this will fit in 4.0 if CI goes fine.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] github-actions[bot] commented on pull request #10055: ARROW-12367: [C++] Stop producing when PushGenerator was destroyed

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #10055:
URL: https://github.com/apache/arrow/pull/10055#issuecomment-820594492


   https://issues.apache.org/jira/browse/ARROW-12367


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on a change in pull request #10055: ARROW-12367: [C++] Stop producing when PushGenerator was destroyed

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10055:
URL: https://github.com/apache/arrow/pull/10055#discussion_r614268823



##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -766,52 +766,77 @@ class PushGenerator {
   /// Producer API for PushGenerator
   class Producer {
    public:
-    explicit Producer(std::shared_ptr<State> state) : state_(std::move(state)) {}
+    explicit Producer(const std::shared_ptr<State> state) : weak_state_(state) {}
 
-    /// Push a value on the queue
-    void Push(Result<T> result) {
-      auto lock = state_->mutex.Lock();
-      if (state_->finished) {
+    /// \brief Push a value on the queue
+    ///
+    /// True is returned if the value was pushed, false if the generator is
+    /// already closed or destroyed.  If the latter, it is recommended to stop
+    /// producing any further values.
+    bool Push(Result<T> result) {
+      auto state = weak_state_.lock();
+      if (!state) {
+        // Generator was destroyed
+        return false;
+      }
+      auto lock = state->mutex.Lock();
+      if (state->finished) {
         // Closed early
-        return;
+        return false;
       }
-      if (state_->consumer_fut.has_value()) {
-        auto fut = std::move(state_->consumer_fut.value());
-        state_->consumer_fut.reset();
+      if (state->consumer_fut.has_value()) {
+        auto fut = std::move(state->consumer_fut.value());
+        state->consumer_fut.reset();
         lock.Unlock();  // unlock before potentially invoking a callback
         fut.MarkFinished(std::move(result));
-        return;
+      } else {
+        state->result_q.push_back(std::move(result));
       }
-      state_->result_q.push_back(std::move(result));
+      return true;
     }
 
     /// \brief Tell the consumer we have finished producing
     ///
     /// It is allowed to call this and later call Push() again ("early close").
     /// In this case, calls to Push() after the queue is closed are silently
     /// ignored.  This can help implementing non-trivial cancellation cases.
-    void Close() {
-      auto lock = state_->mutex.Lock();
-      if (state_->finished) {
+    ///
+    /// True is returned on success, false if the generator is already closed
+    /// or destroyed.
+    bool Close() {
+      auto state = weak_state_.lock();
+      if (!state) {
+        // Generator was destroyed
+        return false;
+      }
+      auto lock = state->mutex.Lock();
+      if (state->finished) {
         // Already closed
-        return;
+        return false;
       }
-      state_->finished = true;
-      if (state_->consumer_fut.has_value()) {
-        auto fut = std::move(state_->consumer_fut.value());
-        state_->consumer_fut.reset();
+      state->finished = true;
+      if (state->consumer_fut.has_value()) {
+        auto fut = std::move(state->consumer_fut.value());
+        state->consumer_fut.reset();
         lock.Unlock();  // unlock before potentially invoking a callback
         fut.MarkFinished(IterationTraits<T>::End());
       }
+      return true;
     }
 
+    /// Return whether the generator was closed or destroyed.
     bool is_closed() const {
-      auto lock = state_->mutex.Lock();
-      return state_->finished;
+      auto state = weak_state_.lock();
+      if (!state) {
+        // Generator was destroyed
+        return true;
+      }
+      auto lock = state->mutex.Lock();

Review comment:
       In any case, I think it is a bug if this becomes a bottleneck, as generators should be tailored for expensive operations.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on a change in pull request #10055: ARROW-12367: [C++] Stop producing when PushGenerator was destroyed

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10055:
URL: https://github.com/apache/arrow/pull/10055#discussion_r614267822



##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -766,52 +766,77 @@ class PushGenerator {
   /// Producer API for PushGenerator
   class Producer {
    public:
-    explicit Producer(std::shared_ptr<State> state) : state_(std::move(state)) {}
+    explicit Producer(const std::shared_ptr<State> state) : weak_state_(state) {}
 
-    /// Push a value on the queue
-    void Push(Result<T> result) {
-      auto lock = state_->mutex.Lock();
-      if (state_->finished) {
+    /// \brief Push a value on the queue
+    ///
+    /// True is returned if the value was pushed, false if the generator is
+    /// already closed or destroyed.  If the latter, it is recommended to stop
+    /// producing any further values.
+    bool Push(Result<T> result) {
+      auto state = weak_state_.lock();
+      if (!state) {
+        // Generator was destroyed
+        return false;
+      }
+      auto lock = state->mutex.Lock();
+      if (state->finished) {
         // Closed early
-        return;
+        return false;
       }
-      if (state_->consumer_fut.has_value()) {
-        auto fut = std::move(state_->consumer_fut.value());
-        state_->consumer_fut.reset();
+      if (state->consumer_fut.has_value()) {
+        auto fut = std::move(state->consumer_fut.value());
+        state->consumer_fut.reset();
         lock.Unlock();  // unlock before potentially invoking a callback
         fut.MarkFinished(std::move(result));
-        return;
+      } else {
+        state->result_q.push_back(std::move(result));
       }
-      state_->result_q.push_back(std::move(result));
+      return true;
     }
 
     /// \brief Tell the consumer we have finished producing
     ///
     /// It is allowed to call this and later call Push() again ("early close").
     /// In this case, calls to Push() after the queue is closed are silently
     /// ignored.  This can help implementing non-trivial cancellation cases.
-    void Close() {
-      auto lock = state_->mutex.Lock();
-      if (state_->finished) {
+    ///
+    /// True is returned on success, false if the generator is already closed
+    /// or destroyed.
+    bool Close() {
+      auto state = weak_state_.lock();
+      if (!state) {
+        // Generator was destroyed
+        return false;
+      }
+      auto lock = state->mutex.Lock();
+      if (state->finished) {
         // Already closed
-        return;
+        return false;
       }
-      state_->finished = true;
-      if (state_->consumer_fut.has_value()) {
-        auto fut = std::move(state_->consumer_fut.value());
-        state_->consumer_fut.reset();
+      state->finished = true;
+      if (state->consumer_fut.has_value()) {
+        auto fut = std::move(state->consumer_fut.value());
+        state->consumer_fut.reset();
         lock.Unlock();  // unlock before potentially invoking a callback
         fut.MarkFinished(IterationTraits<T>::End());
       }
+      return true;
     }
 
+    /// Return whether the generator was closed or destroyed.
     bool is_closed() const {
-      auto lock = state_->mutex.Lock();
-      return state_->finished;
+      auto state = weak_state_.lock();
+      if (!state) {
+        // Generator was destroyed
+        return true;
+      }
+      auto lock = state->mutex.Lock();

Review comment:
       Looks like reading an atomic bool is free on x86, not on ARM64 though: https://godbolt.org/z/fjh91hE1c




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #10055: ARROW-12367: [C++] Stop producing when PushGenerator was destroyed

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10055:
URL: https://github.com/apache/arrow/pull/10055#discussion_r614278652



##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -766,52 +766,77 @@ class PushGenerator {
   /// Producer API for PushGenerator
   class Producer {
    public:
-    explicit Producer(std::shared_ptr<State> state) : state_(std::move(state)) {}
+    explicit Producer(const std::shared_ptr<State> state) : weak_state_(state) {}
 
-    /// Push a value on the queue
-    void Push(Result<T> result) {
-      auto lock = state_->mutex.Lock();
-      if (state_->finished) {
+    /// \brief Push a value on the queue
+    ///
+    /// True is returned if the value was pushed, false if the generator is
+    /// already closed or destroyed.  If the latter, it is recommended to stop
+    /// producing any further values.
+    bool Push(Result<T> result) {
+      auto state = weak_state_.lock();
+      if (!state) {
+        // Generator was destroyed
+        return false;
+      }
+      auto lock = state->mutex.Lock();
+      if (state->finished) {
         // Closed early
-        return;
+        return false;
       }
-      if (state_->consumer_fut.has_value()) {
-        auto fut = std::move(state_->consumer_fut.value());
-        state_->consumer_fut.reset();
+      if (state->consumer_fut.has_value()) {
+        auto fut = std::move(state->consumer_fut.value());
+        state->consumer_fut.reset();
         lock.Unlock();  // unlock before potentially invoking a callback
         fut.MarkFinished(std::move(result));
-        return;
+      } else {
+        state->result_q.push_back(std::move(result));
       }
-      state_->result_q.push_back(std::move(result));
+      return true;
     }
 
     /// \brief Tell the consumer we have finished producing
     ///
     /// It is allowed to call this and later call Push() again ("early close").
     /// In this case, calls to Push() after the queue is closed are silently
     /// ignored.  This can help implementing non-trivial cancellation cases.
-    void Close() {
-      auto lock = state_->mutex.Lock();
-      if (state_->finished) {
+    ///
+    /// True is returned on success, false if the generator is already closed
+    /// or destroyed.
+    bool Close() {
+      auto state = weak_state_.lock();
+      if (!state) {
+        // Generator was destroyed
+        return false;
+      }
+      auto lock = state->mutex.Lock();
+      if (state->finished) {
         // Already closed
-        return;
+        return false;
       }
-      state_->finished = true;
-      if (state_->consumer_fut.has_value()) {
-        auto fut = std::move(state_->consumer_fut.value());
-        state_->consumer_fut.reset();
+      state->finished = true;
+      if (state->consumer_fut.has_value()) {
+        auto fut = std::move(state->consumer_fut.value());
+        state->consumer_fut.reset();
         lock.Unlock();  // unlock before potentially invoking a callback
         fut.MarkFinished(IterationTraits<T>::End());
       }
+      return true;
     }
 
+    /// Return whether the generator was closed or destroyed.
     bool is_closed() const {
-      auto lock = state_->mutex.Lock();
-      return state_->finished;
+      auto state = weak_state_.lock();
+      if (!state) {
+        // Generator was destroyed
+        return true;
+      }
+      auto lock = state->mutex.Lock();

Review comment:
       True.  And atomic bool may not even be free on x86 as it forces the compilers to sequence things (although so would a mutex and that sequencing is exactly what we're after here).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou closed pull request #10055: ARROW-12367: [C++] Stop producing when PushGenerator was destroyed

Posted by GitBox <gi...@apache.org>.
pitrou closed pull request #10055:
URL: https://github.com/apache/arrow/pull/10055


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org