Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/02/18 11:32:34 UTC

[GitHub] [arrow-datafusion] thinkharderdev commented on a change in pull request #1842: [Ballista] Streaming style push-based shuffle and All-at-once stage scheduling in Ballista

thinkharderdev commented on a change in pull request #1842:
URL: https://github.com/apache/arrow-datafusion/pull/1842#discussion_r809912822



##########
File path: ballista/rust/scheduler/src/planner.rs
##########
@@ -55,24 +56,41 @@ impl Default for DistributedPlanner {
 
 impl DistributedPlanner {
     /// Returns a vector of ExecutionPlans, where the root node is a [ShuffleWriterExec].
-    /// Plans that depend on the input of other plans will have leaf nodes of type [UnresolvedShuffleExec].
+    /// Plans that depend on the input of other plans will have leaf nodes of type [UnresolvedShuffleExec],
+    /// or of type [ShuffleStreamReaderExec] when the created stages are all-at-once stages.
     /// A [ShuffleWriterExec] is created whenever the partitioning changes.
     pub async fn plan_query_stages<'a>(
         &'a mut self,
         job_id: &'a str,
         execution_plan: Arc<dyn ExecutionPlan>,
     ) -> Result<Vec<Arc<ShuffleWriterExec>>> {
-        info!("planning query stages");
-        let (new_plan, mut stages) = self
-            .plan_query_stages_internal(job_id, execution_plan)
+        info!("planning query stages for job {}", job_id);
+        let (modified_plan, mut stages) = self
+            .plan_query_stages_internal(job_id, execution_plan.clone())
             .await?;
-        stages.push(create_shuffle_writer(
-            job_id,
-            self.next_stage_id(),
-            new_plan,
-            None,
-        )?);
-        Ok(stages)
+        // Re-plan the input execution plan and create All-at-once query stages.
+        // For now we simply depend on the stage count to decide whether to create All-at-once or normal stages.
+        // In the future, we can use a more sophisticated heuristic to decide which way to go.
+        if stages.len() > 1 && stages.len() <= 4 {

Review comment:
       If I understand the original design correctly, the "all-at-once" plan will only get scheduled when there are sufficient task slots available to run the entire plan. So should this check be a function of the total number of partitions rather than the stage count?
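       Roughly something like the sketch below (hypothetical: `available_task_slots` would have to come from the scheduler state, and `should_schedule_all_at_once` is a made-up helper name; `output_partitioning().partition_count()` is the existing `ExecutionPlan` API, and I'm assuming one task slot per output partition):

```rust
use std::sync::Arc;

use ballista_core::execution_plans::ShuffleWriterExec;
use datafusion::physical_plan::ExecutionPlan;

/// Hypothetical helper: decide all-at-once scheduling from the total number
/// of partitions the plan needs, rather than from the stage count alone.
fn should_schedule_all_at_once(
    stages: &[Arc<ShuffleWriterExec>],
    available_task_slots: usize,
) -> bool {
    // Sum the output partitions of every stage; here each partition is
    // assumed to occupy one task slot when the whole plan runs at once.
    let total_partitions: usize = stages
        .iter()
        .map(|stage| stage.output_partitioning().partition_count())
        .sum();
    stages.len() > 1 && total_partitions <= available_task_slots
}
```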




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org