You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/05/31 10:18:09 UTC
[arrow-datafusion] branch master updated: Refactor Ballista executor so that FlightService delegates to an Executor struct (#450)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new c8ab5a4 Refactor Ballista executor so that FlightService delegates to an Executor struct (#450)
c8ab5a4 is described below
commit c8ab5a4f00fc8b362eed72d5feb43b03b8ad1fdd
Author: Andy Grove <an...@gmail.com>
AuthorDate: Mon May 31 04:18:00 2021 -0600
Refactor Ballista executor so that FlightService delegates to an Executor struct (#450)
* Refactor Ballista executor so that FlightService delegates to an Executor struct
* Simplify code
* Pass executor to execution_loop
* clippy
* use arrow via datafusion
---
ballista/rust/executor/src/execution_loop.rs | 22 +++---
ballista/rust/executor/src/executor.rs | 100 +++++++++++++++++++++++++++
ballista/rust/executor/src/flight_service.rs | 75 +++++---------------
ballista/rust/executor/src/lib.rs | 1 +
ballista/rust/executor/src/main.rs | 27 +++-----
ballista/rust/scheduler/Cargo.toml | 1 -
6 files changed, 135 insertions(+), 91 deletions(-)
diff --git a/ballista/rust/executor/src/execution_loop.rs b/ballista/rust/executor/src/execution_loop.rs
index 5574a14..afc6f00 100644
--- a/ballista/rust/executor/src/execution_loop.rs
+++ b/ballista/rust/executor/src/execution_loop.rs
@@ -25,18 +25,16 @@ use log::{debug, error, info, warn};
use tonic::transport::Channel;
use ballista_core::serde::protobuf::ExecutorRegistration;
-use ballista_core::{
- client::BallistaClient,
- serde::protobuf::{
- self, scheduler_grpc_client::SchedulerGrpcClient, task_status, FailedTask,
- PartitionId, PollWorkParams, PollWorkResult, TaskDefinition, TaskStatus,
- },
+use ballista_core::serde::protobuf::{
+ self, scheduler_grpc_client::SchedulerGrpcClient, task_status, FailedTask,
+ PartitionId, PollWorkParams, PollWorkResult, TaskDefinition, TaskStatus,
};
+use ballista_executor::executor::Executor;
use protobuf::CompletedTask;
pub async fn poll_loop(
mut scheduler: SchedulerGrpcClient<Channel>,
- executor_client: BallistaClient,
+ executor: Arc<Executor>,
executor_meta: ExecutorRegistration,
concurrent_tasks: usize,
) {
@@ -67,7 +65,7 @@ pub async fn poll_loop(
Ok(result) => {
if let Some(task) = result.into_inner().task {
run_received_tasks(
- executor_client.clone(),
+ executor.clone(),
executor_meta.id.clone(),
available_tasks_slots.clone(),
task_status_sender,
@@ -86,7 +84,7 @@ pub async fn poll_loop(
}
async fn run_received_tasks(
- mut executor_client: BallistaClient,
+ executor: Arc<Executor>,
executor_id: String,
available_tasks_slots: Arc<AtomicUsize>,
task_status_sender: Sender<TaskStatus>,
@@ -96,15 +94,13 @@ async fn run_received_tasks(
available_tasks_slots.fetch_sub(1, Ordering::SeqCst);
let plan: Arc<dyn ExecutionPlan> = (&task.plan.unwrap()).try_into().unwrap();
let task_id = task.task_id.unwrap();
- // TODO: This is a convoluted way of executing the task. We should move the task
- // execution code outside of the FlightService (data plane) into the control plane.
tokio::spawn(async move {
- let execution_result = executor_client
+ let execution_result = executor
.execute_partition(
task_id.job_id.clone(),
task_id.stage_id as usize,
- vec![task_id.partition_id as usize],
+ task_id.partition_id as usize,
plan,
)
.await;
diff --git a/ballista/rust/executor/src/executor.rs b/ballista/rust/executor/src/executor.rs
new file mode 100644
index 0000000..e2945bf
--- /dev/null
+++ b/ballista/rust/executor/src/executor.rs
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Ballista executor logic
+
+use std::path::PathBuf;
+use std::sync::Arc;
+use std::time::Instant;
+
+use ballista_core::error::BallistaError;
+use ballista_core::utils;
+use datafusion::arrow::array::{ArrayRef, StringBuilder};
+use datafusion::arrow::datatypes::{DataType, Field, Schema};
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::physical_plan::ExecutionPlan;
+use log::info;
+
+/// Ballista executor
+pub struct Executor {
+ /// Directory for storing partial results
+ work_dir: String,
+}
+
+impl Executor {
+ /// Create a new executor instance
+ pub fn new(work_dir: &str) -> Self {
+ Self {
+ work_dir: work_dir.to_owned(),
+ }
+ }
+}
+
+impl Executor {
+ /// Execute one partition of a query stage and persist the result to disk in IPC format. On
+ /// success, return a RecordBatch containing metadata about the results, including path
+ /// and statistics.
+ pub async fn execute_partition(
+ &self,
+ job_id: String,
+ stage_id: usize,
+ part: usize,
+ plan: Arc<dyn ExecutionPlan>,
+ ) -> Result<RecordBatch, BallistaError> {
+ let mut path = PathBuf::from(&self.work_dir);
+ path.push(&job_id);
+ path.push(&format!("{}", stage_id));
+ path.push(&format!("{}", part));
+ std::fs::create_dir_all(&path)?;
+
+ path.push("data.arrow");
+ let path = path.to_str().unwrap();
+ info!("Writing results to {}", path);
+
+ let now = Instant::now();
+
+ // execute the query partition
+ let mut stream = plan.execute(part).await?;
+
+ // stream results to disk
+ let stats = utils::write_stream_to_disk(&mut stream, &path).await?;
+
+ info!(
+ "Executed partition {} in {} seconds. Statistics: {:?}",
+ part,
+ now.elapsed().as_secs(),
+ stats
+ );
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("path", DataType::Utf8, false),
+ stats.arrow_struct_repr(),
+ ]));
+
+ // build result set with summary of the partition execution status
+ let mut c0 = StringBuilder::new(1);
+ c0.append_value(&path).unwrap();
+ let path: ArrayRef = Arc::new(c0.finish());
+
+ let stats: ArrayRef = stats.to_arrow_arrayref()?;
+ RecordBatch::try_new(schema, vec![path, stats]).map_err(BallistaError::ArrowError)
+ }
+
+ pub fn work_dir(&self) -> &str {
+ &self.work_dir
+ }
+}
diff --git a/ballista/rust/executor/src/flight_service.rs b/ballista/rust/executor/src/flight_service.rs
index b35ac15..d4eb122 100644
--- a/ballista/rust/executor/src/flight_service.rs
+++ b/ballista/rust/executor/src/flight_service.rs
@@ -21,12 +21,11 @@ use std::fs::File;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
-use std::time::Instant;
+use crate::executor::Executor;
use ballista_core::error::BallistaError;
use ballista_core::serde::decode_protobuf;
use ballista_core::serde::scheduler::{Action as BallistaAction, PartitionStats};
-use ballista_core::utils;
use arrow_flight::{
flight_service_server::FlightService, Action, ActionType, Criteria, Empty,
@@ -34,14 +33,12 @@ use arrow_flight::{
PutResult, SchemaResult, Ticket,
};
use datafusion::arrow::{
- array::{ArrayRef, StringBuilder},
datatypes::{DataType, Field, Schema},
error::ArrowError,
ipc::reader::FileReader,
ipc::writer::IpcWriteOptions,
record_batch::RecordBatch,
};
-use datafusion::error::DataFusionError;
use datafusion::physical_plan::displayable;
use futures::{Stream, StreamExt};
use log::{info, warn};
@@ -61,12 +58,13 @@ type FlightDataReceiver = Receiver<Result<FlightData, Status>>;
/// Service implementing the Apache Arrow Flight Protocol
#[derive(Clone)]
pub struct BallistaFlightService {
- work_dir: String,
+ /// Executor
+ executor: Arc<Executor>,
}
impl BallistaFlightService {
- pub fn new(work_dir: String) -> Self {
- Self { work_dir }
+ pub fn new(executor: Arc<Executor>) -> Self {
+ Self { executor }
}
}
@@ -105,59 +103,22 @@ impl FlightService for BallistaFlightService {
let mut tasks: Vec<JoinHandle<Result<_, BallistaError>>> = vec![];
for &part in &partition.partition_id {
- let mut path = PathBuf::from(&self.work_dir);
let partition = partition.clone();
+ let executor = self.executor.clone();
tasks.push(tokio::spawn(async move {
- path.push(partition.job_id);
- path.push(&format!("{}", partition.stage_id));
- path.push(&format!("{}", part));
- std::fs::create_dir_all(&path)?;
-
- path.push("data.arrow");
- let path = path.to_str().unwrap();
- info!("Writing results to {}", path);
-
- let now = Instant::now();
-
- // execute the query partition
- let mut stream = partition
- .plan
- .execute(part)
- .await
- .map_err(|e| from_datafusion_err(&e))?;
-
- // stream results to disk
- let stats = utils::write_stream_to_disk(&mut stream, &path)
- .await
- .map_err(|e| from_ballista_err(&e))?;
-
- info!(
- "Executed partition {} in {} seconds. Statistics: {:?}",
- part,
- now.elapsed().as_secs(),
- stats
- );
+ let results = executor
+ .execute_partition(
+ partition.job_id.clone(),
+ partition.stage_id,
+ part,
+ partition.plan.clone(),
+ )
+ .await?;
+ let results = vec![results];
let mut flights: Vec<Result<FlightData, Status>> = vec![];
let options = arrow::ipc::writer::IpcWriteOptions::default();
- let schema = Arc::new(Schema::new(vec![
- Field::new("path", DataType::Utf8, false),
- stats.arrow_struct_repr(),
- ]));
-
- // build result set with summary of the partition execution status
- let mut c0 = StringBuilder::new(1);
- c0.append_value(&path).unwrap();
- let path: ArrayRef = Arc::new(c0.finish());
-
- let stats: ArrayRef = stats.to_arrow_arrayref()?;
- let results = vec![RecordBatch::try_new(
- schema,
- vec![path, stats],
- )
- .unwrap()];
-
let mut batches: Vec<Result<FlightData, Status>> = results
.iter()
.flat_map(|batch| create_flight_iter(batch, &options))
@@ -208,7 +169,7 @@ impl FlightService for BallistaFlightService {
// fetch a partition that was previously executed by this executor
info!("FetchPartition {:?}", partition_id);
- let mut path = PathBuf::from(&self.work_dir);
+ let mut path = PathBuf::from(self.executor.work_dir());
path.push(&partition_id.job_id);
path.push(&format!("{}", partition_id.stage_id));
path.push(&format!("{}", partition_id.partition_id));
@@ -368,7 +329,3 @@ fn from_arrow_err(e: &ArrowError) -> Status {
fn from_ballista_err(e: &ballista_core::error::BallistaError) -> Status {
Status::internal(format!("Ballista Error: {:?}", e))
}
-
-fn from_datafusion_err(e: &DataFusionError) -> Status {
- Status::internal(format!("DataFusion Error: {:?}", e))
-}
diff --git a/ballista/rust/executor/src/lib.rs b/ballista/rust/executor/src/lib.rs
index 08646eb..188b944 100644
--- a/ballista/rust/executor/src/lib.rs
+++ b/ballista/rust/executor/src/lib.rs
@@ -18,4 +18,5 @@
//! Core executor logic for executing queries and storing results in memory.
pub mod collect;
+pub mod executor;
pub mod flight_service;
diff --git a/ballista/rust/executor/src/main.rs b/ballista/rust/executor/src/main.rs
index ad7c001..aad53d7 100644
--- a/ballista/rust/executor/src/main.rs
+++ b/ballista/rust/executor/src/main.rs
@@ -30,17 +30,15 @@ use tempfile::TempDir;
use tonic::transport::Server;
use uuid::Uuid;
-use ballista_core::{
- client::BallistaClient,
- serde::protobuf::{
- executor_registration, scheduler_grpc_client::SchedulerGrpcClient,
- ExecutorRegistration,
- },
+use ballista_core::serde::protobuf::{
+ executor_registration, scheduler_grpc_client::SchedulerGrpcClient,
+ ExecutorRegistration,
};
use ballista_core::{
print_version, serde::protobuf::scheduler_grpc_server::SchedulerGrpcServer,
BALLISTA_VERSION,
};
+use ballista_executor::executor::Executor;
use ballista_executor::flight_service::BallistaFlightService;
use ballista_scheduler::{state::StandaloneClient, SchedulerServer};
use config::prelude::*;
@@ -166,7 +164,10 @@ async fn main() -> Result<()> {
let scheduler = SchedulerGrpcClient::connect(scheduler_url)
.await
.context("Could not connect to scheduler")?;
- let service = BallistaFlightService::new(work_dir);
+
+ let executor = Arc::new(Executor::new(&work_dir));
+
+ let service = BallistaFlightService::new(executor.clone());
let server = FlightServiceServer::new(service);
info!(
@@ -174,19 +175,9 @@ async fn main() -> Result<()> {
BALLISTA_VERSION, addr
);
let server_future = tokio::spawn(Server::builder().add_service(server).serve(addr));
- let client_host = external_host.as_deref().unwrap_or_else(|| {
- if bind_host == "0.0.0.0" {
- // If the executor is being bound to "0.0.0.0" (which means use all ips in all eth devices)
- // then use "localhost" to connect to itself through the BallistaClient
- "localhost"
- } else {
- &bind_host
- }
- });
- let client = BallistaClient::try_new(client_host, port).await?;
tokio::spawn(execution_loop::poll_loop(
scheduler,
- client,
+ executor,
executor_meta,
opt.concurrent_tasks,
));
diff --git a/ballista/rust/scheduler/Cargo.toml b/ballista/rust/scheduler/Cargo.toml
index 19e2574..c009cc6 100644
--- a/ballista/rust/scheduler/Cargo.toml
+++ b/ballista/rust/scheduler/Cargo.toml
@@ -52,7 +52,6 @@ tonic = "0.4"
tower = { version = "0.4" }
warp = "0.3"
-arrow = { version = "4.0" }
datafusion = { path = "../../../datafusion" }
[dev-dependencies]