You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@arrow.apache.org by ag...@apache.org on 2021/06/29 13:56:10 UTC

[arrow-datafusion] branch master updated: Ballista: Rename QueryStageExec to ShuffleWriterExec (#633)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 23b7898  Ballista: Rename QueryStageExec to ShuffleWriterExec (#633)
23b7898 is described below

commit 23b7898316daf25f94db77f37ea52333c4e8f90f
Author: Andy Grove <an...@gmail.com>
AuthorDate: Tue Jun 29 07:56:03 2021 -0600

    Ballista: Rename QueryStageExec to ShuffleWriterExec (#633)
---
 ballista/rust/core/src/execution_plans/mod.rs      |  4 +--
 .../core/src/execution_plans/shuffle_reader.rs     |  2 +-
 .../{query_stage.rs => shuffle_writer.rs}          | 30 +++++++++++-----------
 .../core/src/execution_plans/unresolved_shuffle.rs |  4 +--
 ballista/rust/core/src/utils.rs                    |  8 +++---
 ballista/rust/executor/src/executor.rs             | 11 +++++---
 ballista/rust/scheduler/src/planner.rs             | 20 +++++++--------
 7 files changed, 42 insertions(+), 37 deletions(-)

diff --git a/ballista/rust/core/src/execution_plans/mod.rs b/ballista/rust/core/src/execution_plans/mod.rs
index 1fb2010..ca4e600 100644
--- a/ballista/rust/core/src/execution_plans/mod.rs
+++ b/ballista/rust/core/src/execution_plans/mod.rs
@@ -18,10 +18,10 @@
 //! This module contains execution plans that are needed to distribute Datafusion's execution plans into
 //! several Ballista executors.
 
-mod query_stage;
 mod shuffle_reader;
+mod shuffle_writer;
 mod unresolved_shuffle;
 
-pub use query_stage::QueryStageExec;
 pub use shuffle_reader::ShuffleReaderExec;
+pub use shuffle_writer::ShuffleWriterExec;
 pub use unresolved_shuffle::UnresolvedShuffleExec;
diff --git a/ballista/rust/core/src/execution_plans/shuffle_reader.rs b/ballista/rust/core/src/execution_plans/shuffle_reader.rs
index 3a7f795..9ab0641 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_reader.rs
@@ -36,7 +36,7 @@ use datafusion::{
 use futures::{future, Stream, StreamExt};
 use log::info;
 
-/// ShuffleReaderExec reads partitions that have already been materialized by a query stage
+/// ShuffleReaderExec reads partitions that have already been materialized by a ShuffleWriterExec
 /// being executed by an executor
 #[derive(Debug, Clone)]
 pub struct ShuffleReaderExec {
diff --git a/ballista/rust/core/src/execution_plans/query_stage.rs b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
similarity index 94%
rename from ballista/rust/core/src/execution_plans/query_stage.rs
rename to ballista/rust/core/src/execution_plans/shuffle_writer.rs
index 1e91540..2d8d783 100644
--- a/ballista/rust/core/src/execution_plans/query_stage.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -15,10 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! QueryStageExec represents a section of a query plan that has consistent partitioning and
-//! can be executed as one unit with each partition being executed in parallel. The output of
-//! a query stage either forms the input of another query stage or can be the final result of
-//! a query.
+//! ShuffleWriterExec represents a section of a query plan that has consistent partitioning and
+//! can be executed as one unit with each partition being executed in parallel. The output of each
+//! partition is re-partitioned and streamed to disk in Arrow IPC format. Future stages of the query
+//! will use the ShuffleReaderExec to read these results.
 
 use std::iter::Iterator;
 use std::path::PathBuf;
@@ -48,12 +48,12 @@ use log::info;
 use std::fs::File;
 use uuid::Uuid;
 
-/// QueryStageExec represents a section of a query plan that has consistent partitioning and
-/// can be executed as one unit with each partition being executed in parallel. The output of
-/// a query stage either forms the input of another query stage or can be the final result of
-/// a query.
+/// ShuffleWriterExec represents a section of a query plan that has consistent partitioning and
+/// can be executed as one unit with each partition being executed in parallel. The output of each
+/// partition is re-partitioned and streamed to disk in Arrow IPC format. Future stages of the query
+/// will use the ShuffleReaderExec to read these results.
 #[derive(Debug, Clone)]
-pub struct QueryStageExec {
+pub struct ShuffleWriterExec {
     /// Unique ID for the job (query) that this stage is a part of
     job_id: String,
     /// Unique query stage ID within the job
@@ -66,8 +66,8 @@ pub struct QueryStageExec {
     shuffle_output_partitioning: Option<Partitioning>,
 }
 
-impl QueryStageExec {
-    /// Create a new query stage
+impl ShuffleWriterExec {
+    /// Create a new shuffle writer
     pub fn try_new(
         job_id: String,
         stage_id: usize,
@@ -96,7 +96,7 @@ impl QueryStageExec {
 }
 
 #[async_trait]
-impl ExecutionPlan for QueryStageExec {
+impl ExecutionPlan for ShuffleWriterExec {
     fn as_any(&self) -> &dyn Any {
         self
     }
@@ -118,7 +118,7 @@ impl ExecutionPlan for QueryStageExec {
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         assert!(children.len() == 1);
-        Ok(Arc::new(QueryStageExec::try_new(
+        Ok(Arc::new(ShuffleWriterExec::try_new(
             self.job_id.clone(),
             self.stage_id,
             children[0].clone(),
@@ -379,7 +379,7 @@ mod tests {
     async fn test() -> Result<()> {
         let input_plan = create_input_plan()?;
         let work_dir = TempDir::new()?;
-        let query_stage = QueryStageExec::try_new(
+        let query_stage = ShuffleWriterExec::try_new(
             "jobOne".to_owned(),
             1,
             input_plan,
@@ -418,7 +418,7 @@ mod tests {
     async fn test_partitioned() -> Result<()> {
         let input_plan = create_input_plan()?;
         let work_dir = TempDir::new()?;
-        let query_stage = QueryStageExec::try_new(
+        let query_stage = ShuffleWriterExec::try_new(
             "jobOne".to_owned(),
             1,
             input_plan,
diff --git a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
index 5c1b417..9c53bc7 100644
--- a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
+++ b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
@@ -30,10 +30,10 @@ use datafusion::{
 };
 use log::info;
 
-/// UnresolvedShuffleExec represents a dependency on the results of several QueryStageExec nodes which haven't been computed yet.
+/// UnresolvedShuffleExec represents a dependency on the results of several ShuffleWriterExec nodes which haven't been computed yet.
 ///
 /// An ExecutionPlan that contains an UnresolvedShuffleExec isn't ready for execution. The presence of this ExecutionPlan
-/// is used as a signal so the scheduler knows it can't start computation on a specific QueryStageExec.
+/// is used as a signal so the scheduler knows it can't start computation on a specific ShuffleWriterExec.
 #[derive(Debug, Clone)]
 pub struct UnresolvedShuffleExec {
     // The query stage ids which needs to be computed
diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index 26bdb00..d043763 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -23,7 +23,7 @@ use std::sync::Arc;
 use std::{fs::File, pin::Pin};
 
 use crate::error::{BallistaError, Result};
-use crate::execution_plans::{QueryStageExec, UnresolvedShuffleExec};
+use crate::execution_plans::{ShuffleWriterExec, UnresolvedShuffleExec};
 use crate::memory_stream::MemoryStream;
 use crate::serde::scheduler::PartitionStats;
 
@@ -106,7 +106,7 @@ pub async fn collect_stream(
     Ok(batches)
 }
 
-pub fn produce_diagram(filename: &str, stages: &[Arc<QueryStageExec>]) -> Result<()> {
+pub fn produce_diagram(filename: &str, stages: &[Arc<ShuffleWriterExec>]) -> Result<()> {
     let write_file = File::create(filename)?;
     let mut w = BufWriter::new(&write_file);
     writeln!(w, "digraph G {{")?;
@@ -163,8 +163,8 @@ fn build_exec_plan_diagram(
         "CsvExec"
     } else if plan.as_any().downcast_ref::<FilterExec>().is_some() {
         "FilterExec"
-    } else if plan.as_any().downcast_ref::<QueryStageExec>().is_some() {
-        "QueryStageExec"
+    } else if plan.as_any().downcast_ref::<ShuffleWriterExec>().is_some() {
+        "ShuffleWriterExec"
     } else if plan
         .as_any()
         .downcast_ref::<UnresolvedShuffleExec>()
diff --git a/ballista/rust/executor/src/executor.rs b/ballista/rust/executor/src/executor.rs
index 90c3927..86aaa7e 100644
--- a/ballista/rust/executor/src/executor.rs
+++ b/ballista/rust/executor/src/executor.rs
@@ -20,7 +20,7 @@
 use std::sync::Arc;
 
 use ballista_core::error::BallistaError;
-use ballista_core::execution_plans::QueryStageExec;
+use ballista_core::execution_plans::ShuffleWriterExec;
 use ballista_core::utils;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::physical_plan::ExecutionPlan;
@@ -51,8 +51,13 @@ impl Executor {
         part: usize,
         plan: Arc<dyn ExecutionPlan>,
     ) -> Result<RecordBatch, BallistaError> {
-        let exec =
-            QueryStageExec::try_new(job_id, stage_id, plan, self.work_dir.clone(), None)?;
+        let exec = ShuffleWriterExec::try_new(
+            job_id,
+            stage_id,
+            plan,
+            self.work_dir.clone(),
+            None,
+        )?;
         let mut stream = exec.execute(part).await?;
         let batches = utils::collect_stream(&mut stream).await?;
         // the output should be a single batch containing metadata (path and statistics)
diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs
index 32fc9a9..70d90a4 100644
--- a/ballista/rust/scheduler/src/planner.rs
+++ b/ballista/rust/scheduler/src/planner.rs
@@ -25,7 +25,7 @@ use std::sync::Arc;
 use ballista_core::datasource::DfTableAdapter;
 use ballista_core::error::{BallistaError, Result};
 use ballista_core::{
-    execution_plans::{QueryStageExec, ShuffleReaderExec, UnresolvedShuffleExec},
+    execution_plans::{ShuffleReaderExec, ShuffleWriterExec, UnresolvedShuffleExec},
     serde::scheduler::PartitionLocation,
 };
 use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
@@ -39,7 +39,7 @@ use datafusion::physical_plan::windows::WindowAggExec;
 use datafusion::physical_plan::ExecutionPlan;
 use log::info;
 
-type PartialQueryStageResult = (Arc<dyn ExecutionPlan>, Vec<Arc<QueryStageExec>>);
+type PartialQueryStageResult = (Arc<dyn ExecutionPlan>, Vec<Arc<ShuffleWriterExec>>);
 
 pub struct DistributedPlanner {
     next_stage_id: usize,
@@ -58,16 +58,16 @@ impl Default for DistributedPlanner {
 }
 
 impl DistributedPlanner {
-    /// Returns a vector of ExecutionPlans, where the root node is a [QueryStageExec].
+    /// Returns a vector of ExecutionPlans, where the root node is a [ShuffleWriterExec].
     /// Plans that depend on the input of other plans will have leaf nodes of type [UnresolvedShuffleExec].
-    /// A [QueryStageExec] is created whenever the partitioning changes.
+    /// A [ShuffleWriterExec] is created whenever the partitioning changes.
     ///
     /// Returns an empty vector if the execution_plan doesn't need to be sliced into several stages.
     pub fn plan_query_stages(
         &mut self,
         job_id: &str,
         execution_plan: Arc<dyn ExecutionPlan>,
-    ) -> Result<Vec<Arc<QueryStageExec>>> {
+    ) -> Result<Vec<Arc<ShuffleWriterExec>>> {
         info!("planning query stages");
         let (new_plan, mut stages) =
             self.plan_query_stages_internal(job_id, execution_plan)?;
@@ -228,8 +228,8 @@ fn create_query_stage(
     job_id: &str,
     stage_id: usize,
     plan: Arc<dyn ExecutionPlan>,
-) -> Result<Arc<QueryStageExec>> {
-    Ok(Arc::new(QueryStageExec::try_new(
+) -> Result<Arc<ShuffleWriterExec>> {
+    Ok(Arc::new(ShuffleWriterExec::try_new(
         job_id.to_owned(),
         stage_id,
         plan,
@@ -285,13 +285,13 @@ mod test {
         }
 
         /* Expected result:
-        QueryStageExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=1
+        ShuffleWriterExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=1
          HashAggregateExec: groupBy=["l_returnflag"], aggrExpr=["SUM(l_extendedprice Multiply Int64(1)) [\"l_extendedprice * CAST(1 AS Float64)\"]"]
           CsvExec: testdata/lineitem; partitions=2
-        QueryStageExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=2
+        ShuffleWriterExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=2
          CoalescePartitionsExec
           UnresolvedShuffleExec: stages=[1]
-        QueryStageExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=3
+        ShuffleWriterExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=3
          SortExec { input: ProjectionExec { expr: [(Column { name: "l_returnflag" }, "l_returnflag"), (Column { name: "SUM(l_ext
           ProjectionExec { expr: [(Column { name: "l_returnflag" }, "l_returnflag"), (Column { name: "SUM(l_extendedprice Multip
            HashAggregateExec: groupBy=["l_returnflag"], aggrExpr=["SUM(l_extendedprice Multiply Int64(1)) [\"l_extendedprice * CAST(1 AS Float64)\"]"]