You are viewing a plain-text version of this content; the hyperlink to the canonical version was lost during conversion.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/10/08 12:02:36 UTC
[arrow-ballista] branch master updated: Upgrade to DataFusion 13.0.0-rc1 (#325)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new 28b9f41b Upgrade to DataFusion 13.0.0-rc1 (#325)
28b9f41b is described below
commit 28b9f41b600cd3888e2522adb592676b17b15e1a
Author: Andy Grove <an...@gmail.com>
AuthorDate: Sat Oct 8 06:02:32 2022 -0600
Upgrade to DataFusion 13.0.0-rc1 (#325)
---
ballista-cli/Cargo.toml | 4 ++--
ballista/rust/client/Cargo.toml | 6 +++---
ballista/rust/core/Cargo.toml | 8 ++++----
ballista/rust/executor/Cargo.toml | 8 ++++----
ballista/rust/scheduler/Cargo.toml | 6 +++---
benchmarks/Cargo.toml | 4 ++--
examples/Cargo.toml | 2 +-
python/Cargo.toml | 2 +-
python/src/ballista_context.rs | 5 +++--
python/src/context.rs | 15 ++++++++-------
python/src/dataframe.rs | 12 +++++++-----
python/src/dataset.rs | 10 +++++++++-
python/src/dataset_exec.rs | 12 +++++++++---
python/src/expression.rs | 6 +++---
python/src/udaf.rs | 15 ++++++++-------
python/src/udf.rs | 14 +++++++-------
16 files changed, 74 insertions(+), 55 deletions(-)
diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml
index 9f28ee9a..ae6dea44 100644
--- a/ballista-cli/Cargo.toml
+++ b/ballista-cli/Cargo.toml
@@ -33,8 +33,8 @@ ballista = { path = "../ballista/rust/client", version = "0.8.0", features = [
"standalone",
] }
clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
-datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
+datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
dirs = "4.0.0"
env_logger = "0.9"
mimalloc = { version = "0.1", default-features = false }
diff --git a/ballista/rust/client/Cargo.toml b/ballista/rust/client/Cargo.toml
index a7ab2164..f08a4b7d 100644
--- a/ballista/rust/client/Cargo.toml
+++ b/ballista/rust/client/Cargo.toml
@@ -31,12 +31,12 @@ rust-version = "1.59"
ballista-core = { path = "../core", version = "0.8.0" }
ballista-executor = { path = "../executor", version = "0.8.0", optional = true }
ballista-scheduler = { path = "../scheduler", version = "0.8.0", optional = true }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
futures = "0.3"
log = "0.4"
parking_lot = "0.12"
-sqlparser = "0.23"
+sqlparser = "0.25"
tempfile = "3"
tokio = "1.0"
diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml
index 6699af01..d834ce10 100644
--- a/ballista/rust/core/Cargo.toml
+++ b/ballista/rust/core/Cargo.toml
@@ -41,13 +41,13 @@ simd = ["datafusion/simd"]
[dependencies]
ahash = { version = "0.8", default-features = false }
-arrow-flight = { version = "23.0.0", features = ["flight-sql-experimental"] }
+arrow-flight = { version = "24.0.0", features = ["flight-sql-experimental"] }
async-trait = "0.1.41"
chrono = { version = "0.4", default-features = false }
clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
datafusion-objectstore-hdfs = { version = "0.1.0", optional = true }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
futures = "0.3"
hashbrown = "0.12"
@@ -63,7 +63,7 @@ prost = "0.11"
prost-types = "0.11"
rand = "0.8"
serde = { version = "1", features = ["derive"] }
-sqlparser = "0.23"
+sqlparser = "0.25"
tokio = "1.0"
tokio-stream = { version = "0.1", features = ["net"] }
tonic = "0.8"
diff --git a/ballista/rust/executor/Cargo.toml b/ballista/rust/executor/Cargo.toml
index ca3532ac..9eda5278 100644
--- a/ballista/rust/executor/Cargo.toml
+++ b/ballista/rust/executor/Cargo.toml
@@ -34,15 +34,15 @@ snmalloc = ["snmalloc-rs"]
[dependencies]
anyhow = "1"
-arrow = { version = "23.0.0" }
-arrow-flight = { version = "23.0.0" }
+arrow = { version = "24.0.0" }
+arrow-flight = { version = "24.0.0" }
async-trait = "0.1.41"
ballista-core = { path = "../core", version = "0.8.0" }
chrono = { version = "0.4", default-features = false }
configure_me = "0.4.0"
dashmap = "5.4.0"
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
futures = "0.3"
hyper = "0.14.4"
log = "0.4"
diff --git a/ballista/rust/scheduler/Cargo.toml b/ballista/rust/scheduler/Cargo.toml
index 90e3f020..ccf5bd6f 100644
--- a/ballista/rust/scheduler/Cargo.toml
+++ b/ballista/rust/scheduler/Cargo.toml
@@ -38,7 +38,7 @@ sled = ["sled_package", "tokio-stream"]
[dependencies]
anyhow = "1"
-arrow-flight = { version = "23.0.0", features = ["flight-sql-experimental"] }
+arrow-flight = { version = "24.0.0", features = ["flight-sql-experimental"] }
async-recursion = "1.0.0"
async-trait = "0.1.41"
ballista-core = { path = "../core", version = "0.8.0" }
@@ -46,8 +46,8 @@ base64 = { version = "0.13", default-features = false }
clap = { version = "3", features = ["derive", "cargo"] }
configure_me = "0.4.0"
dashmap = "5.4.0"
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
etcd-client = { version = "0.9", optional = true }
flatbuffers = { version = "2.1.2" }
futures = "0.3"
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 167dbee5..553e94ee 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -33,8 +33,8 @@ snmalloc = ["snmalloc-rs"]
[dependencies]
ballista = { path = "../ballista/rust/client", version = "0.8.0" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
env_logger = "0.9"
futures = "0.3"
mimalloc = { version = "0.1", optional = true, default-features = false }
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index 7170365e..6a1967a3 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -35,7 +35,7 @@ required-features = ["ballista/standalone"]
[dependencies]
ballista = { path = "../ballista/rust/client", version = "0.8.0" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1" }
futures = "0.3"
num_cpus = "1.13.0"
prost = "0.11"
diff --git a/python/Cargo.toml b/python/Cargo.toml
index c5d2e8e6..829be3be 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -36,7 +36,7 @@ default = ["mimalloc"]
[dependencies]
async-trait = "0.1"
ballista = { path = "../ballista/rust/client", version = "0.8.0" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "06a4f79f02fcb6ea85303925b7c5a9b0231e3fee", features = ["pyarrow"] }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "13.0.0-rc1", features = ["pyarrow"] }
futures = "0.3"
mimalloc = { version = "*", optional = true, default-features = false }
pyo3 = { version = "~0.17.1", features = ["extension-module", "abi3", "abi3-py37"] }
diff --git a/python/src/ballista_context.rs b/python/src/ballista_context.rs
index 2d92e970..d059ba72 100644
--- a/python/src/ballista_context.rs
+++ b/python/src/ballista_context.rs
@@ -25,6 +25,7 @@ use crate::dataframe::PyDataFrame;
use crate::errors::BallistaError;
use ballista::prelude::{BallistaConfig, BallistaContext};
use datafusion::arrow::datatypes::Schema;
+use datafusion::arrow::pyarrow::PyArrowType;
use datafusion::prelude::{AvroReadOptions, CsvReadOptions, ParquetReadOptions};
/// `PyBallistaContext` is able to plan and execute DataFusion plans.
@@ -83,7 +84,7 @@ impl PyBallistaContext {
&mut self,
name: &str,
path: PathBuf,
- schema: Option<Schema>,
+ schema: Option<PyArrowType<Schema>>,
has_header: bool,
delimiter: &str,
schema_infer_max_records: usize,
@@ -108,7 +109,7 @@ impl PyBallistaContext {
.delimiter(delimiter[0])
.schema_infer_max_records(schema_infer_max_records)
.file_extension(file_extension);
- options.schema = schema.as_ref();
+ options.schema = schema.as_ref().map(|x| &x.0);
let result = ctx.register_csv(name, path, options);
wait_for_future(py, result).map_err(BallistaError::from)?;
diff --git a/python/src/context.rs b/python/src/context.rs
index 8dfa9bef..38a3cb9b 100644
--- a/python/src/context.rs
+++ b/python/src/context.rs
@@ -24,6 +24,7 @@ use pyo3::exceptions::{PyKeyError, PyValueError};
use pyo3::prelude::*;
use datafusion::arrow::datatypes::Schema;
+use datafusion::arrow::pyarrow::PyArrowType;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::datasource::TableProvider;
use datafusion::datasource::MemTable;
@@ -100,9 +101,9 @@ impl PySessionContext {
fn create_dataframe(
&mut self,
- partitions: Vec<Vec<RecordBatch>>,
+ partitions: PyArrowType<Vec<Vec<RecordBatch>>>,
) -> PyResult<PyDataFrame> {
- let table = MemTable::try_new(partitions[0][0].schema(), partitions)
+ let table = MemTable::try_new(partitions.0[0][0].schema(), partitions.0)
.map_err(DataFusionError::from)?;
// generate a random (unique) name for this table
@@ -138,10 +139,10 @@ impl PySessionContext {
fn register_record_batches(
&mut self,
name: &str,
- partitions: Vec<Vec<RecordBatch>>,
+ partitions: PyArrowType<Vec<Vec<RecordBatch>>>,
) -> PyResult<()> {
- let schema = partitions[0][0].schema();
- let table = MemTable::try_new(schema, partitions)?;
+ let schema = partitions.0[0][0].schema();
+ let table = MemTable::try_new(schema, partitions.0)?;
self.ctx
.register_table(name, Arc::new(table))
.map_err(DataFusionError::from)?;
@@ -184,7 +185,7 @@ impl PySessionContext {
&mut self,
name: &str,
path: PathBuf,
- schema: Option<Schema>,
+ schema: Option<PyArrowType<Schema>>,
has_header: bool,
delimiter: &str,
schema_infer_max_records: usize,
@@ -206,7 +207,7 @@ impl PySessionContext {
.delimiter(delimiter[0])
.schema_infer_max_records(schema_infer_max_records)
.file_extension(file_extension);
- options.schema = schema.as_ref();
+ options.schema = schema.as_ref().map(|x| &x.0);
let result = self.ctx.register_csv(name, path, options);
wait_for_future(py, result).map_err(DataFusionError::from)?;
diff --git a/python/src/dataframe.rs b/python/src/dataframe.rs
index af1bcf0c..bc1a9a68 100644
--- a/python/src/dataframe.rs
+++ b/python/src/dataframe.rs
@@ -18,7 +18,7 @@
use crate::utils::wait_for_future;
use crate::{errors::DataFusionError, expression::PyExpr};
use datafusion::arrow::datatypes::Schema;
-use datafusion::arrow::pyarrow::PyArrowConvert;
+use datafusion::arrow::pyarrow::{PyArrowConvert, PyArrowException, PyArrowType};
use datafusion::arrow::util::pretty;
use datafusion::dataframe::DataFrame;
use datafusion::logical_plan::JoinType;
@@ -65,8 +65,8 @@ impl PyDataFrame {
}
/// Returns the schema from the logical plan
- fn schema(&self) -> Schema {
- self.df.schema().into()
+ fn schema(&self) -> PyArrowType<Schema> {
+ PyArrowType(self.df.schema().into())
}
#[args(args = "*")]
@@ -126,7 +126,8 @@ impl PyDataFrame {
fn show(&self, py: Python, num: usize) -> PyResult<()> {
let df = self.df.limit(0, Some(num))?;
let batches = wait_for_future(py, df.collect())?;
- Ok(pretty::print_batches(&batches)?)
+ pretty::print_batches(&batches)
+ .map_err(|err| PyArrowException::new_err(err.to_string()))
}
fn join(
@@ -162,6 +163,7 @@ impl PyDataFrame {
fn explain(&self, py: Python, verbose: bool, analyze: bool) -> PyResult<()> {
let df = self.df.explain(verbose, analyze)?;
let batches = wait_for_future(py, df.collect())?;
- Ok(pretty::print_batches(&batches)?)
+ pretty::print_batches(&batches)
+ .map_err(|err| PyArrowException::new_err(err.to_string()))
}
}
diff --git a/python/src/dataset.rs b/python/src/dataset.rs
index a678b904..d34d974f 100644
--- a/python/src/dataset.rs
+++ b/python/src/dataset.rs
@@ -27,6 +27,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
+use datafusion::arrow::pyarrow::PyArrowType;
use datafusion::datasource::datasource::TableProviderFilterPushDown;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::{DataFusionError, Result as DFResult};
@@ -74,7 +75,14 @@ impl TableProvider for Dataset {
Python::with_gil(|py| {
let dataset = self.dataset.as_ref(py);
// This can panic but since we checked that self.dataset is a pyarrow.dataset.Dataset it should never
- Arc::new(dataset.getattr("schema").unwrap().extract().unwrap())
+ Arc::new(
+ dataset
+ .getattr("schema")
+ .unwrap()
+ .extract::<PyArrowType<_>>()
+ .unwrap()
+ .0,
+ )
})
}
diff --git a/python/src/dataset_exec.rs b/python/src/dataset_exec.rs
index f57a3a76..987b7ad1 100644
--- a/python/src/dataset_exec.rs
+++ b/python/src/dataset_exec.rs
@@ -28,6 +28,7 @@ use futures::stream;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::error::ArrowError;
use datafusion::arrow::error::Result as ArrowResult;
+use datafusion::arrow::pyarrow::PyArrowType;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::{DataFusionError as InnerDataFusionError, Result as DFResult};
use datafusion::execution::context::TaskContext;
@@ -54,7 +55,7 @@ impl Iterator for PyArrowBatchesAdapter {
Some(
batches
.next()?
- .and_then(|batch| batch.extract())
+ .and_then(|batch| Ok(batch.extract::<PyArrowType<_>>()?.0))
.map_err(|err| ArrowError::ExternalError(Box::new(err))),
)
})
@@ -109,7 +110,12 @@ impl DatasetExec {
let scanner = dataset.call_method("scanner", (), Some(kwargs))?;
- let schema = Arc::new(scanner.getattr("projected_schema")?.extract()?);
+ let schema = Arc::new(
+ scanner
+ .getattr("projected_schema")?
+ .extract::<PyArrowType<_>>()?
+ .0,
+ );
let builtins = Python::import(py, "builtins")?;
let pylist = builtins.getattr("list")?;
@@ -211,7 +217,7 @@ impl ExecutionPlan for DatasetExec {
let schema: SchemaRef = Arc::new(
scanner
.getattr("projected_schema")
- .and_then(|schema| schema.extract())
+ .and_then(|schema| Ok(schema.extract::<PyArrowType<_>>()?.0))
.map_err(|err| InnerDataFusionError::External(Box::new(err)))?,
);
let record_batches: &PyIterator = scanner
diff --git a/python/src/expression.rs b/python/src/expression.rs
index b3275ccf..aa6e540c 100644
--- a/python/src/expression.rs
+++ b/python/src/expression.rs
@@ -19,8 +19,8 @@ use pyo3::{basic::CompareOp, prelude::*};
use std::convert::{From, Into};
use datafusion::arrow::datatypes::DataType;
+use datafusion::arrow::pyarrow::PyArrowType;
use datafusion::logical_plan::{col, lit, Expr};
-
use datafusion::scalar::ScalarValue;
/// An PyExpr that can be used on a DataFrame
@@ -125,12 +125,12 @@ impl PyExpr {
self.expr.clone().is_null().into()
}
- pub fn cast(&self, to: DataType) -> PyExpr {
+ pub fn cast(&self, to: PyArrowType<DataType>) -> PyExpr {
// self.expr.cast_to() requires DFSchema to validate that the cast
// is supported, omit that for now
let expr = Expr::Cast {
expr: Box::new(self.expr.clone()),
- data_type: to,
+ data_type: to.0,
};
expr.into()
}
diff --git a/python/src/udaf.rs b/python/src/udaf.rs
index 3b93048b..f2973476 100644
--- a/python/src/udaf.rs
+++ b/python/src/udaf.rs
@@ -21,7 +21,7 @@ use pyo3::{prelude::*, types::PyTuple};
use datafusion::arrow::array::ArrayRef;
use datafusion::arrow::datatypes::DataType;
-use datafusion::arrow::pyarrow::PyArrowConvert;
+use datafusion::arrow::pyarrow::{PyArrowConvert, PyArrowType};
use datafusion::common::ScalarValue;
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_expr::{
@@ -82,6 +82,7 @@ impl Accumulator for RustAccumulator {
// 1. cast states to Pyarrow array
let state = state
+ .data()
.to_pyarrow(py)
.map_err(|e| DataFusionError::Execution(format!("{}", e)))?;
@@ -120,18 +121,18 @@ impl PyAggregateUDF {
fn new(
name: &str,
accumulator: PyObject,
- input_type: DataType,
- return_type: DataType,
- state_type: Vec<DataType>,
+ input_type: PyArrowType<DataType>,
+ return_type: PyArrowType<DataType>,
+ state_type: PyArrowType<Vec<DataType>>,
volatility: &str,
) -> PyResult<Self> {
let function = logical_expr::create_udaf(
name,
- input_type,
- Arc::new(return_type),
+ input_type.0,
+ Arc::new(return_type.0),
parse_volatility(volatility)?,
to_rust_accumulator(accumulator),
- Arc::new(state_type),
+ Arc::new(state_type.0),
);
Ok(Self { function })
}
diff --git a/python/src/udf.rs b/python/src/udf.rs
index 8cf8c9cf..4f188f7e 100644
--- a/python/src/udf.rs
+++ b/python/src/udf.rs
@@ -19,9 +19,9 @@ use std::sync::Arc;
use pyo3::{prelude::*, types::PyTuple};
-use datafusion::arrow::array::ArrayRef;
+use datafusion::arrow::array::{make_array, ArrayData, ArrayRef};
use datafusion::arrow::datatypes::DataType;
-use datafusion::arrow::pyarrow::PyArrowConvert;
+use datafusion::arrow::pyarrow::{PyArrowConvert, PyArrowType};
use datafusion::error::DataFusionError;
use datafusion::logical_expr::{self, function::ScalarFunctionImplementation};
use datafusion::physical_plan::functions::make_scalar_function;
@@ -52,7 +52,7 @@ fn to_rust_function(func: PyObject) -> ScalarFunctionImplementation {
}?;
// 3. cast to arrow::array::Array
- let array = ArrayRef::from_pyarrow(value).unwrap();
+ let array = make_array(ArrayData::from_pyarrow(value).unwrap());
Ok(array)
})
},
@@ -72,14 +72,14 @@ impl PyScalarUDF {
fn new(
name: &str,
func: PyObject,
- input_types: Vec<DataType>,
- return_type: DataType,
+ input_types: PyArrowType<Vec<DataType>>,
+ return_type: PyArrowType<DataType>,
volatility: &str,
) -> PyResult<Self> {
let function = logical_expr::create_udf(
name,
- input_types,
- Arc::new(return_type),
+ input_types.0,
+ Arc::new(return_type.0),
parse_volatility(volatility)?,
to_rust_function(func),
);