Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/05/26 01:09:18 UTC

[GitHub] [arrow] westonpace commented on a diff in pull request #13143: ARROW-16523: [C++] Part 1 of ExecPlan cleanup: Centralized Task Group

westonpace commented on code in PR #13143:
URL: https://github.com/apache/arrow/pull/13143#discussion_r882219732


##########
cpp/src/arrow/compute/exec/exec_plan.h:
##########
@@ -61,6 +61,18 @@ class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> {
     return out;
   }
 
+  size_t GetThreadIndex();

Review Comment:
   ```suggestion
     size_t GetThreadIndex();
   ```
   Minor nit: `size_t` feels like overkill for a thread index.  Maybe `int` or `uint32_t`?



##########
cpp/src/arrow/compute/exec/exec_plan.h:
##########
@@ -157,6 +169,8 @@ class ARROW_EXPORT ExecNode {
   /// knows when it has received all input, regardless of order.
   virtual void InputFinished(ExecNode* input, int total_batches) = 0;
 
+  virtual Status Init() { return Status::OK(); }

Review Comment:
   ```suggestion
     /// \brief Validate configuration and prepare any initial state
     ///
     /// This will be called after the plan has been fully created and before any
     /// node starts producing.  At this point a node can use its inputs and outputs
     /// (and input schemas) to validate its configuration.
     virtual Status Init() { return Status::OK(); }
   ```



##########
cpp/src/arrow/compute/exec/aggregate_node.cc:
##########
@@ -508,47 +519,24 @@ class GroupByNode : public ExecNode {
     std::move(out_keys.values.begin(), out_keys.values.end(),
               out_data.values.begin() + agg_kernels_.size());
     state->grouper.reset();
-
-    if (output_counter_.SetTotal(
-            static_cast<int>(bit_util::CeilDiv(out_data.length, output_batch_size())))) {
-      // this will be hit if out_data.length == 0
-      finished_.MarkFinished();
-    }
     return out_data;
   }
 
-  void OutputNthBatch(int n) {
+  void OutputNthBatch(int64_t n) {
     // bail if StopProducing was called
     if (finished_.is_finished()) return;
 
     int64_t batch_size = output_batch_size();
     outputs_[0]->InputReceived(this, out_data_.Slice(batch_size * n, batch_size));
-
-    if (output_counter_.Increment()) {
-      finished_.MarkFinished();
-    }
   }
 
   Status OutputResult() {
     RETURN_NOT_OK(Merge());
     ARROW_ASSIGN_OR_RAISE(out_data_, Finalize());
 
-    int num_output_batches = *output_counter_.total();
-    outputs_[0]->InputFinished(this, num_output_batches);
-
-    auto executor = ctx_->executor();
-    for (int i = 0; i < num_output_batches; ++i) {
-      if (executor) {
-        // bail if StopProducing was called
-        if (finished_.is_finished()) break;
-
-        auto plan = this->plan()->shared_from_this();
-        RETURN_NOT_OK(executor->Spawn([plan, this, i] { OutputNthBatch(i); }));
-      } else {
-        OutputNthBatch(i);
-      }
-    }
-
+    int64_t num_output_batches = bit_util::CeilDiv(out_data_.length, output_batch_size());
+    outputs_[0]->InputFinished(this, static_cast<int>(num_output_batches));

Review Comment:
   It feels a little weird that we call `InputFinished` before we make any call to `InputReceived`, but I think it's correct.
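   (To spell out why the ordering is safe, here is a sketch, not the actual Arrow counter, of the bookkeeping a downstream node does: it only considers its input done once the received count matches the announced total, so it doesn't matter which of `InputFinished` / `InputReceived` lands first.)

   ```cpp
   #include <mutex>
   #include <optional>

   struct BatchCounter {
     std::mutex mutex;
     std::optional<int> total;
     int received = 0;

     // Each returns true at most once overall: when the received count has
     // met the announced total, regardless of which call arrives last
     bool OnInputFinished(int announced_total) {
       std::lock_guard<std::mutex> lock(mutex);
       total = announced_total;
       return received == *total;
     }
     bool OnInputReceived() {
       std::lock_guard<std::mutex> lock(mutex);
       ++received;
       return total.has_value() && received == *total;
     }
   };
   ```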



##########
cpp/src/arrow/compute/exec/source_node.cc:
##########
@@ -96,64 +96,51 @@ struct SourceNode : ExecNode {
       options.executor = executor;
       options.should_schedule = ShouldSchedule::IfDifferentExecutor;
     }
-    finished_ = Loop([this, executor, options] {
-                  std::unique_lock<std::mutex> lock(mutex_);
-                  int total_batches = batch_count_++;
-                  if (stop_requested_) {
-                    return Future<ControlFlow<int>>::MakeFinished(Break(total_batches));
-                  }
-                  lock.unlock();
-
-                  return generator_().Then(
-                      [=](const util::optional<ExecBatch>& maybe_batch)
-                          -> Future<ControlFlow<int>> {
-                        std::unique_lock<std::mutex> lock(mutex_);
-                        if (IsIterationEnd(maybe_batch) || stop_requested_) {
-                          stop_requested_ = true;
-                          return Break(total_batches);
-                        }
-                        lock.unlock();
-                        ExecBatch batch = std::move(*maybe_batch);
-
-                        if (executor) {
-                          auto status = task_group_.AddTask(
-                              [this, executor, batch]() -> Result<Future<>> {
-                                return executor->Submit([=]() {
-                                  outputs_[0]->InputReceived(this, std::move(batch));
-                                  return Status::OK();
-                                });
-                              });
-                          if (!status.ok()) {
-                            outputs_[0]->ErrorReceived(this, std::move(status));
-                            return Break(total_batches);
-                          }
-                        } else {
-                          outputs_[0]->InputReceived(this, std::move(batch));
-                        }
-                        lock.lock();
-                        if (!backpressure_future_.is_finished()) {
-                          EVENT(span_, "Source paused due to backpressure");
-                          return backpressure_future_.Then(
-                              []() -> ControlFlow<int> { return Continue(); });
-                        }
-                        return Future<ControlFlow<int>>::MakeFinished(Continue());
-                      },
-                      [=](const Status& error) -> ControlFlow<int> {
-                        // NB: ErrorReceived is independent of InputFinished, but
-                        // ErrorReceived will usually prompt StopProducing which will
-                        // prompt InputFinished. ErrorReceived may still be called from a
-                        // node which was requested to stop (indeed, the request to stop
-                        // may prompt an error).
-                        std::unique_lock<std::mutex> lock(mutex_);
-                        stop_requested_ = true;
-                        lock.unlock();
-                        outputs_[0]->ErrorReceived(this, error);
-                        return Break(total_batches);
-                      },
-                      options);
-                }).Then([&](int total_batches) {
-      outputs_[0]->InputFinished(this, total_batches);
-      return task_group_.End();
+    Loop([this, options] {
+      std::unique_lock<std::mutex> lock(mutex_);
+      int total_batches = batch_count_++;
+      if (stop_requested_) {
+        return Future<ControlFlow<int>>::MakeFinished(Break(total_batches));
+      }
+      lock.unlock();
+
+      return generator_().Then(
+          [=](const util::optional<ExecBatch>& maybe_batch) -> Future<ControlFlow<int>> {
+            std::unique_lock<std::mutex> lock(mutex_);
+            if (IsIterationEnd(maybe_batch) || stop_requested_) {
+              stop_requested_ = true;
+              return Break(total_batches);
+            }
+            lock.unlock();
+            ExecBatch batch = std::move(*maybe_batch);
+            RETURN_NOT_OK(plan_->ScheduleTask([=]() {
+              outputs_[0]->InputReceived(this, std::move(batch));
+              return Status::OK();
+            }));
+            lock.lock();
+            if (!backpressure_future_.is_finished()) {
+              EVENT(span_, "Source paused due to backpressure");
+              return backpressure_future_.Then(
+                  []() -> ControlFlow<int> { return Continue(); });
+            }
+            return Future<ControlFlow<int>>::MakeFinished(Continue());
+          },
+          [=](const Status& error) -> ControlFlow<int> {
+            // NB: ErrorReceived is independent of InputFinished, but
+            // ErrorReceived will usually prompt StopProducing which will
+            // prompt InputFinished. ErrorReceived may still be called from a
+            // node which was requested to stop (indeed, the request to stop
+            // may prompt an error).

Review Comment:
   I'm not sure how much this comment makes sense anymore (not really related to your change).  Since an error prompts us to return `Break(total_batches)`, we will immediately be calling `outputs_[0]->InputFinished` in the callback.  So I don't think we are relying on `StopProducing` here.  At this point `StopProducing` is just a signal that the user has requested an abort of the plan, not something nodes send to each other, and it would never trigger a call to `InputFinished`.



##########
cpp/src/arrow/compute/exec/exec_plan.h:
##########
@@ -279,7 +293,7 @@ class ARROW_EXPORT ExecNode {
   NodeVector outputs_;
 
   // Future to sync finished
-  Future<> finished_ = Future<>::MakeFinished();
+  Future<> finished_ = Future<>::Make();

Review Comment:
   Ideally this goes away at some point, right?  If all work is being done by a plan scheduler, then it seems a node shouldn't really need a concept of "finished".



##########
cpp/src/arrow/compute/exec/exec_plan.h:
##########
@@ -61,6 +61,18 @@ class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> {
     return out;
   }
 
+  size_t GetThreadIndex();
+  size_t thread_capacity() const;

Review Comment:
   ```suggestion
     size_t max_concurrency() const;
   ```
   The old name is probably fine too if you favor it.  When I see `thread_capacity` my brain thinks "the capacity of a thread".  Either way we should add a brief doc comment here explaining what this is (e.g. does it include I/O threads?  Is it safe to make an array of thread locals based on this?  Is this the max value that could be returned by `GetThreadIndex`?)
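   For the doc comment, maybe something like this (the "[guess]" semantics are mine and need to be checked against what `ThreadIndexer` actually does):

   ```cpp
   /// \brief Return a stable index identifying the calling thread
   ///
   /// [guess] The index is in `[0, max_concurrency())` and is safe to use as
   /// an index into an array of thread-local state.
   size_t GetThreadIndex();

   /// \brief An exclusive upper bound on the values GetThreadIndex() returns
   ///
   /// [guess] This includes I/O threads, since they may call into the plan.
   size_t max_concurrency() const;
   ```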



##########
cpp/src/arrow/compute/exec/exec_plan.cc:
##########
@@ -237,14 +290,18 @@ struct ExecPlanImpl : public ExecPlan {
     return ss.str();
   }
 
-  Future<> finished_ = Future<>::MakeFinished();
+  Future<> finished_ = Future<>::Make();

Review Comment:
   ```suggestion
     Future<> finished_;
   ```
   
   Minor nit: this saves the initialization at the expense of calls to `plan->finished()` failing if the plan hasn't been started.  (Actually, we might want such calls to fail, to avoid the possibility of someone calling `plan->finished()`, abandoning the plan, and then being stuck with a future that never finishes.)
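   For example, a sketch of the fail-fast accessor I mean (assuming a default-constructed future shows up as `!is_valid()`):

   ```cpp
   Future<> finished() {
     if (!finished_.is_valid()) {
       // Sketch only: the plan was never started, so return an immediately
       // failed future instead of one that can never complete
       return Future<>::MakeFinished(Status::Invalid("Plan was never started"));
     }
     return finished_;
   }
   ```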



##########
cpp/src/arrow/compute/exec/exec_plan.h:
##########
@@ -61,6 +61,18 @@ class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> {
     return out;
   }
 
+  size_t GetThreadIndex();
+  size_t thread_capacity() const;
+
+  Status AddFuture(Future<> fut);
+  Status ScheduleTask(std::function<Status()> fn);
+  Status ScheduleTask(std::function<Status(size_t)> fn);
+  // The need to register a task group before use will be removed after we rewrite the
+  // scheduler.
+  int RegisterTaskGroup(std::function<Status(size_t, int64_t)> task,
+                        std::function<Status(size_t)> on_finished);
+  Status StartTaskGroup(int task_group_id, int64_t num_tasks);
+

Review Comment:
   We should document these methods and also provide some guidelines on when they should be used, even if some of this is eventually headed to the scheduler in a new rework.  As a user writing an ExecNode I would have various questions:

   - When should I schedule a task vs. just doing the work?
     - Right now this is basically for pipeline breakers, right?
   - When should I use `ScheduleTask` vs. `RegisterTaskGroup`/`StartTaskGroup`?
     - I'm not sure of the guidance here.  I think a task group is for when you have enough work that it makes sense to break it up across many different threads.
   - When should I use `AddFuture`?
     - Only if you have to, because you are interfacing with something that generates futures (e.g. I/O).  Not preferred for new work.

   Make sure to document that `RegisterTaskGroup` must be called during `Init`.
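   Rough sketch of the usage I have in mind (the node-specific names `ProcessPartition`, `OnPartitionsDone`, `num_partitions_`, and `task_group_id_` are made up for illustration):

   ```cpp
   Status Init() override {
     // Task groups have to be registered before the plan starts producing
     task_group_id_ = plan_->RegisterTaskGroup(
         [this](size_t thread_index, int64_t task_id) {
           // Hypothetical per-task body; may run on many threads at once
           return ProcessPartition(thread_index, task_id);
         },
         [this](size_t thread_index) {
           // Hypothetical completion callback; runs once after the last task
           return OnPartitionsDone(thread_index);
         });
     return Status::OK();
   }

   Status OutputResult() {
     // Kick off the group once the amount of work is known.  Using the
     // scheduler (instead of doing the work inline) matters mainly for
     // pipeline breakers, which accumulate a lot of deferred work to spread
     // across threads.
     return plan_->StartTaskGroup(task_group_id_, num_partitions_);
   }
   ```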



##########
cpp/src/arrow/compute/exec/exec_plan.cc:
##########
@@ -237,14 +290,18 @@ struct ExecPlanImpl : public ExecPlan {
     return ss.str();
   }
 
-  Future<> finished_ = Future<>::MakeFinished();
+  Future<> finished_ = Future<>::Make();
   bool started_ = false, stopped_ = false;
   std::vector<std::unique_ptr<ExecNode>> nodes_;
   NodeVector sources_, sinks_;
   NodeVector sorted_nodes_;
   uint32_t auto_label_counter_ = 0;
   util::tracing::Span span_;
   std::shared_ptr<const KeyValueMetadata> metadata_;
+
+  ThreadIndexer thread_indexer_;
+  util::AsyncTaskGroup task_group_;
+  std::unique_ptr<TaskScheduler> task_scheduler_;

Review Comment:
   ```suggestion
     std::unique_ptr<TaskScheduler> task_scheduler_ = TaskScheduler::Make();
   ```
   Let's initialize this here for consistency with other fields (e.g. `auto_label_counter_`, `finished_`, `started_`, ...)



##########
cpp/src/arrow/compute/exec/hash_join.cc:
##########
@@ -883,6 +875,8 @@ class HashJoinBasicImpl : public HashJoinImpl {
   //
   OutputBatchCallback output_batch_callback_;
   FinishedCallback finished_callback_;
+  RegisterTaskGroupCallback register_task_group_callback_;

Review Comment:
   This feels a little ping-pongy.  Did you mention the other day that the task groups might be moving into the node itself?  I don't think this needs to change; just calling it out as food for thought.

   If these tasks aren't moving out, then maybe we can define some basic pure virtual class above `ExecNode`, like `PlanContext`, which has the scheduler, the thread index accessor, etc., and that we can allow types like this to depend on?
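   Strawman of what I mean (the name and exact member set are up for debate; the signatures just mirror the new `ExecPlan` methods):

   ```cpp
   class PlanContext {
    public:
     virtual ~PlanContext() = default;
     virtual size_t GetThreadIndex() = 0;
     virtual size_t max_concurrency() const = 0;
     virtual Status ScheduleTask(std::function<Status(size_t)> fn) = 0;
     virtual int RegisterTaskGroup(std::function<Status(size_t, int64_t)> task,
                                   std::function<Status(size_t)> on_finished) = 0;
     virtual Status StartTaskGroup(int task_group_id, int64_t num_tasks) = 0;
   };
   ```

   `ExecPlan` would implement it, and helpers like `HashJoinBasicImpl` could take a `PlanContext*` instead of a growing bag of per-feature callbacks.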



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org