You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/10/26 13:11:32 UTC

[arrow-ballista] branch master updated: Add shuffle for SortPreservingMergeExec physical operator (#16) (#452)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new f9a4d374 Add shuffle for SortPreservingMergeExec physical operator (#16) (#452)
f9a4d374 is described below

commit f9a4d374afc12b9376b9adfc6904fbaa16cacf39
Author: yahoNanJing <90...@users.noreply.github.com>
AuthorDate: Wed Oct 26 21:11:27 2022 +0800

    Add shuffle for SortPreservingMergeExec physical operator (#16) (#452)
---
 ballista/scheduler/src/planner.rs | 27 +++++++++++++++++++++++++++
 1 file changed, 27 insertions(+)

diff --git a/ballista/scheduler/src/planner.rs b/ballista/scheduler/src/planner.rs
index 8d6ec736..c91edd34 100644
--- a/ballista/scheduler/src/planner.rs
+++ b/ballista/scheduler/src/planner.rs
@@ -29,6 +29,7 @@ use ballista_core::{
 };
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::repartition::RepartitionExec;
+use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
 use datafusion::physical_plan::windows::WindowAggExec;
 use datafusion::physical_plan::{
     with_new_children_if_necessary, ExecutionPlan, Partitioning,
@@ -124,6 +125,32 @@ impl DistributedPlanner {
                 with_new_children_if_necessary(execution_plan, vec![unresolved_shuffle])?,
                 stages,
             ))
+        } else if let Some(_sort_preserving_merge) = execution_plan
+            .as_any()
+            .downcast_ref::<SortPreservingMergeExec>(
+        ) {
+            let shuffle_writer = create_shuffle_writer(
+                job_id,
+                self.next_stage_id(),
+                children[0].clone(),
+                None,
+            )?;
+            let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
+                shuffle_writer.stage_id(),
+                shuffle_writer.schema(),
+                shuffle_writer.output_partitioning().partition_count(),
+                shuffle_writer
+                    .shuffle_output_partitioning()
+                    .map(|p| p.partition_count())
+                    .unwrap_or_else(|| {
+                        shuffle_writer.output_partitioning().partition_count()
+                    }),
+            ));
+            stages.push(shuffle_writer);
+            Ok((
+                with_new_children_if_necessary(execution_plan, vec![unresolved_shuffle])?,
+                stages,
+            ))
         } else if let Some(repart) =
             execution_plan.as_any().downcast_ref::<RepartitionExec>()
         {