Posted to commits@arrow.apache.org by al...@apache.org on 2021/05/31 10:18:09 UTC

[arrow-datafusion] branch master updated: Refactor Ballista executor so that FlightService delegates to an Executor struct (#450)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new c8ab5a4  Refactor Ballista executor so that FlightService delegates to an Executor struct (#450)
c8ab5a4 is described below

commit c8ab5a4f00fc8b362eed72d5feb43b03b8ad1fdd
Author: Andy Grove <an...@gmail.com>
AuthorDate: Mon May 31 04:18:00 2021 -0600

    Refactor Ballista executor so that FlightService delegates to an Executor struct (#450)
    
    * Refactor Ballista executor so that FlightService delegates to an Executor struct
    
    * Simplify code
    
    * Pass executor to execution_loop
    
    * clippy
    
    * use arrow via datafusion
---
 ballista/rust/executor/src/execution_loop.rs |  22 +++---
 ballista/rust/executor/src/executor.rs       | 100 +++++++++++++++++++++++++++
 ballista/rust/executor/src/flight_service.rs |  75 +++++---------------
 ballista/rust/executor/src/lib.rs            |   1 +
 ballista/rust/executor/src/main.rs           |  27 +++-----
 ballista/rust/scheduler/Cargo.toml           |   1 -
 6 files changed, 135 insertions(+), 91 deletions(-)
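
In outline, the change extracts task execution from the Flight data plane into a standalone Executor struct that both entry points (the Flight service and the scheduler poll loop) share through an Arc; this is what lets main.rs drop the loopback BallistaClient. A minimal sketch of the resulting ownership structure, using stand-in types rather than the real Ballista definitions:

    use std::sync::Arc;

    // Stand-in for the new ballista_executor::executor::Executor
    struct Executor {
        work_dir: String,
    }

    // Data plane: serves Arrow Flight requests and delegates execution
    struct BallistaFlightService {
        executor: Arc<Executor>,
    }

    // Control plane: polls the scheduler for tasks and delegates execution
    struct PollLoop {
        executor: Arc<Executor>,
    }

    fn main() {
        let executor = Arc::new(Executor {
            work_dir: "/tmp/ballista".to_owned(),
        });
        let service = BallistaFlightService { executor: executor.clone() };
        let poll = PollLoop { executor };
        println!("{} {}", service.executor.work_dir, poll.executor.work_dir);
    }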

diff --git a/ballista/rust/executor/src/execution_loop.rs b/ballista/rust/executor/src/execution_loop.rs
index 5574a14..afc6f00 100644
--- a/ballista/rust/executor/src/execution_loop.rs
+++ b/ballista/rust/executor/src/execution_loop.rs
@@ -25,18 +25,16 @@ use log::{debug, error, info, warn};
 use tonic::transport::Channel;
 
 use ballista_core::serde::protobuf::ExecutorRegistration;
-use ballista_core::{
-    client::BallistaClient,
-    serde::protobuf::{
-        self, scheduler_grpc_client::SchedulerGrpcClient, task_status, FailedTask,
-        PartitionId, PollWorkParams, PollWorkResult, TaskDefinition, TaskStatus,
-    },
+use ballista_core::serde::protobuf::{
+    self, scheduler_grpc_client::SchedulerGrpcClient, task_status, FailedTask,
+    PartitionId, PollWorkParams, PollWorkResult, TaskDefinition, TaskStatus,
 };
+use ballista_executor::executor::Executor;
 use protobuf::CompletedTask;
 
 pub async fn poll_loop(
     mut scheduler: SchedulerGrpcClient<Channel>,
-    executor_client: BallistaClient,
+    executor: Arc<Executor>,
     executor_meta: ExecutorRegistration,
     concurrent_tasks: usize,
 ) {
@@ -67,7 +65,7 @@ pub async fn poll_loop(
             Ok(result) => {
                 if let Some(task) = result.into_inner().task {
                     run_received_tasks(
-                        executor_client.clone(),
+                        executor.clone(),
                         executor_meta.id.clone(),
                         available_tasks_slots.clone(),
                         task_status_sender,
@@ -86,7 +84,7 @@ pub async fn poll_loop(
 }
 
 async fn run_received_tasks(
-    mut executor_client: BallistaClient,
+    executor: Arc<Executor>,
     executor_id: String,
     available_tasks_slots: Arc<AtomicUsize>,
     task_status_sender: Sender<TaskStatus>,
@@ -96,15 +94,13 @@ async fn run_received_tasks(
     available_tasks_slots.fetch_sub(1, Ordering::SeqCst);
     let plan: Arc<dyn ExecutionPlan> = (&task.plan.unwrap()).try_into().unwrap();
     let task_id = task.task_id.unwrap();
-    // TODO: This is a convoluted way of executing the task. We should move the task
-    // execution code outside of the FlightService (data plane) into the control plane.
 
     tokio::spawn(async move {
-        let execution_result = executor_client
+        let execution_result = executor
             .execute_partition(
                 task_id.job_id.clone(),
                 task_id.stage_id as usize,
-                vec![task_id.partition_id as usize],
+                task_id.partition_id as usize,
                 plan,
             )
             .await;
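
The notable changes in this hunk: the spawned task clones a cheap Arc<Executor> instead of a BallistaClient, and execute_partition takes a single partition index (usize) where the client call took vec![...]. A reduced, self-contained sketch of the spawn-and-slot pattern, with a stand-in executor and simplified signatures (slot release is handled elsewhere in the real code):

    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    struct Executor;

    impl Executor {
        // Stand-in: the real method also takes a job id, stage id, and plan
        async fn execute_partition(&self, part: usize) {
            println!("executed partition {}", part);
        }
    }

    fn run_received_task(
        executor: Arc<Executor>,
        slots: Arc<AtomicUsize>,
        part: usize,
    ) -> tokio::task::JoinHandle<()> {
        // claim a task slot before spawning, as poll_loop does
        slots.fetch_sub(1, Ordering::SeqCst);
        tokio::spawn(async move {
            executor.execute_partition(part).await;
        })
    }

    #[tokio::main]
    async fn main() {
        let executor = Arc::new(Executor);
        let slots = Arc::new(AtomicUsize::new(4));
        run_received_task(executor, slots.clone(), 0).await.unwrap();
        println!("slots left: {}", slots.load(Ordering::SeqCst));
    }
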
diff --git a/ballista/rust/executor/src/executor.rs b/ballista/rust/executor/src/executor.rs
new file mode 100644
index 0000000..e2945bf
--- /dev/null
+++ b/ballista/rust/executor/src/executor.rs
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Ballista executor logic
+
+use std::path::PathBuf;
+use std::sync::Arc;
+use std::time::Instant;
+
+use ballista_core::error::BallistaError;
+use ballista_core::utils;
+use datafusion::arrow::array::{ArrayRef, StringBuilder};
+use datafusion::arrow::datatypes::{DataType, Field, Schema};
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::physical_plan::ExecutionPlan;
+use log::info;
+
+/// Ballista executor
+pub struct Executor {
+    /// Directory for storing partial results
+    work_dir: String,
+}
+
+impl Executor {
+    /// Create a new executor instance
+    pub fn new(work_dir: &str) -> Self {
+        Self {
+            work_dir: work_dir.to_owned(),
+        }
+    }
+}
+
+impl Executor {
+    /// Execute one partition of a query stage and persist the result to disk in IPC format. On
+    /// success, return a RecordBatch containing metadata about the results, including path
+    /// and statistics.
+    pub async fn execute_partition(
+        &self,
+        job_id: String,
+        stage_id: usize,
+        part: usize,
+        plan: Arc<dyn ExecutionPlan>,
+    ) -> Result<RecordBatch, BallistaError> {
+        let mut path = PathBuf::from(&self.work_dir);
+        path.push(&job_id);
+        path.push(&format!("{}", stage_id));
+        path.push(&format!("{}", part));
+        std::fs::create_dir_all(&path)?;
+
+        path.push("data.arrow");
+        let path = path.to_str().unwrap();
+        info!("Writing results to {}", path);
+
+        let now = Instant::now();
+
+        // execute the query partition
+        let mut stream = plan.execute(part).await?;
+
+        // stream results to disk
+        let stats = utils::write_stream_to_disk(&mut stream, &path).await?;
+
+        info!(
+            "Executed partition {} in {} seconds. Statistics: {:?}",
+            part,
+            now.elapsed().as_secs(),
+            stats
+        );
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("path", DataType::Utf8, false),
+            stats.arrow_struct_repr(),
+        ]));
+
+        // build result set with summary of the partition execution status
+        let mut c0 = StringBuilder::new(1);
+        c0.append_value(&path).unwrap();
+        let path: ArrayRef = Arc::new(c0.finish());
+
+        let stats: ArrayRef = stats.to_arrow_arrayref()?;
+        RecordBatch::try_new(schema, vec![path, stats]).map_err(BallistaError::ArrowError)
+    }
+
+    pub fn work_dir(&self) -> &str {
+        &self.work_dir
+    }
+}
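
As a usage sketch: since execution no longer lives inside a gRPC handler, the new struct can be driven directly. This fragment is illustrative rather than a verbatim test; it assumes a physical plan obtained elsewhere (in Ballista it is decoded from the task's protobuf):

    use std::sync::Arc;

    use ballista_core::error::BallistaError;
    use ballista_executor::executor::Executor;
    use datafusion::physical_plan::ExecutionPlan;

    async fn run_one(plan: Arc<dyn ExecutionPlan>) -> Result<(), BallistaError> {
        let executor = Executor::new("/tmp/ballista-work");
        // Executes partition 0 of stage 1 of job "job-1", writing IPC data to
        // /tmp/ballista-work/job-1/1/0/data.arrow and returning a one-row
        // summary batch (output path plus statistics).
        let summary = executor
            .execute_partition("job-1".to_owned(), 1, 0, plan)
            .await?;
        println!("summary schema: {:?}", summary.schema());
        Ok(())
    }
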
diff --git a/ballista/rust/executor/src/flight_service.rs b/ballista/rust/executor/src/flight_service.rs
index b35ac15..d4eb122 100644
--- a/ballista/rust/executor/src/flight_service.rs
+++ b/ballista/rust/executor/src/flight_service.rs
@@ -21,12 +21,11 @@ use std::fs::File;
 use std::path::PathBuf;
 use std::pin::Pin;
 use std::sync::Arc;
-use std::time::Instant;
 
+use crate::executor::Executor;
 use ballista_core::error::BallistaError;
 use ballista_core::serde::decode_protobuf;
 use ballista_core::serde::scheduler::{Action as BallistaAction, PartitionStats};
-use ballista_core::utils;
 
 use arrow_flight::{
     flight_service_server::FlightService, Action, ActionType, Criteria, Empty,
@@ -34,14 +33,12 @@ use arrow_flight::{
     PutResult, SchemaResult, Ticket,
 };
 use datafusion::arrow::{
-    array::{ArrayRef, StringBuilder},
     datatypes::{DataType, Field, Schema},
     error::ArrowError,
     ipc::reader::FileReader,
     ipc::writer::IpcWriteOptions,
     record_batch::RecordBatch,
 };
-use datafusion::error::DataFusionError;
 use datafusion::physical_plan::displayable;
 use futures::{Stream, StreamExt};
 use log::{info, warn};
@@ -61,12 +58,13 @@ type FlightDataReceiver = Receiver<Result<FlightData, Status>>;
 /// Service implementing the Apache Arrow Flight Protocol
 #[derive(Clone)]
 pub struct BallistaFlightService {
-    work_dir: String,
+    /// Executor
+    executor: Arc<Executor>,
 }
 
 impl BallistaFlightService {
-    pub fn new(work_dir: String) -> Self {
-        Self { work_dir }
+    pub fn new(executor: Arc<Executor>) -> Self {
+        Self { executor }
     }
 }
 
@@ -105,59 +103,22 @@ impl FlightService for BallistaFlightService {
 
                 let mut tasks: Vec<JoinHandle<Result<_, BallistaError>>> = vec![];
                 for &part in &partition.partition_id {
-                    let mut path = PathBuf::from(&self.work_dir);
                     let partition = partition.clone();
+                    let executor = self.executor.clone();
                     tasks.push(tokio::spawn(async move {
-                        path.push(partition.job_id);
-                        path.push(&format!("{}", partition.stage_id));
-                        path.push(&format!("{}", part));
-                        std::fs::create_dir_all(&path)?;
-
-                        path.push("data.arrow");
-                        let path = path.to_str().unwrap();
-                        info!("Writing results to {}", path);
-
-                        let now = Instant::now();
-
-                        // execute the query partition
-                        let mut stream = partition
-                            .plan
-                            .execute(part)
-                            .await
-                            .map_err(|e| from_datafusion_err(&e))?;
-
-                        // stream results to disk
-                        let stats = utils::write_stream_to_disk(&mut stream, &path)
-                            .await
-                            .map_err(|e| from_ballista_err(&e))?;
-
-                        info!(
-                            "Executed partition {} in {} seconds. Statistics: {:?}",
-                            part,
-                            now.elapsed().as_secs(),
-                            stats
-                        );
+                        let results = executor
+                            .execute_partition(
+                                partition.job_id.clone(),
+                                partition.stage_id,
+                                part,
+                                partition.plan.clone(),
+                            )
+                            .await?;
+                        let results = vec![results];
 
                         let mut flights: Vec<Result<FlightData, Status>> = vec![];
                         let options = arrow::ipc::writer::IpcWriteOptions::default();
 
-                        let schema = Arc::new(Schema::new(vec![
-                            Field::new("path", DataType::Utf8, false),
-                            stats.arrow_struct_repr(),
-                        ]));
-
-                        // build result set with summary of the partition execution status
-                        let mut c0 = StringBuilder::new(1);
-                        c0.append_value(&path).unwrap();
-                        let path: ArrayRef = Arc::new(c0.finish());
-
-                        let stats: ArrayRef = stats.to_arrow_arrayref()?;
-                        let results = vec![RecordBatch::try_new(
-                            schema,
-                            vec![path, stats],
-                        )
-                        .unwrap()];
-
                         let mut batches: Vec<Result<FlightData, Status>> = results
                             .iter()
                             .flat_map(|batch| create_flight_iter(batch, &options))
@@ -208,7 +169,7 @@ impl FlightService for BallistaFlightService {
                 // fetch a partition that was previously executed by this executor
                 info!("FetchPartition {:?}", partition_id);
 
-                let mut path = PathBuf::from(&self.work_dir);
+                let mut path = PathBuf::from(self.executor.work_dir());
                 path.push(&partition_id.job_id);
                 path.push(&format!("{}", partition_id.stage_id));
                 path.push(&format!("{}", partition_id.partition_id));
@@ -368,7 +329,3 @@ fn from_arrow_err(e: &ArrowError) -> Status {
 fn from_ballista_err(e: &ballista_core::error::BallistaError) -> Status {
     Status::internal(format!("Ballista Error: {:?}", e))
 }
-
-fn from_datafusion_err(e: &DataFusionError) -> Status {
-    Status::internal(format!("DataFusion Error: {:?}", e))
-}
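
What remains in the handler is fan-out: clone the Arc once per requested partition, spawn a task, and collect the JoinHandles. A reduced, self-contained version of that pattern, with the Flight encoding and Ballista types omitted:

    use std::sync::Arc;

    struct Executor;

    impl Executor {
        // Stand-in for executing and persisting one partition
        async fn execute_partition(&self, part: usize) -> Result<usize, String> {
            Ok(part)
        }
    }

    #[tokio::main]
    async fn main() -> Result<(), String> {
        let executor = Arc::new(Executor);
        let partition_ids = vec![0usize, 1, 2];
        let tasks: Vec<_> = partition_ids
            .into_iter()
            .map(|part| {
                // each spawned task gets its own Arc clone of the executor
                let executor = executor.clone();
                tokio::spawn(async move { executor.execute_partition(part).await })
            })
            .collect();
        for task in tasks {
            let part = task.await.map_err(|e| e.to_string())??;
            println!("partition {} complete", part);
        }
        Ok(())
    }
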
diff --git a/ballista/rust/executor/src/lib.rs b/ballista/rust/executor/src/lib.rs
index 08646eb..188b944 100644
--- a/ballista/rust/executor/src/lib.rs
+++ b/ballista/rust/executor/src/lib.rs
@@ -18,4 +18,5 @@
 //! Core executor logic for executing queries and storing results in memory.
 
 pub mod collect;
+pub mod executor;
 pub mod flight_service;
diff --git a/ballista/rust/executor/src/main.rs b/ballista/rust/executor/src/main.rs
index ad7c001..aad53d7 100644
--- a/ballista/rust/executor/src/main.rs
+++ b/ballista/rust/executor/src/main.rs
@@ -30,17 +30,15 @@ use tempfile::TempDir;
 use tonic::transport::Server;
 use uuid::Uuid;
 
-use ballista_core::{
-    client::BallistaClient,
-    serde::protobuf::{
-        executor_registration, scheduler_grpc_client::SchedulerGrpcClient,
-        ExecutorRegistration,
-    },
+use ballista_core::serde::protobuf::{
+    executor_registration, scheduler_grpc_client::SchedulerGrpcClient,
+    ExecutorRegistration,
 };
 use ballista_core::{
     print_version, serde::protobuf::scheduler_grpc_server::SchedulerGrpcServer,
     BALLISTA_VERSION,
 };
+use ballista_executor::executor::Executor;
 use ballista_executor::flight_service::BallistaFlightService;
 use ballista_scheduler::{state::StandaloneClient, SchedulerServer};
 use config::prelude::*;
@@ -166,7 +164,10 @@ async fn main() -> Result<()> {
     let scheduler = SchedulerGrpcClient::connect(scheduler_url)
         .await
         .context("Could not connect to scheduler")?;
-    let service = BallistaFlightService::new(work_dir);
+
+    let executor = Arc::new(Executor::new(&work_dir));
+
+    let service = BallistaFlightService::new(executor.clone());
 
     let server = FlightServiceServer::new(service);
     info!(
@@ -174,19 +175,9 @@ async fn main() -> Result<()> {
         BALLISTA_VERSION, addr
     );
     let server_future = tokio::spawn(Server::builder().add_service(server).serve(addr));
-    let client_host = external_host.as_deref().unwrap_or_else(|| {
-        if bind_host == "0.0.0.0" {
-            // If the executor is being bound to "0.0.0.0" (which means use all ips in all eth devices)
-            // then use "localhost" to connect to itself through the BallistaClient
-            "localhost"
-        } else {
-            &bind_host
-        }
-    });
-    let client = BallistaClient::try_new(client_host, port).await?;
     tokio::spawn(execution_loop::poll_loop(
         scheduler,
-        client,
+        executor,
         executor_meta,
         opt.concurrent_tasks,
     ));
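
The net effect in main: one shared Arc<Executor> feeds both the Flight server and the poll loop, so the loopback BallistaClient (and the bind_host/"localhost" special case it required) disappears. A stripped-down sketch of the wiring, with stand-in types:

    use std::sync::Arc;

    struct Executor;

    struct BallistaFlightService {
        executor: Arc<Executor>,
    }

    // Stand-in for execution_loop::poll_loop
    async fn poll_loop(_executor: Arc<Executor>) {}

    #[tokio::main]
    async fn main() {
        let executor = Arc::new(Executor);
        // data plane: the Flight service shares the executor...
        let _service = BallistaFlightService { executor: executor.clone() };
        // ...and the control-plane loop receives the same instance
        tokio::spawn(poll_loop(executor)).await.unwrap();
    }
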
diff --git a/ballista/rust/scheduler/Cargo.toml b/ballista/rust/scheduler/Cargo.toml
index 19e2574..c009cc6 100644
--- a/ballista/rust/scheduler/Cargo.toml
+++ b/ballista/rust/scheduler/Cargo.toml
@@ -52,7 +52,6 @@ tonic = "0.4"
 tower = { version = "0.4" }
 warp = "0.3"
 
-arrow = { version = "4.0"  }
 datafusion = { path = "../../../datafusion" }
 
 [dev-dependencies]
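
The Cargo.toml change matches the "use arrow via datafusion" commit bullet: datafusion re-exports the arrow crate, so dependents no longer pin their own arrow version and the two can never drift apart. For example, using the same imports the new executor.rs uses:

    // arrow types are reached through datafusion's re-export, so the two
    // crates cannot disagree on the arrow version
    use datafusion::arrow::datatypes::{DataType, Field, Schema};

    fn main() {
        let schema = Schema::new(vec![Field::new("path", DataType::Utf8, false)]);
        println!("{}", schema.field(0).name());
    }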