You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/10/26 13:11:32 UTC
[arrow-ballista] branch master updated: Add shuffle for SortPreservingMergeExec physical operator (#16) (#452)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new f9a4d374 Add shuffle for SortPreservingMergeExec physical operator (#16) (#452)
f9a4d374 is described below
commit f9a4d374afc12b9376b9adfc6904fbaa16cacf39
Author: yahoNanJing <90...@users.noreply.github.com>
AuthorDate: Wed Oct 26 21:11:27 2022 +0800
Add shuffle for SortPreservingMergeExec physical operator (#16) (#452)
---
ballista/scheduler/src/planner.rs | 27 +++++++++++++++++++++++++++
1 file changed, 27 insertions(+)
diff --git a/ballista/scheduler/src/planner.rs b/ballista/scheduler/src/planner.rs
index 8d6ec736..c91edd34 100644
--- a/ballista/scheduler/src/planner.rs
+++ b/ballista/scheduler/src/planner.rs
@@ -29,6 +29,7 @@ use ballista_core::{
};
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::repartition::RepartitionExec;
+use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion::physical_plan::windows::WindowAggExec;
use datafusion::physical_plan::{
with_new_children_if_necessary, ExecutionPlan, Partitioning,
@@ -124,6 +125,32 @@ impl DistributedPlanner {
with_new_children_if_necessary(execution_plan, vec![unresolved_shuffle])?,
stages,
))
+ } else if let Some(_sort_preserving_merge) = execution_plan
+ .as_any()
+ .downcast_ref::<SortPreservingMergeExec>(
+ ) {
+ let shuffle_writer = create_shuffle_writer(
+ job_id,
+ self.next_stage_id(),
+ children[0].clone(),
+ None,
+ )?;
+ let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
+ shuffle_writer.stage_id(),
+ shuffle_writer.schema(),
+ shuffle_writer.output_partitioning().partition_count(),
+ shuffle_writer
+ .shuffle_output_partitioning()
+ .map(|p| p.partition_count())
+ .unwrap_or_else(|| {
+ shuffle_writer.output_partitioning().partition_count()
+ }),
+ ));
+ stages.push(shuffle_writer);
+ Ok((
+ with_new_children_if_necessary(execution_plan, vec![unresolved_shuffle])?,
+ stages,
+ ))
} else if let Some(repart) =
execution_plan.as_any().downcast_ref::<RepartitionExec>()
{