Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/04/08 08:13:15 UTC

[GitHub] [arrow] pitrou commented on a change in pull request #9941: ARROW-12220: [C++][CI] Thread sanitizer failure

pitrou commented on a change in pull request #9941:
URL: https://github.com/apache/arrow/pull/9941#discussion_r609432471



##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1195,36 +1199,71 @@ class BackgroundGenerator {
             ClearQueue();
             queue.push(spawn_status);
           }
+          task_finished.MarkFinished();
         }
       }
     }
 
     internal::Executor* io_executor;
     Iterator<T> it;
-    bool running;
+    // True if we are still running in the loop and will be adding more items to the
+    // queue, don't restart the task if this is true.  However, even if this is false we
+    // might still be running some finish callbacks or marking the finish future.
+    bool running_in_loop;
+    // If this is false then the background thread is done with everything.  It will not
+    // be running any additional callbacks or marking the finish future.  There is no need
+    // to wait for it when cleaning up.
+    bool running_at_all;
     bool finished;

Review comment:
       This is where the complexity starts to become difficult to reason about. What's the difference between `running_at_all` and `finished`? What are the possible combinations?
   You may want to define an enum to describe the current state rather than having three different booleans.
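
   To make the enum suggestion concrete, here is a minimal sketch, not the code in this PR. The type `TaskState`, its enumerators, and the helper functions are all hypothetical names, and the mapping from the three booleans is an assumption based only on the comments in the diff above.

```cpp
// Hypothetical replacement for running_in_loop / running_at_all / finished.
// Each enumerator names one combination that the three booleans could encode;
// the set of valid combinations is an assumption, not taken from the PR.
enum class TaskState {
  kIdle,         // not finished, nothing in flight; the task may be restarted
  kInLoop,       // in the loop, will add more items to the queue
  kWindingDown,  // left the loop, callbacks may still run; not finished
  kFinishing,    // source exhausted; finish callbacks / future still pending
  kFinished      // source exhausted and the background thread is fully done
};

// The old flags become derived queries instead of independent variables.
constexpr bool RunningInLoop(TaskState s) { return s == TaskState::kInLoop; }

constexpr bool RunningAtAll(TaskState s) {
  return s == TaskState::kInLoop || s == TaskState::kWindingDown ||
         s == TaskState::kFinishing;
}

constexpr bool IsFinished(TaskState s) {
  return s == TaskState::kFinishing || s == TaskState::kFinished;
}
```

   With a single enum, combinations that should never occur (for example in the loop but not running at all) simply cannot be represented, and the distinction between `running_at_all` and `finished` is spelled out by `kFinishing` versus `kFinished`.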

##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1247,10 +1286,23 @@ class BackgroundGenerator {
           waiting_future.MarkFinished(next);
         }
       }
+      {
+        auto guard = state->mutex.Lock();
+        state->running_at_all = false;
+        if (state->finished) {
+          state->task_finished.MarkFinished();
+        }
+      }
     }
   };
 
   std::shared_ptr<State> state_;
+  // state_ is held by both the generator and the background thread so it won't be cleaned
+  // up when all consumer references are relinquished.  cleanup_ is only held by the
+  // generator so it will be destructed when the last consumer reference is gone.  We use
+  // this to cleanup / stop the background generator in case the consuming end stops
+  // listening (e.g. due to a downstream error)
+  std::shared_ptr<Cleanup> cleanup_;

Review comment:
       Would it be possible to simply use a `std::weak_ptr<State>` in the background thread?
   (If there is some cleanup to do, it could go in the `State` destructor.)
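
   As a rough illustration of the `std::weak_ptr` idea (a sketch under assumptions, not the PR's implementation): the `State` struct below is a simplified stand-in for the real one, and `ProduceOne`, `items_produced`, and the `destroyed` flag are hypothetical names used only for this example.

```cpp
#include <memory>

// Simplified stand-in for BackgroundGenerator::State (hypothetical fields).
struct State {
  explicit State(bool* destroyed) : destroyed(destroyed) {}
  // Any cleanup (stopping the producer, clearing the queue, ...) would live
  // here and run once the last owning reference is dropped.
  ~State() { *destroyed = true; }

  int items_produced = 0;
  bool* destroyed;  // lets the caller observe that the destructor ran
};

// One iteration of the background loop. The background side holds only a
// weak_ptr, so it does not keep State alive on its own. Returns false once
// every consumer reference is gone, telling the loop to stop.
bool ProduceOne(const std::weak_ptr<State>& weak_state) {
  std::shared_ptr<State> state = weak_state.lock();
  if (!state) {
    return false;  // consumers are gone; nothing left to produce for
  }
  ++state->items_produced;
  return true;
}
```

   One thing to keep in mind with this design: the `shared_ptr` returned by `lock()` can end up being the last owner, in which case the `State` destructor runs on the background thread rather than on the consumer's thread.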



