You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2021/04/17 22:28:51 UTC
[arrow] branch master updated: ARROW-12437: [Rust] [Ballista]
Create DataFusion context without repartition
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 7ad51be ARROW-12437: [Rust] [Ballista] Create DataFusion context without repartition
7ad51be is described below
commit 7ad51bef8841f5c316fc304fc756c70d89cbf9d9
Author: Andy Grove <an...@gmail.com>
AuthorDate: Sat Apr 17 16:27:37 2021 -0600
ARROW-12437: [Rust] [Ballista] Create DataFusion context without repartition
Ballista plans must not include `RepartitionExec` because it produces incorrect results. Ballista needs to manage its own repartitioning in a distributed-aware way later on. For now, we just need to configure the DataFusion context to disable repartitioning.
Closes #10086 from andygrove/ballista-no-repartition
Authored-by: Andy Grove <an...@gmail.com>
Signed-off-by: Andy Grove <an...@gmail.com>
---
rust/ballista/rust/client/src/context.rs | 8 ++++----
rust/ballista/rust/core/src/utils.rs | 18 ++++++++++++++++++
rust/ballista/rust/scheduler/src/lib.rs | 6 +++---
3 files changed, 25 insertions(+), 7 deletions(-)
diff --git a/rust/ballista/rust/client/src/context.rs b/rust/ballista/rust/client/src/context.rs
index 0556c29..400f6b6 100644
--- a/rust/ballista/rust/client/src/context.rs
+++ b/rust/ballista/rust/client/src/context.rs
@@ -33,11 +33,11 @@ use ballista_core::{
datasource::DFTableAdapter,
error::{BallistaError, Result},
memory_stream::MemoryStream,
+ utils::create_datafusion_context,
};
use arrow::datatypes::Schema;
use datafusion::catalog::TableReference;
-use datafusion::execution::context::ExecutionContext;
use datafusion::logical_plan::{DFSchema, Expr, LogicalPlan, Partitioning};
use datafusion::physical_plan::csv::CsvReadOptions;
use datafusion::{dataframe::DataFrame, physical_plan::RecordBatchStream};
@@ -94,7 +94,7 @@ impl BallistaContext {
let path = fs::canonicalize(&path)?;
// use local DataFusion context for now but later this might call the scheduler
- let mut ctx = ExecutionContext::new();
+ let mut ctx = create_datafusion_context();
let df = ctx.read_parquet(path.to_str().unwrap())?;
Ok(BallistaDataFrame::from(self.state.clone(), df))
}
@@ -111,7 +111,7 @@ impl BallistaContext {
let path = fs::canonicalize(&path)?;
// use local DataFusion context for now but later this might call the scheduler
- let mut ctx = ExecutionContext::new();
+ let mut ctx = create_datafusion_context();
let df = ctx.read_csv(path.to_str().unwrap(), options)?;
Ok(BallistaDataFrame::from(self.state.clone(), df))
}
@@ -143,7 +143,7 @@ impl BallistaContext {
/// Create a DataFrame from a SQL statement
pub fn sql(&self, sql: &str) -> Result<BallistaDataFrame> {
// use local DataFusion context for now but later this might call the scheduler
- let mut ctx = ExecutionContext::new();
+ let mut ctx = create_datafusion_context();
// register tables
let state = self.state.lock().unwrap();
for (name, plan) in &state.tables {
diff --git a/rust/ballista/rust/core/src/utils.rs b/rust/ballista/rust/core/src/utils.rs
index d1c239a..ee9c955 100644
--- a/rust/ballista/rust/core/src/utils.rs
+++ b/rust/ballista/rust/core/src/utils.rs
@@ -33,7 +33,11 @@ use arrow::datatypes::{DataType, Field};
use arrow::ipc::reader::FileReader;
use arrow::ipc::writer::FileWriter;
use arrow::record_batch::RecordBatch;
+use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
use datafusion::logical_plan::Operator;
+use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches;
+use datafusion::physical_optimizer::merge_exec::AddMergeExec;
+use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::csv::CsvExec;
use datafusion::physical_plan::expressions::{BinaryExpr, Column, Literal};
@@ -307,3 +311,17 @@ fn build_exec_plan_diagram(
}
Ok(node_id)
}
+
+/// Create a DataFusion `ExecutionContext` configured for Ballista: concurrency of 1, `repartition_joins` set to false, and only the `CoalesceBatches` and `AddMergeExec` physical optimizer rules
+pub fn create_datafusion_context() -> ExecutionContext {
+ // list the physical optimizer rules explicitly and leave the Repartition rule out, because Ballista does not support it yet
+ let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
+ Arc::new(CoalesceBatches::new()),
+ Arc::new(AddMergeExec::new()),
+ ];
+ let config = ExecutionConfig::new()
+ .with_concurrency(1)
+ .with_repartition_joins(false)
+ .with_physical_optimizer_rules(rules);
+ ExecutionContext::with_config(config)
+}
diff --git a/rust/ballista/rust/scheduler/src/lib.rs b/rust/ballista/rust/scheduler/src/lib.rs
index 54733e3..de49bc0 100644
--- a/rust/ballista/rust/scheduler/src/lib.rs
+++ b/rust/ballista/rust/scheduler/src/lib.rs
@@ -60,12 +60,12 @@ impl parse_arg::ParseArgFromStr for ConfigBackend {
use crate::planner::DistributedPlanner;
-use datafusion::execution::context::ExecutionContext;
use log::{debug, error, info, warn};
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use tonic::{Request, Response};
use self::state::{ConfigBackendClient, SchedulerState};
+use ballista_core::utils::create_datafusion_context;
use datafusion::physical_plan::parquet::ParquetExec;
use std::time::{Instant, SystemTime, UNIX_EPOCH};
@@ -254,7 +254,7 @@ impl SchedulerGrpc for SchedulerServer {
Query::Sql(sql) => {
//TODO we can't just create a new context because we need a context that has
// tables registered from previous SQL statements that have been executed
- let mut ctx = ExecutionContext::new();
+ let mut ctx = create_datafusion_context();
let df = ctx.sql(&sql).map_err(|e| {
let msg = format!("Error parsing SQL: {}", e);
error!("{}", msg);
@@ -303,7 +303,7 @@ impl SchedulerGrpc for SchedulerServer {
let job_id_spawn = job_id.clone();
tokio::spawn(async move {
// create physical plan using DataFusion
- let datafusion_ctx = ExecutionContext::new();
+ let datafusion_ctx = create_datafusion_context();
macro_rules! fail_job {
($code :expr) => {{
match $code {