You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2021/04/17 22:28:51 UTC

[arrow] branch master updated: ARROW-12437: [Rust] [Ballista] Create DataFusion context without repartition

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ad51be  ARROW-12437: [Rust] [Ballista] Create DataFusion context without repartition
7ad51be is described below

commit 7ad51bef8841f5c316fc304fc756c70d89cbf9d9
Author: Andy Grove <an...@gmail.com>
AuthorDate: Sat Apr 17 16:27:37 2021 -0600

    ARROW-12437: [Rust] [Ballista] Create DataFusion context without repartition
    
    Ballista plans must not include `RepartitionExec` because it produces incorrect results. Ballista needs to manage its own repartitioning in a distributed-aware way later on. For now, we just need to configure the DataFusion context to disable repartitioning.
    
    Closes #10086 from andygrove/ballista-no-repartition
    
    Authored-by: Andy Grove <an...@gmail.com>
    Signed-off-by: Andy Grove <an...@gmail.com>
---
 rust/ballista/rust/client/src/context.rs |  8 ++++----
 rust/ballista/rust/core/src/utils.rs     | 18 ++++++++++++++++++
 rust/ballista/rust/scheduler/src/lib.rs  |  6 +++---
 3 files changed, 25 insertions(+), 7 deletions(-)

diff --git a/rust/ballista/rust/client/src/context.rs b/rust/ballista/rust/client/src/context.rs
index 0556c29..400f6b6 100644
--- a/rust/ballista/rust/client/src/context.rs
+++ b/rust/ballista/rust/client/src/context.rs
@@ -33,11 +33,11 @@ use ballista_core::{
     datasource::DFTableAdapter,
     error::{BallistaError, Result},
     memory_stream::MemoryStream,
+    utils::create_datafusion_context,
 };
 
 use arrow::datatypes::Schema;
 use datafusion::catalog::TableReference;
-use datafusion::execution::context::ExecutionContext;
 use datafusion::logical_plan::{DFSchema, Expr, LogicalPlan, Partitioning};
 use datafusion::physical_plan::csv::CsvReadOptions;
 use datafusion::{dataframe::DataFrame, physical_plan::RecordBatchStream};
@@ -94,7 +94,7 @@ impl BallistaContext {
         let path = fs::canonicalize(&path)?;
 
         // use local DataFusion context for now but later this might call the scheduler
-        let mut ctx = ExecutionContext::new();
+        let mut ctx = create_datafusion_context();
         let df = ctx.read_parquet(path.to_str().unwrap())?;
         Ok(BallistaDataFrame::from(self.state.clone(), df))
     }
@@ -111,7 +111,7 @@ impl BallistaContext {
         let path = fs::canonicalize(&path)?;
 
         // use local DataFusion context for now but later this might call the scheduler
-        let mut ctx = ExecutionContext::new();
+        let mut ctx = create_datafusion_context();
         let df = ctx.read_csv(path.to_str().unwrap(), options)?;
         Ok(BallistaDataFrame::from(self.state.clone(), df))
     }
@@ -143,7 +143,7 @@ impl BallistaContext {
     /// Create a DataFrame from a SQL statement
     pub fn sql(&self, sql: &str) -> Result<BallistaDataFrame> {
         // use local DataFusion context for now but later this might call the scheduler
-        let mut ctx = ExecutionContext::new();
+        let mut ctx = create_datafusion_context();
         // register tables
         let state = self.state.lock().unwrap();
         for (name, plan) in &state.tables {
diff --git a/rust/ballista/rust/core/src/utils.rs b/rust/ballista/rust/core/src/utils.rs
index d1c239a..ee9c955 100644
--- a/rust/ballista/rust/core/src/utils.rs
+++ b/rust/ballista/rust/core/src/utils.rs
@@ -33,7 +33,11 @@ use arrow::datatypes::{DataType, Field};
 use arrow::ipc::reader::FileReader;
 use arrow::ipc::writer::FileWriter;
 use arrow::record_batch::RecordBatch;
+use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
 use datafusion::logical_plan::Operator;
+use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches;
+use datafusion::physical_optimizer::merge_exec::AddMergeExec;
+use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule;
 use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
 use datafusion::physical_plan::csv::CsvExec;
 use datafusion::physical_plan::expressions::{BinaryExpr, Column, Literal};
@@ -307,3 +311,17 @@ fn build_exec_plan_diagram(
     }
     Ok(node_id)
 }
+
+/// Create a DataFusion `ExecutionContext` for Ballista: concurrency 1, join repartitioning off, explicit optimizer rules
+pub fn create_datafusion_context() -> ExecutionContext {
+    // register only CoalesceBatches and AddMergeExec; the Repartition rule is left out because it isn't supported yet
+    let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
+        Arc::new(CoalesceBatches::new()),
+        Arc::new(AddMergeExec::new()),
+    ];
+    let config = ExecutionConfig::new()
+        .with_concurrency(1)
+        .with_repartition_joins(false)
+        .with_physical_optimizer_rules(rules);
+    ExecutionContext::with_config(config)
+}
diff --git a/rust/ballista/rust/scheduler/src/lib.rs b/rust/ballista/rust/scheduler/src/lib.rs
index 54733e3..de49bc0 100644
--- a/rust/ballista/rust/scheduler/src/lib.rs
+++ b/rust/ballista/rust/scheduler/src/lib.rs
@@ -60,12 +60,12 @@ impl parse_arg::ParseArgFromStr for ConfigBackend {
 
 use crate::planner::DistributedPlanner;
 
-use datafusion::execution::context::ExecutionContext;
 use log::{debug, error, info, warn};
 use rand::{distributions::Alphanumeric, thread_rng, Rng};
 use tonic::{Request, Response};
 
 use self::state::{ConfigBackendClient, SchedulerState};
+use ballista_core::utils::create_datafusion_context;
 use datafusion::physical_plan::parquet::ParquetExec;
 use std::time::{Instant, SystemTime, UNIX_EPOCH};
 
@@ -254,7 +254,7 @@ impl SchedulerGrpc for SchedulerServer {
                 Query::Sql(sql) => {
                     //TODO we can't just create a new context because we need a context that has
                     // tables registered from previous SQL statements that have been executed
-                    let mut ctx = ExecutionContext::new();
+                    let mut ctx = create_datafusion_context();
                     let df = ctx.sql(&sql).map_err(|e| {
                         let msg = format!("Error parsing SQL: {}", e);
                         error!("{}", msg);
@@ -303,7 +303,7 @@ impl SchedulerGrpc for SchedulerServer {
             let job_id_spawn = job_id.clone();
             tokio::spawn(async move {
                 // create physical plan using DataFusion
-                let datafusion_ctx = ExecutionContext::new();
+                let datafusion_ctx = create_datafusion_context();
                 macro_rules! fail_job {
                     ($code :expr) => {{
                         match $code {