You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/10/27 18:29:24 UTC

[GitHub] [arrow] save-buffer commented on a diff in pull request #14524: ARROW-17509: [C++] Simplify async scheduler by removing the need to call End

save-buffer commented on code in PR #14524:
URL: https://github.com/apache/arrow/pull/14524#discussion_r1007212828


##########
cpp/src/arrow/compute/exec/exec_plan.cc:
##########
@@ -127,85 +127,89 @@ struct ExecPlanImpl : public ExecPlan {
   }
 
   Status StartProducing() {
-    START_COMPUTE_SPAN(span_, "ExecPlan", {{"plan", ToString()}});
-#ifdef ARROW_WITH_OPENTELEMETRY
-    if (HasMetadata()) {
-      auto pairs = metadata().get()->sorted_pairs();
-      opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> span =
-          ::arrow::internal::tracing::UnwrapSpan(span_.details.get());
-      std::for_each(std::begin(pairs), std::end(pairs),
-                    [span](std::pair<std::string, std::string> const& pair) {
-                      span->SetAttribute(pair.first, pair.second);
-                    });
-    }
-#endif
     if (started_) {
       return Status::Invalid("restarted ExecPlan");
     }
-
-    std::vector<Future<>> futures;
-    for (auto& n : nodes_) {
-      RETURN_NOT_OK(n->Init());
-      futures.push_back(n->finished());
-    }
-
-    AllFinished(futures).AddCallback([this](const Status& st) {
-      error_st_ = st;
-      EndTaskGroup();
-    });
-
-    task_scheduler_->RegisterEnd();
-    int num_threads = 1;
-    bool sync_execution = true;
-    if (auto executor = exec_context()->executor()) {
-      num_threads = executor->GetCapacity();
-      sync_execution = false;
-    }
-    RETURN_NOT_OK(task_scheduler_->StartScheduling(
-        0 /* thread_index */,
-        [this](std::function<Status(size_t)> fn) -> Status {
-          return this->ScheduleTask(std::move(fn));
-        },
-        /*concurrent_tasks=*/2 * num_threads, sync_execution));
-
     started_ = true;
-    // producers precede consumers
-    sorted_nodes_ = TopoSort();
-
-    Status st = Status::OK();
-
-    using rev_it = std::reverse_iterator<NodeVector::iterator>;
-    for (rev_it it(sorted_nodes_.end()), end(sorted_nodes_.begin()); it != end; ++it) {
-      auto node = *it;
 
-      EVENT(span_, "StartProducing:" + node->label(),
-            {{"node.label", node->label()}, {"node.kind_name", node->kind_name()}});
-      st = node->StartProducing();
-      EVENT(span_, "StartProducing:" + node->label(), {{"status", st.ToString()}});
-      if (!st.ok()) {
-        // Stop nodes that successfully started, in reverse order
-        stopped_ = true;
-        StopProducingImpl(it.base(), sorted_nodes_.end());
-        for (NodeVector::iterator fw_it = sorted_nodes_.begin(); fw_it != it.base();
-             ++fw_it) {
-          Future<> fut = (*fw_it)->finished();
-          if (!fut.is_finished()) fut.MarkFinished();
-        }
-        return st;
-      }
-    }
-    return st;
-  }
-
-  void EndTaskGroup() {
-    bool expected = false;
-    if (group_ended_.compare_exchange_strong(expected, true)) {
-      async_scheduler_->End();
-      async_scheduler_->OnFinished().AddCallback([this](const Status& st) {
-        MARK_SPAN(span_, error_st_ & st);
-        END_SPAN(span_);
-        finished_.MarkFinished(error_st_ & st);
-      });
+    // We call StartProducing on each of the nodes.  The source nodes should generally
+    // start scheduling some tasks during this call.
+    //
+    // If no source node schedules any tasks (e.g. they do all their work synchronously as
+    // part of StartProducing) then the plan may be finished before we return from this
+    // call.
+    Future<> scheduler_finished =
+        util::AsyncTaskScheduler::Make([this](util::AsyncTaskScheduler* async_scheduler) {
+          this->async_scheduler_ = async_scheduler;
+          START_COMPUTE_SPAN(span_, "ExecPlan", {{"plan", ToString()}});
+#ifdef ARROW_WITH_OPENTELEMETRY
+          if (HasMetadata()) {
+            auto pairs = metadata().get()->sorted_pairs();
+            opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> span =
+                ::arrow::internal::tracing::UnwrapSpan(span_.details.get());
+            std::for_each(std::begin(pairs), std::end(pairs),
+                          [span](std::pair<std::string, std::string> const& pair) {
+                            span->SetAttribute(pair.first, pair.second);
+                          });
+          }
+#endif
+          // TODO(weston) The entire concept of ExecNode::finished() will hopefully go

Review Comment:
   Yep, my simplification PR gets rid of those finished futures.



##########
cpp/src/arrow/compute/exec/exec_plan.cc:
##########
@@ -127,85 +127,89 @@ struct ExecPlanImpl : public ExecPlan {
   }
 
   Status StartProducing() {
-    START_COMPUTE_SPAN(span_, "ExecPlan", {{"plan", ToString()}});
-#ifdef ARROW_WITH_OPENTELEMETRY
-    if (HasMetadata()) {
-      auto pairs = metadata().get()->sorted_pairs();
-      opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> span =
-          ::arrow::internal::tracing::UnwrapSpan(span_.details.get());
-      std::for_each(std::begin(pairs), std::end(pairs),
-                    [span](std::pair<std::string, std::string> const& pair) {
-                      span->SetAttribute(pair.first, pair.second);
-                    });
-    }
-#endif
     if (started_) {
       return Status::Invalid("restarted ExecPlan");
     }
-
-    std::vector<Future<>> futures;
-    for (auto& n : nodes_) {
-      RETURN_NOT_OK(n->Init());
-      futures.push_back(n->finished());
-    }
-
-    AllFinished(futures).AddCallback([this](const Status& st) {
-      error_st_ = st;
-      EndTaskGroup();
-    });
-
-    task_scheduler_->RegisterEnd();
-    int num_threads = 1;
-    bool sync_execution = true;
-    if (auto executor = exec_context()->executor()) {
-      num_threads = executor->GetCapacity();
-      sync_execution = false;
-    }
-    RETURN_NOT_OK(task_scheduler_->StartScheduling(
-        0 /* thread_index */,
-        [this](std::function<Status(size_t)> fn) -> Status {
-          return this->ScheduleTask(std::move(fn));
-        },
-        /*concurrent_tasks=*/2 * num_threads, sync_execution));
-
     started_ = true;
-    // producers precede consumers
-    sorted_nodes_ = TopoSort();
-
-    Status st = Status::OK();
-
-    using rev_it = std::reverse_iterator<NodeVector::iterator>;
-    for (rev_it it(sorted_nodes_.end()), end(sorted_nodes_.begin()); it != end; ++it) {
-      auto node = *it;
 
-      EVENT(span_, "StartProducing:" + node->label(),
-            {{"node.label", node->label()}, {"node.kind_name", node->kind_name()}});
-      st = node->StartProducing();
-      EVENT(span_, "StartProducing:" + node->label(), {{"status", st.ToString()}});
-      if (!st.ok()) {
-        // Stop nodes that successfully started, in reverse order
-        stopped_ = true;
-        StopProducingImpl(it.base(), sorted_nodes_.end());
-        for (NodeVector::iterator fw_it = sorted_nodes_.begin(); fw_it != it.base();
-             ++fw_it) {
-          Future<> fut = (*fw_it)->finished();
-          if (!fut.is_finished()) fut.MarkFinished();
-        }
-        return st;
-      }
-    }
-    return st;
-  }
-
-  void EndTaskGroup() {
-    bool expected = false;
-    if (group_ended_.compare_exchange_strong(expected, true)) {
-      async_scheduler_->End();
-      async_scheduler_->OnFinished().AddCallback([this](const Status& st) {
-        MARK_SPAN(span_, error_st_ & st);
-        END_SPAN(span_);
-        finished_.MarkFinished(error_st_ & st);
-      });
+    // We call StartProducing on each of the nodes.  The source nodes should generally
+    // start scheduling some tasks during this call.
+    //
+    // If no source node schedules any tasks (e.g. they do all their work synchronously as
+    // part of StartProducing) then the plan may be finished before we return from this
+    // call.
+    Future<> scheduler_finished =
+        util::AsyncTaskScheduler::Make([this](util::AsyncTaskScheduler* async_scheduler) {

Review Comment:
   Could we move this giant lambda into its own function? The extra layer of indentation is pretty confusing.



##########
cpp/src/arrow/util/async_util.cc:
##########
@@ -108,30 +108,37 @@ class AlreadyFailedScheduler : public AsyncTaskScheduler {
     std::ignore = std::move(finish_callback_)(failure_reason_);
   }
   bool AddTask(std::unique_ptr<Task> task) override { return false; }
-  void End() override {
-    Status::UnknownError("Do not call End on a sub-scheduler.").Abort();
-  }
-  Future<> OnFinished() const override {
-    Status::UnknownError(
-        "You should not rely on sub-scheduler's OnFinished.  Use a "
-        "finished callback when creating the sub-scheduler instead")
-        .Abort();
-  }
-  std::shared_ptr<AsyncTaskScheduler> MakeSubScheduler(
+  std::unique_ptr<AsyncTaskScheduler::Holder> MakeSubScheduler(
+      FnOnce<Status(AsyncTaskScheduler*)> initial_task,

Review Comment:
   Could we guarantee that this `initial_task` gets run synchronously? I get why it's needed (the scheduler will end itself immediately after creation without it), but I would like the control flow for scheduling the initial tasks to be simple. We could make this function return a `Result<unique_ptr>` in case a node has a problem scheduling its initial tasks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org