You are viewing a plain-text version of this content. The canonical link was a hyperlink in the original page and is not preserved in this plain-text copy.
Posted to commits@arrow.apache.org by ag...@apache.org on 2021/06/29 13:56:10 UTC
[arrow-datafusion] branch master updated: Ballista: Rename QueryStageExec to ShuffleWriterExec (#633)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 23b7898 Ballista: Rename QueryStageExec to ShuffleWriterExec (#633)
23b7898 is described below
commit 23b7898316daf25f94db77f37ea52333c4e8f90f
Author: Andy Grove <an...@gmail.com>
AuthorDate: Tue Jun 29 07:56:03 2021 -0600
Ballista: Rename QueryStageExec to ShuffleWriterExec (#633)
---
ballista/rust/core/src/execution_plans/mod.rs | 4 +--
.../core/src/execution_plans/shuffle_reader.rs | 2 +-
.../{query_stage.rs => shuffle_writer.rs} | 30 +++++++++++-----------
.../core/src/execution_plans/unresolved_shuffle.rs | 4 +--
ballista/rust/core/src/utils.rs | 8 +++---
ballista/rust/executor/src/executor.rs | 11 +++++---
ballista/rust/scheduler/src/planner.rs | 20 +++++++--------
7 files changed, 42 insertions(+), 37 deletions(-)
diff --git a/ballista/rust/core/src/execution_plans/mod.rs b/ballista/rust/core/src/execution_plans/mod.rs
index 1fb2010..ca4e600 100644
--- a/ballista/rust/core/src/execution_plans/mod.rs
+++ b/ballista/rust/core/src/execution_plans/mod.rs
@@ -18,10 +18,10 @@
//! This module contains execution plans that are needed to distribute Datafusion's execution plans into
//! several Ballista executors.
-mod query_stage;
mod shuffle_reader;
+mod shuffle_writer;
mod unresolved_shuffle;
-pub use query_stage::QueryStageExec;
pub use shuffle_reader::ShuffleReaderExec;
+pub use shuffle_writer::ShuffleWriterExec;
pub use unresolved_shuffle::UnresolvedShuffleExec;
diff --git a/ballista/rust/core/src/execution_plans/shuffle_reader.rs b/ballista/rust/core/src/execution_plans/shuffle_reader.rs
index 3a7f795..9ab0641 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_reader.rs
@@ -36,7 +36,7 @@ use datafusion::{
use futures::{future, Stream, StreamExt};
use log::info;
-/// ShuffleReaderExec reads partitions that have already been materialized by a query stage
+/// ShuffleReaderExec reads partitions that have already been materialized by a ShuffleWriterExec
/// being executed by an executor
#[derive(Debug, Clone)]
pub struct ShuffleReaderExec {
diff --git a/ballista/rust/core/src/execution_plans/query_stage.rs b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
similarity index 94%
rename from ballista/rust/core/src/execution_plans/query_stage.rs
rename to ballista/rust/core/src/execution_plans/shuffle_writer.rs
index 1e91540..2d8d783 100644
--- a/ballista/rust/core/src/execution_plans/query_stage.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -15,10 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-//! QueryStageExec represents a section of a query plan that has consistent partitioning and
-//! can be executed as one unit with each partition being executed in parallel. The output of
-//! a query stage either forms the input of another query stage or can be the final result of
-//! a query.
+//! ShuffleWriterExec represents a section of a query plan that has consistent partitioning and
+//! can be executed as one unit with each partition being executed in parallel. The output of each
+//! partition is re-partitioned and streamed to disk in Arrow IPC format. Future stages of the query
+//! will use the ShuffleReaderExec to read these results.
use std::iter::Iterator;
use std::path::PathBuf;
@@ -48,12 +48,12 @@ use log::info;
use std::fs::File;
use uuid::Uuid;
-/// QueryStageExec represents a section of a query plan that has consistent partitioning and
-/// can be executed as one unit with each partition being executed in parallel. The output of
-/// a query stage either forms the input of another query stage or can be the final result of
-/// a query.
+/// ShuffleWriterExec represents a section of a query plan that has consistent partitioning and
+/// can be executed as one unit with each partition being executed in parallel. The output of each
+/// partition is re-partitioned and streamed to disk in Arrow IPC format. Future stages of the query
+/// will use the ShuffleReaderExec to read these results.
#[derive(Debug, Clone)]
-pub struct QueryStageExec {
+pub struct ShuffleWriterExec {
/// Unique ID for the job (query) that this stage is a part of
job_id: String,
/// Unique query stage ID within the job
@@ -66,8 +66,8 @@ pub struct QueryStageExec {
shuffle_output_partitioning: Option<Partitioning>,
}
-impl QueryStageExec {
- /// Create a new query stage
+impl ShuffleWriterExec {
+ /// Create a new shuffle writer
pub fn try_new(
job_id: String,
stage_id: usize,
@@ -96,7 +96,7 @@ impl QueryStageExec {
}
#[async_trait]
-impl ExecutionPlan for QueryStageExec {
+impl ExecutionPlan for ShuffleWriterExec {
fn as_any(&self) -> &dyn Any {
self
}
@@ -118,7 +118,7 @@ impl ExecutionPlan for QueryStageExec {
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
assert!(children.len() == 1);
- Ok(Arc::new(QueryStageExec::try_new(
+ Ok(Arc::new(ShuffleWriterExec::try_new(
self.job_id.clone(),
self.stage_id,
children[0].clone(),
@@ -379,7 +379,7 @@ mod tests {
async fn test() -> Result<()> {
let input_plan = create_input_plan()?;
let work_dir = TempDir::new()?;
- let query_stage = QueryStageExec::try_new(
+ let query_stage = ShuffleWriterExec::try_new(
"jobOne".to_owned(),
1,
input_plan,
@@ -418,7 +418,7 @@ mod tests {
async fn test_partitioned() -> Result<()> {
let input_plan = create_input_plan()?;
let work_dir = TempDir::new()?;
- let query_stage = QueryStageExec::try_new(
+ let query_stage = ShuffleWriterExec::try_new(
"jobOne".to_owned(),
1,
input_plan,
diff --git a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
index 5c1b417..9c53bc7 100644
--- a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
+++ b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
@@ -30,10 +30,10 @@ use datafusion::{
};
use log::info;
-/// UnresolvedShuffleExec represents a dependency on the results of several QueryStageExec nodes which haven't been computed yet.
+/// UnresolvedShuffleExec represents a dependency on the results of several ShuffleWriterExec nodes which haven't been computed yet.
///
/// An ExecutionPlan that contains an UnresolvedShuffleExec isn't ready for execution. The presence of this ExecutionPlan
-/// is used as a signal so the scheduler knows it can't start computation on a specific QueryStageExec.
+/// is used as a signal so the scheduler knows it can't start computation on a specific ShuffleWriterExec.
#[derive(Debug, Clone)]
pub struct UnresolvedShuffleExec {
// The query stage ids which needs to be computed
diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index 26bdb00..d043763 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -23,7 +23,7 @@ use std::sync::Arc;
use std::{fs::File, pin::Pin};
use crate::error::{BallistaError, Result};
-use crate::execution_plans::{QueryStageExec, UnresolvedShuffleExec};
+use crate::execution_plans::{ShuffleWriterExec, UnresolvedShuffleExec};
use crate::memory_stream::MemoryStream;
use crate::serde::scheduler::PartitionStats;
@@ -106,7 +106,7 @@ pub async fn collect_stream(
Ok(batches)
}
-pub fn produce_diagram(filename: &str, stages: &[Arc<QueryStageExec>]) -> Result<()> {
+pub fn produce_diagram(filename: &str, stages: &[Arc<ShuffleWriterExec>]) -> Result<()> {
let write_file = File::create(filename)?;
let mut w = BufWriter::new(&write_file);
writeln!(w, "digraph G {{")?;
@@ -163,8 +163,8 @@ fn build_exec_plan_diagram(
"CsvExec"
} else if plan.as_any().downcast_ref::<FilterExec>().is_some() {
"FilterExec"
- } else if plan.as_any().downcast_ref::<QueryStageExec>().is_some() {
- "QueryStageExec"
+ } else if plan.as_any().downcast_ref::<ShuffleWriterExec>().is_some() {
+ "ShuffleWriterExec"
} else if plan
.as_any()
.downcast_ref::<UnresolvedShuffleExec>()
diff --git a/ballista/rust/executor/src/executor.rs b/ballista/rust/executor/src/executor.rs
index 90c3927..86aaa7e 100644
--- a/ballista/rust/executor/src/executor.rs
+++ b/ballista/rust/executor/src/executor.rs
@@ -20,7 +20,7 @@
use std::sync::Arc;
use ballista_core::error::BallistaError;
-use ballista_core::execution_plans::QueryStageExec;
+use ballista_core::execution_plans::ShuffleWriterExec;
use ballista_core::utils;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::physical_plan::ExecutionPlan;
@@ -51,8 +51,13 @@ impl Executor {
part: usize,
plan: Arc<dyn ExecutionPlan>,
) -> Result<RecordBatch, BallistaError> {
- let exec =
- QueryStageExec::try_new(job_id, stage_id, plan, self.work_dir.clone(), None)?;
+ let exec = ShuffleWriterExec::try_new(
+ job_id,
+ stage_id,
+ plan,
+ self.work_dir.clone(),
+ None,
+ )?;
let mut stream = exec.execute(part).await?;
let batches = utils::collect_stream(&mut stream).await?;
// the output should be a single batch containing metadata (path and statistics)
diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs
index 32fc9a9..70d90a4 100644
--- a/ballista/rust/scheduler/src/planner.rs
+++ b/ballista/rust/scheduler/src/planner.rs
@@ -25,7 +25,7 @@ use std::sync::Arc;
use ballista_core::datasource::DfTableAdapter;
use ballista_core::error::{BallistaError, Result};
use ballista_core::{
- execution_plans::{QueryStageExec, ShuffleReaderExec, UnresolvedShuffleExec},
+ execution_plans::{ShuffleReaderExec, ShuffleWriterExec, UnresolvedShuffleExec},
serde::scheduler::PartitionLocation,
};
use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
@@ -39,7 +39,7 @@ use datafusion::physical_plan::windows::WindowAggExec;
use datafusion::physical_plan::ExecutionPlan;
use log::info;
-type PartialQueryStageResult = (Arc<dyn ExecutionPlan>, Vec<Arc<QueryStageExec>>);
+type PartialQueryStageResult = (Arc<dyn ExecutionPlan>, Vec<Arc<ShuffleWriterExec>>);
pub struct DistributedPlanner {
next_stage_id: usize,
@@ -58,16 +58,16 @@ impl Default for DistributedPlanner {
}
impl DistributedPlanner {
- /// Returns a vector of ExecutionPlans, where the root node is a [QueryStageExec].
+ /// Returns a vector of ExecutionPlans, where the root node is a [ShuffleWriterExec].
/// Plans that depend on the input of other plans will have leaf nodes of type [UnresolvedShuffleExec].
- /// A [QueryStageExec] is created whenever the partitioning changes.
+ /// A [ShuffleWriterExec] is created whenever the partitioning changes.
///
/// Returns an empty vector if the execution_plan doesn't need to be sliced into several stages.
pub fn plan_query_stages(
&mut self,
job_id: &str,
execution_plan: Arc<dyn ExecutionPlan>,
- ) -> Result<Vec<Arc<QueryStageExec>>> {
+ ) -> Result<Vec<Arc<ShuffleWriterExec>>> {
info!("planning query stages");
let (new_plan, mut stages) =
self.plan_query_stages_internal(job_id, execution_plan)?;
@@ -228,8 +228,8 @@ fn create_query_stage(
job_id: &str,
stage_id: usize,
plan: Arc<dyn ExecutionPlan>,
-) -> Result<Arc<QueryStageExec>> {
- Ok(Arc::new(QueryStageExec::try_new(
+) -> Result<Arc<ShuffleWriterExec>> {
+ Ok(Arc::new(ShuffleWriterExec::try_new(
job_id.to_owned(),
stage_id,
plan,
@@ -285,13 +285,13 @@ mod test {
}
/* Expected result:
- QueryStageExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=1
+ ShuffleWriterExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=1
HashAggregateExec: groupBy=["l_returnflag"], aggrExpr=["SUM(l_extendedprice Multiply Int64(1)) [\"l_extendedprice * CAST(1 AS Float64)\"]"]
CsvExec: testdata/lineitem; partitions=2
- QueryStageExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=2
+ ShuffleWriterExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=2
CoalescePartitionsExec
UnresolvedShuffleExec: stages=[1]
- QueryStageExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=3
+ ShuffleWriterExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=3
SortExec { input: ProjectionExec { expr: [(Column { name: "l_returnflag" }, "l_returnflag"), (Column { name: "SUM(l_ext
ProjectionExec { expr: [(Column { name: "l_returnflag" }, "l_returnflag"), (Column { name: "SUM(l_extendedprice Multip
HashAggregateExec: groupBy=["l_returnflag"], aggrExpr=["SUM(l_extendedprice Multiply Int64(1)) [\"l_extendedprice * CAST(1 AS Float64)\"]"]