You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/05/24 18:44:34 UTC
[arrow-ballista] branch master updated: Update with file format breaking change (#40)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new 0d3ce5e5 Update with file format breaking change (#40)
0d3ce5e5 is described below
commit 0d3ce5e5254310de801785ca8f075a0cd75267b5
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Tue May 24 19:44:28 2022 +0100
Update with file format breaking change (#40)
---
ballista-cli/Cargo.lock | 58 +++++++++++++++-------
ballista-cli/Cargo.toml | 4 +-
ballista/rust/client/Cargo.toml | 2 +-
ballista/rust/core/Cargo.toml | 4 +-
ballista/rust/executor/Cargo.toml | 2 +-
ballista/rust/scheduler/Cargo.toml | 2 +-
.../rust/scheduler/src/scheduler_server/grpc.rs | 25 ++++++----
benchmarks/Cargo.toml | 2 +-
examples/Cargo.toml | 2 +-
9 files changed, 64 insertions(+), 37 deletions(-)
diff --git a/ballista-cli/Cargo.lock b/ballista-cli/Cargo.lock
index 528148c9..6e39c2e4 100644
--- a/ballista-cli/Cargo.lock
+++ b/ballista-cli/Cargo.lock
@@ -63,9 +63,9 @@ checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6"
[[package]]
name = "arrow"
-version = "13.0.0"
+version = "14.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5c6bee230122beb516ead31935a61f683715f987c6f003eff44ad6986624105a"
+checksum = "0612b6a634de6c3f5e63fdaa6932f7bc598f92de0462ac6e69b0aebd77e093aa"
dependencies = [
"bitflags",
"chrono",
@@ -88,9 +88,9 @@ dependencies = [
[[package]]
name = "arrow-flight"
-version = "13.0.0"
+version = "14.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0a3666d2dbc637fa979d1f0bf3031d39a80e709f3b9ec88e3d573c1d666bf553"
+checksum = "ce7b7cfa8eb0dcb0691f18b6a1d9c81cfe3c42726c254be5128d15ebe7580a1d"
dependencies = [
"arrow",
"base64",
@@ -198,7 +198,7 @@ dependencies = [
[[package]]
name = "ballista"
-version = "0.6.0"
+version = "0.7.0"
dependencies = [
"ballista-core",
"datafusion",
@@ -212,7 +212,7 @@ dependencies = [
[[package]]
name = "ballista-cli"
-version = "0.6.0"
+version = "0.7.0"
dependencies = [
"arrow",
"ballista",
@@ -228,7 +228,7 @@ dependencies = [
[[package]]
name = "ballista-core"
-version = "0.6.0"
+version = "0.7.0"
dependencies = [
"ahash",
"arrow-flight",
@@ -500,7 +500,8 @@ dependencies = [
[[package]]
name = "datafusion"
-version = "7.0.0"
+version = "8.0.0"
+source = "git+https://github.com/apache/arrow-datafusion?rev=9ea7dc6036a7b1d28c7450db4f26720b732a50de#9ea7dc6036a7b1d28c7450db4f26720b732a50de"
dependencies = [
"ahash",
"arrow",
@@ -511,6 +512,7 @@ dependencies = [
"datafusion-expr",
"datafusion-physical-expr",
"datafusion-row",
+ "datafusion-sql",
"futures",
"hashbrown 0.12.1",
"lazy_static",
@@ -532,7 +534,8 @@ dependencies = [
[[package]]
name = "datafusion-cli"
-version = "7.0.0"
+version = "8.0.0"
+source = "git+https://github.com/apache/arrow-datafusion?rev=9ea7dc6036a7b1d28c7450db4f26720b732a50de#9ea7dc6036a7b1d28c7450db4f26720b732a50de"
dependencies = [
"arrow",
"clap",
@@ -546,7 +549,8 @@ dependencies = [
[[package]]
name = "datafusion-common"
-version = "7.0.0"
+version = "8.0.0"
+source = "git+https://github.com/apache/arrow-datafusion?rev=9ea7dc6036a7b1d28c7450db4f26720b732a50de#9ea7dc6036a7b1d28c7450db4f26720b732a50de"
dependencies = [
"arrow",
"ordered-float 3.0.0",
@@ -556,7 +560,8 @@ dependencies = [
[[package]]
name = "datafusion-data-access"
-version = "7.0.0"
+version = "8.0.0"
+source = "git+https://github.com/apache/arrow-datafusion?rev=9ea7dc6036a7b1d28c7450db4f26720b732a50de#9ea7dc6036a7b1d28c7450db4f26720b732a50de"
dependencies = [
"async-trait",
"chrono",
@@ -569,7 +574,8 @@ dependencies = [
[[package]]
name = "datafusion-expr"
-version = "7.0.0"
+version = "8.0.0"
+source = "git+https://github.com/apache/arrow-datafusion?rev=9ea7dc6036a7b1d28c7450db4f26720b732a50de#9ea7dc6036a7b1d28c7450db4f26720b732a50de"
dependencies = [
"ahash",
"arrow",
@@ -579,7 +585,8 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr"
-version = "7.0.0"
+version = "8.0.0"
+source = "git+https://github.com/apache/arrow-datafusion?rev=9ea7dc6036a7b1d28c7450db4f26720b732a50de#9ea7dc6036a7b1d28c7450db4f26720b732a50de"
dependencies = [
"ahash",
"arrow",
@@ -602,7 +609,8 @@ dependencies = [
[[package]]
name = "datafusion-proto"
-version = "7.0.0"
+version = "8.0.0"
+source = "git+https://github.com/apache/arrow-datafusion?rev=9ea7dc6036a7b1d28c7450db4f26720b732a50de#9ea7dc6036a7b1d28c7450db4f26720b732a50de"
dependencies = [
"datafusion",
"prost",
@@ -611,7 +619,8 @@ dependencies = [
[[package]]
name = "datafusion-row"
-version = "7.0.0"
+version = "8.0.0"
+source = "git+https://github.com/apache/arrow-datafusion?rev=9ea7dc6036a7b1d28c7450db4f26720b732a50de#9ea7dc6036a7b1d28c7450db4f26720b732a50de"
dependencies = [
"arrow",
"datafusion-common",
@@ -619,6 +628,20 @@ dependencies = [
"rand",
]
+[[package]]
+name = "datafusion-sql"
+version = "8.0.0"
+source = "git+https://github.com/apache/arrow-datafusion?rev=9ea7dc6036a7b1d28c7450db4f26720b732a50de#9ea7dc6036a7b1d28c7450db4f26720b732a50de"
+dependencies = [
+ "ahash",
+ "arrow",
+ "datafusion-common",
+ "datafusion-expr",
+ "hashbrown 0.12.1",
+ "sqlparser",
+ "tokio",
+]
+
[[package]]
name = "digest"
version = "0.10.3"
@@ -1526,14 +1549,15 @@ dependencies = [
[[package]]
name = "parquet"
-version = "13.0.0"
+version = "14.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6c6d737baed48775e87a69aa262f1fa2f1d6bd074dedbe9cac244b9aabf2a0b4"
+checksum = "ba1185ee1da5091e40b86519265a44d2704e3916ff867059c915141cab14d413"
dependencies = [
"arrow",
"base64",
"brotli",
"byteorder",
+ "bytes",
"chrono",
"flate2",
"lz4",
diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml
index b75bf716..d677ab5f 100644
--- a/ballista-cli/Cargo.toml
+++ b/ballista-cli/Cargo.toml
@@ -32,8 +32,8 @@ readme = "README.md"
arrow = { version = "14.0.0" }
ballista = { path = "../ballista/rust/client", version = "0.7.0" }
clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "844bcda2664a04685b865afe7ff159c0648d2860" }
-datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "844bcda2664a04685b865afe7ff159c0648d2860" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "9ea7dc6036a7b1d28c7450db4f26720b732a50de" }
+datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "9ea7dc6036a7b1d28c7450db4f26720b732a50de" }
dirs = "4.0.0"
env_logger = "0.9"
mimalloc = { version = "0.1", default-features = false }
diff --git a/ballista/rust/client/Cargo.toml b/ballista/rust/client/Cargo.toml
index 7c7b6729..41c6d622 100644
--- a/ballista/rust/client/Cargo.toml
+++ b/ballista/rust/client/Cargo.toml
@@ -32,7 +32,7 @@ ballista-core = { path = "../core", version = "0.7.0" }
ballista-executor = { path = "../executor", version = "0.7.0", optional = true }
ballista-scheduler = { path = "../scheduler", version = "0.7.0", optional = true }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "844bcda2664a04685b865afe7ff159c0648d2860" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "9ea7dc6036a7b1d28c7450db4f26720b732a50de" }
futures = "0.3"
log = "0.4"
parking_lot = "0.12"
diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml
index ff42abfa..e3ae3e27 100644
--- a/ballista/rust/core/Cargo.toml
+++ b/ballista/rust/core/Cargo.toml
@@ -39,8 +39,8 @@ arrow-flight = { version = "14.0.0" }
async-trait = "0.1.41"
chrono = { version = "0.4", default-features = false }
clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "844bcda2664a04685b865afe7ff159c0648d2860" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "844bcda2664a04685b865afe7ff159c0648d2860" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "9ea7dc6036a7b1d28c7450db4f26720b732a50de" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "9ea7dc6036a7b1d28c7450db4f26720b732a50de" }
futures = "0.3"
hashbrown = "0.12"
diff --git a/ballista/rust/executor/Cargo.toml b/ballista/rust/executor/Cargo.toml
index 7a3f91c3..9d182263 100644
--- a/ballista/rust/executor/Cargo.toml
+++ b/ballista/rust/executor/Cargo.toml
@@ -40,7 +40,7 @@ async-trait = "0.1.41"
ballista-core = { path = "../core", version = "0.7.0" }
chrono = { version = "0.4", default-features = false }
configure_me = "0.4.0"
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "844bcda2664a04685b865afe7ff159c0648d2860" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "9ea7dc6036a7b1d28c7450db4f26720b732a50de" }
env_logger = "0.9"
futures = "0.3"
hyper = "0.14.4"
diff --git a/ballista/rust/scheduler/Cargo.toml b/ballista/rust/scheduler/Cargo.toml
index e4570360..861489b9 100644
--- a/ballista/rust/scheduler/Cargo.toml
+++ b/ballista/rust/scheduler/Cargo.toml
@@ -41,7 +41,7 @@ async-trait = "0.1.41"
ballista-core = { path = "../core", version = "0.7.0" }
clap = { version = "3", features = ["derive", "cargo"] }
configure_me = "0.4.0"
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "844bcda2664a04685b865afe7ff159c0648d2860" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "9ea7dc6036a7b1d28c7450db4f26720b732a50de" }
env_logger = "0.9"
etcd-client = { version = "0.9", optional = true }
futures = "0.3"
diff --git a/ballista/rust/scheduler/src/scheduler_server/grpc.rs b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
index 10be4786..9216155d 100644
--- a/ballista/rust/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
@@ -38,7 +38,7 @@ use datafusion::datafusion_data_access::object_store::{
};
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
-use futures::StreamExt;
+use futures::TryStreamExt;
use log::{debug, error, info, trace, warn};
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use std::convert::TryInto;
@@ -281,7 +281,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
request: Request<GetFileMetadataParams>,
) -> std::result::Result<Response<GetFileMetadataResult>, tonic::Status> {
// TODO support multiple object stores
- let obj_store = LocalFileSystem {};
+ let obj_store = Arc::new(LocalFileSystem {}) as Arc<dyn ObjectStore>;
// TODO shouldn't this take a ListingOption object as input?
let GetFileMetadataParams { path, file_type } = request.into_inner();
@@ -300,19 +300,22 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
)),
}?;
- let file_metas = obj_store.list_file(&path).await.map_err(|e| {
- let msg = format!("Error listing files: {}", e);
- error!("{}", msg);
- tonic::Status::internal(msg)
- })?;
-
- let obj_readers = file_metas.map(move |f| obj_store.file_reader(f?.sized_file));
+ let file_metas: Vec<_> = obj_store
+ .list_file(&path)
+ .await
+ .map_err(|e| {
+ let msg = format!("Error listing files: {}", e);
+ error!("{}", msg);
+ tonic::Status::internal(msg)
+ })?
+ .try_collect()
+ .await?;
let schema = file_format
- .infer_schema(Box::pin(obj_readers))
+ .infer_schema(&obj_store, &file_metas)
.await
.map_err(|e| {
- let msg = format!("Error infering schema: {}", e);
+ let msg = format!("Error inferring schema: {}", e);
error!("{}", msg);
tonic::Status::internal(msg)
})?;
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 287ed0dd..2dd12088 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -33,7 +33,7 @@ snmalloc = ["snmalloc-rs"]
[dependencies]
ballista = { path = "../ballista/rust/client" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "844bcda2664a04685b865afe7ff159c0648d2860" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "9ea7dc6036a7b1d28c7450db4f26720b732a50de" }
env_logger = "0.9"
futures = "0.3"
mimalloc = { version = "0.1", optional = true, default-features = false }
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index 6f634aed..bc815020 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -35,7 +35,7 @@ required-features = ["ballista/standalone"]
[dependencies]
ballista = { path = "../ballista/rust/client", version = "0.7.0" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "844bcda2664a04685b865afe7ff159c0648d2860" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "9ea7dc6036a7b1d28c7450db4f26720b732a50de" }
futures = "0.3"
num_cpus = "1.13.0"
prost = "0.10"