Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/02/16 09:58:06 UTC

[GitHub] [arrow-datafusion] mingmwang opened a new pull request #1842: [Ballista] Streaming style push-based shuffle and All-at-once stage scheduling in Ballista

mingmwang opened a new pull request #1842:
URL: https://github.com/apache/arrow-datafusion/pull/1842


   # Which issue does this PR close?
   
   Closes [#1805](https://github.com/apache/arrow-datafusion/issues/1805).
   
    # Rationale for this change
   
   Add a new streaming-style, push-based shuffle implementation.
   
   # What changes are included in this PR?
   1. New streaming shuffle reader implementation
   2. `PushPartition` gRPC call in Arrow Flight
   3. All-at-once stage scheduler
   
   # Are there any user-facing changes?
   No





[GitHub] [arrow-datafusion] thinkharderdev commented on a change in pull request #1842: [Ballista] Streaming style push-based shuffle and All-at-once stage scheduling in Ballista

Posted by GitBox <gi...@apache.org>.
thinkharderdev commented on a change in pull request #1842:
URL: https://github.com/apache/arrow-datafusion/pull/1842#discussion_r809912822



##########
File path: ballista/rust/scheduler/src/planner.rs
##########
@@ -55,24 +56,41 @@ impl Default for DistributedPlanner {
 
 impl DistributedPlanner {
     /// Returns a vector of ExecutionPlans, where the root node is a [ShuffleWriterExec].
-    /// Plans that depend on the input of other plans will have leaf nodes of type [UnresolvedShuffleExec].
+    /// Plans that depend on the input of other plans will have leaf nodes of type [UnresolvedShuffleExec]
+    /// or of type [ShuffleStreamReaderExec] if the created stages are all-at-once stages.
     /// A [ShuffleWriterExec] is created whenever the partitioning changes.
     pub async fn plan_query_stages<'a>(
         &'a mut self,
         job_id: &'a str,
         execution_plan: Arc<dyn ExecutionPlan>,
     ) -> Result<Vec<Arc<ShuffleWriterExec>>> {
-        info!("planning query stages");
-        let (new_plan, mut stages) = self
-            .plan_query_stages_internal(job_id, execution_plan)
+        info!("planning query stages for job {}", job_id);
+        let (modified_plan, mut stages) = self
+            .plan_query_stages_internal(job_id, execution_plan.clone())
             .await?;
-        stages.push(create_shuffle_writer(
-            job_id,
-            self.next_stage_id(),
-            new_plan,
-            None,
-        )?);
-        Ok(stages)
+        // Re-plan the input execution plan and create All-at-once query stages.
+        // For now we simply depend on the stage count to decide whether to create All-at-once or normal stages.
+        // In the future, we can use a more sophisticated way to decide which way to go.
+        if stages.len() > 1 && stages.len() <= 4 {

Review comment:
       If I understand the original design correctly, the "all-at-once" plan will only get scheduled when there are sufficient task slots available to run the entire plan. So should this be a function of the total number of partitions? 
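
    For illustration only (nothing below is in the PR), a partition-count based gate might look roughly like this sketch; `should_plan_all_at_once` and `available_task_slots` are hypothetical names, since the scheduler does not yet track free slots:

    ```rust
    use std::sync::Arc;
    use datafusion::physical_plan::ExecutionPlan;

    /// Hypothetical sketch: gate all-at-once scheduling on the total number of
    /// partitions (roughly, tasks) rather than on the stage count alone.
    /// `available_task_slots` is assumed; the scheduler does not expose it yet.
    fn should_plan_all_at_once(
        stages: &[Arc<dyn ExecutionPlan>],
        available_task_slots: usize,
    ) -> bool {
        // Approximate one task per output partition of each stage plan.
        let total_tasks: usize = stages
            .iter()
            .map(|stage| stage.output_partitioning().partition_count())
            .sum();
        stages.len() > 1 && total_tasks <= available_task_slots
    }
    ```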







[GitHub] [arrow-datafusion] houqp commented on a change in pull request #1842: [Ballista] Streaming style push-based shuffle and All-at-once stage scheduling in Ballista

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #1842:
URL: https://github.com/apache/arrow-datafusion/pull/1842#discussion_r810810411



##########
File path: ballista/rust/core/src/execution_plans/shuffle_stream_reader.rs
##########
@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::cell::Cell;
+use std::fmt::Formatter;
+use std::sync::{Arc, Mutex};
+use std::{any::Any, pin::Pin};
+
+use async_trait::async_trait;
+use datafusion::arrow::datatypes::SchemaRef;
+use datafusion::arrow::error::Result as ArrowResult;
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::physical_plan::metrics::{
+    ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
+};
+use datafusion::physical_plan::stream::RecordBatchReceiverStream;
+use datafusion::physical_plan::{
+    DisplayFormatType, ExecutionPlan, Metric, Partitioning, SendableRecordBatchStream,
+    Statistics,
+};
+use datafusion::{
+    error::{DataFusionError, Result},
+    physical_plan::RecordBatchStream,
+};
+use futures::{future, Stream, StreamExt};
+use log::info;
+use std::time::Instant;
+use tokio::sync::mpsc::{channel, Receiver, Sender};
+use tokio::task;
+use datafusion::execution::runtime_env::RuntimeEnv;
+use datafusion::physical_plan::expressions::PhysicalSortExpr;
+
+/// ShuffleStreamReaderExec reads partition streams that are pushed by multiple ShuffleWriterExec
+/// instances being executed by executors
+#[derive(Debug, Clone)]
+pub struct ShuffleStreamReaderExec {
+    /// The query stage id which the shuffle reader depends on
+    pub stage_id: usize,
+
+    /// Schema
+    pub(crate) schema: SchemaRef,
+
+    /// Partition count
+    pub partition_count: usize,
+
+    /// Record Batch input receiver
+    batch_input: Arc<Mutex<Vec<Receiver<ArrowResult<RecordBatch>>>>>,
+
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
+}
+
+impl ShuffleStreamReaderExec {
+    /// Create a new ShuffleStreamReaderExec
+    pub fn new(stage_id: usize, schema: SchemaRef, partition_count: usize) -> Self {
+        Self {
+            stage_id,
+            schema,
+            partition_count,
+            batch_input: Arc::new(Mutex::new(Vec::new())),
+            metrics: ExecutionPlanMetricsSet::new(),
+        }
+    }
+
+    pub fn create_record_batch_channel(&self) -> Sender<ArrowResult<RecordBatch>> {
+        let (response_tx, response_rx): (
+            Sender<ArrowResult<RecordBatch>>,
+            Receiver<ArrowResult<RecordBatch>>,
+        ) = channel(10);
+        self.batch_input.lock().unwrap().push(response_rx);
+        response_tx
+    }
+
+    /// Returns the streaming shuffle readers in the execution plan
+    pub fn find_stream_shuffle_readers(
+        plan: Arc<dyn ExecutionPlan>,
+    ) -> Vec<ShuffleStreamReaderExec> {
+        if let Some(shuffle_reader) =
+            plan.as_any().downcast_ref::<ShuffleStreamReaderExec>()
+        {
+            vec![shuffle_reader.clone()]
+        } else {
+            let readers = plan
+                .children()
+                .into_iter()
+                .map(|child| ShuffleStreamReaderExec::find_stream_shuffle_readers(child))
+                .collect::<Vec<_>>()
+                .into_iter()
+                .flatten()
+                .collect();
+            readers
+        }
+    }
+}
+
+#[async_trait]
+impl ExecutionPlan for ShuffleStreamReaderExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        // TODO partitioning may be known and could be populated here
+        // see https://github.com/apache/arrow-datafusion/issues/758
+        Partitioning::UnknownPartitioning(self.partition_count)
+    }
+
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        None
+    }
+
+    fn relies_on_input_order(&self) -> bool {
+        false
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        &self,
+        _children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Err(DataFusionError::Plan(
+            "Ballista ShuffleStreamReaderExec does not support with_new_children()"
+                .to_owned(),
+        ))
+    }
+
+    async fn execute(
+        &self,
+        partition: usize,
+        _runtime: Arc<RuntimeEnv>,
+    ) -> Result<SendableRecordBatchStream> {
+        info!("ShuffleStreamReaderExec::execute({})", partition);
+        let output_rows = MetricBuilder::new(&self.metrics).output_rows(partition);
+        let (sender, receiver): (
+            Sender<ArrowResult<RecordBatch>>,
+            Receiver<ArrowResult<RecordBatch>>,
+        ) = channel(2);
+
+        let schema = &self.schema;
+        let mut rx = self.batch_input.lock().unwrap().pop().unwrap();
+        let join_handle = task::spawn(async move {
+            while let Some(batch) = rx.recv().await {
+                output_rows.add(batch.as_ref().unwrap().num_rows());
+                sender.send(batch).await.ok();
+            }
+        });
+        Ok(RecordBatchReceiverStream::create(
+            schema,
+            receiver,
+            Some(join_handle),
+        ))
+
+        // let schema = &self.schema;
+        // let rx = self.batch_receiver.lock().unwrap().pop().unwrap();
+        // let join_handle = tokio::task::spawn(async move {});
+        // Ok(RecordBatchReceiverStream::create(schema, rx, join_handle))

Review comment:
       ```suggestion
   ```

##########
File path: ballista/rust/scheduler/src/planner.rs
##########
@@ -55,24 +56,41 @@ impl Default for DistributedPlanner {
 
 impl DistributedPlanner {
     /// Returns a vector of ExecutionPlans, where the root node is a [ShuffleWriterExec].
-    /// Plans that depend on the input of other plans will have leaf nodes of type [UnresolvedShuffleExec].
+    /// Plans that depend on the input of other plans will have leaf nodes of type [UnresolvedShuffleExec]
+    /// or of type [ShuffleStreamReaderExec] if the created stages are all-at-once stages.
     /// A [ShuffleWriterExec] is created whenever the partitioning changes.
     pub async fn plan_query_stages<'a>(
         &'a mut self,
         job_id: &'a str,
         execution_plan: Arc<dyn ExecutionPlan>,
     ) -> Result<Vec<Arc<ShuffleWriterExec>>> {
-        info!("planning query stages");
-        let (new_plan, mut stages) = self
-            .plan_query_stages_internal(job_id, execution_plan)
+        info!("planning query stages for job {}", job_id);
+        let (modified_plan, mut stages) = self
+            .plan_query_stages_internal(job_id, execution_plan.clone())

Review comment:
       I think this block is only used in the else branch below, when all-at-once mode is disabled?







[GitHub] [arrow-datafusion] mingmwang commented on a change in pull request #1842: [Ballista] Streaming style push-based shuffle and All-at-once stage scheduling in Ballista

Posted by GitBox <gi...@apache.org>.
mingmwang commented on a change in pull request #1842:
URL: https://github.com/apache/arrow-datafusion/pull/1842#discussion_r810728154



##########
File path: ballista/rust/scheduler/src/planner.rs
##########
@@ -55,24 +56,41 @@ impl Default for DistributedPlanner {
 
 impl DistributedPlanner {
     /// Returns a vector of ExecutionPlans, where the root node is a [ShuffleWriterExec].
-    /// Plans that depend on the input of other plans will have leaf nodes of type [UnresolvedShuffleExec].
+    /// Plans that depend on the input of other plans will have leaf nodes of type [UnresolvedShuffleExec]
+    /// or of type [ShuffleStreamReaderExec] if the created stages are all-at-once stages.
     /// A [ShuffleWriterExec] is created whenever the partitioning changes.
     pub async fn plan_query_stages<'a>(
         &'a mut self,
         job_id: &'a str,
         execution_plan: Arc<dyn ExecutionPlan>,
     ) -> Result<Vec<Arc<ShuffleWriterExec>>> {
-        info!("planning query stages");
-        let (new_plan, mut stages) = self
-            .plan_query_stages_internal(job_id, execution_plan)
+        info!("planning query stages for job {}", job_id);
+        let (modified_plan, mut stages) = self
+            .plan_query_stages_internal(job_id, execution_plan.clone())
             .await?;
-        stages.push(create_shuffle_writer(
-            job_id,
-            self.next_stage_id(),
-            new_plan,
-            None,
-        )?);
-        Ok(stages)
+        // Re-plan the input execution plan and create All-at-once query stages.
+        // For now we simply depend on the stage count to decide whether to create All-at-once or normal stages.
+        // In the future, we can use a more sophisticated way to decide which way to go.
+        if stages.len() > 1 && stages.len() <= 4 {

Review comment:
       > If I understand the original design correctly, the "all-at-once" plan will only get scheduled when there are sufficient task slots available to run the entire plan. So should this be a function of the total number of partitions?
   
   Yes, you are right. But currently the scheduler server doesn't have a clear view of how many task slots are available, so here I just add a simple check on the stage count. After @yahoNanJing refactors the scheduler state and keeps more CPU/task info in the in-memory state, we can add more sophisticated check logic.
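
    Once executor slot information is kept in the scheduler's in-memory state, the gate could be as simple as the hypothetical sketch below (all names are illustrative, not from this PR):

    ```rust
    /// Illustrative per-executor slot bookkeeping the scheduler might keep in
    /// memory after the planned refactoring (field names are assumptions).
    struct ExecutorSlots {
        total_task_slots: usize,
        running_tasks: usize,
    }

    /// Sum of currently unused task slots across the cluster.
    fn free_slots(executors: &[ExecutorSlots]) -> usize {
        executors
            .iter()
            .map(|e| e.total_task_slots.saturating_sub(e.running_tasks))
            .sum()
    }

    /// Schedule a job all-at-once only when every task of the job can run
    /// concurrently; otherwise fall back to stage-by-stage scheduling.
    fn can_run_all_at_once(executors: &[ExecutorSlots], total_job_tasks: usize) -> bool {
        free_slots(executors) >= total_job_tasks
    }
    ```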
   



