You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/05/24 18:44:34 UTC

[arrow-ballista] branch master updated: Update with file format breaking change (#40)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d3ce5e5 Update with file format breaking change (#40)
0d3ce5e5 is described below

commit 0d3ce5e5254310de801785ca8f075a0cd75267b5
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Tue May 24 19:44:28 2022 +0100

    Update with file format breaking change (#40)
---
 ballista-cli/Cargo.lock                            | 58 +++++++++++++++-------
 ballista-cli/Cargo.toml                            |  4 +-
 ballista/rust/client/Cargo.toml                    |  2 +-
 ballista/rust/core/Cargo.toml                      |  4 +-
 ballista/rust/executor/Cargo.toml                  |  2 +-
 ballista/rust/scheduler/Cargo.toml                 |  2 +-
 .../rust/scheduler/src/scheduler_server/grpc.rs    | 25 ++++++----
 benchmarks/Cargo.toml                              |  2 +-
 examples/Cargo.toml                                |  2 +-
 9 files changed, 64 insertions(+), 37 deletions(-)

diff --git a/ballista-cli/Cargo.lock b/ballista-cli/Cargo.lock
index 528148c9..6e39c2e4 100644
--- a/ballista-cli/Cargo.lock
+++ b/ballista-cli/Cargo.lock
@@ -63,9 +63,9 @@ checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6"
 
 [[package]]
 name = "arrow"
-version = "13.0.0"
+version = "14.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5c6bee230122beb516ead31935a61f683715f987c6f003eff44ad6986624105a"
+checksum = "0612b6a634de6c3f5e63fdaa6932f7bc598f92de0462ac6e69b0aebd77e093aa"
 dependencies = [
  "bitflags",
  "chrono",
@@ -88,9 +88,9 @@ dependencies = [
 
 [[package]]
 name = "arrow-flight"
-version = "13.0.0"
+version = "14.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0a3666d2dbc637fa979d1f0bf3031d39a80e709f3b9ec88e3d573c1d666bf553"
+checksum = "ce7b7cfa8eb0dcb0691f18b6a1d9c81cfe3c42726c254be5128d15ebe7580a1d"
 dependencies = [
  "arrow",
  "base64",
@@ -198,7 +198,7 @@ dependencies = [
 
 [[package]]
 name = "ballista"
-version = "0.6.0"
+version = "0.7.0"
 dependencies = [
  "ballista-core",
  "datafusion",
@@ -212,7 +212,7 @@ dependencies = [
 
 [[package]]
 name = "ballista-cli"
-version = "0.6.0"
+version = "0.7.0"
 dependencies = [
  "arrow",
  "ballista",
@@ -228,7 +228,7 @@ dependencies = [
 
 [[package]]
 name = "ballista-core"
-version = "0.6.0"
+version = "0.7.0"
 dependencies = [
  "ahash",
  "arrow-flight",
@@ -500,7 +500,8 @@ dependencies = [
 
 [[package]]
 name = "datafusion"
-version = "7.0.0"
+version = "8.0.0"
+source = "git+https://github.com/apache/arrow-datafusion?rev=9ea7dc6036a7b1d28c7450db4f26720b732a50de#9ea7dc6036a7b1d28c7450db4f26720b732a50de"
 dependencies = [
  "ahash",
  "arrow",
@@ -511,6 +512,7 @@ dependencies = [
  "datafusion-expr",
  "datafusion-physical-expr",
  "datafusion-row",
+ "datafusion-sql",
  "futures",
  "hashbrown 0.12.1",
  "lazy_static",
@@ -532,7 +534,8 @@ dependencies = [
 
 [[package]]
 name = "datafusion-cli"
-version = "7.0.0"
+version = "8.0.0"
+source = "git+https://github.com/apache/arrow-datafusion?rev=9ea7dc6036a7b1d28c7450db4f26720b732a50de#9ea7dc6036a7b1d28c7450db4f26720b732a50de"
 dependencies = [
  "arrow",
  "clap",
@@ -546,7 +549,8 @@ dependencies = [
 
 [[package]]
 name = "datafusion-common"
-version = "7.0.0"
+version = "8.0.0"
+source = "git+https://github.com/apache/arrow-datafusion?rev=9ea7dc6036a7b1d28c7450db4f26720b732a50de#9ea7dc6036a7b1d28c7450db4f26720b732a50de"
 dependencies = [
  "arrow",
  "ordered-float 3.0.0",
@@ -556,7 +560,8 @@ dependencies = [
 
 [[package]]
 name = "datafusion-data-access"
-version = "7.0.0"
+version = "8.0.0"
+source = "git+https://github.com/apache/arrow-datafusion?rev=9ea7dc6036a7b1d28c7450db4f26720b732a50de#9ea7dc6036a7b1d28c7450db4f26720b732a50de"
 dependencies = [
  "async-trait",
  "chrono",
@@ -569,7 +574,8 @@ dependencies = [
 
 [[package]]
 name = "datafusion-expr"
-version = "7.0.0"
+version = "8.0.0"
+source = "git+https://github.com/apache/arrow-datafusion?rev=9ea7dc6036a7b1d28c7450db4f26720b732a50de#9ea7dc6036a7b1d28c7450db4f26720b732a50de"
 dependencies = [
  "ahash",
  "arrow",
@@ -579,7 +585,8 @@ dependencies = [
 
 [[package]]
 name = "datafusion-physical-expr"
-version = "7.0.0"
+version = "8.0.0"
+source = "git+https://github.com/apache/arrow-datafusion?rev=9ea7dc6036a7b1d28c7450db4f26720b732a50de#9ea7dc6036a7b1d28c7450db4f26720b732a50de"
 dependencies = [
  "ahash",
  "arrow",
@@ -602,7 +609,8 @@ dependencies = [
 
 [[package]]
 name = "datafusion-proto"
-version = "7.0.0"
+version = "8.0.0"
+source = "git+https://github.com/apache/arrow-datafusion?rev=9ea7dc6036a7b1d28c7450db4f26720b732a50de#9ea7dc6036a7b1d28c7450db4f26720b732a50de"
 dependencies = [
  "datafusion",
  "prost",
@@ -611,7 +619,8 @@ dependencies = [
 
 [[package]]
 name = "datafusion-row"
-version = "7.0.0"
+version = "8.0.0"
+source = "git+https://github.com/apache/arrow-datafusion?rev=9ea7dc6036a7b1d28c7450db4f26720b732a50de#9ea7dc6036a7b1d28c7450db4f26720b732a50de"
 dependencies = [
  "arrow",
  "datafusion-common",
@@ -619,6 +628,20 @@ dependencies = [
  "rand",
 ]
 
+[[package]]
+name = "datafusion-sql"
+version = "8.0.0"
+source = "git+https://github.com/apache/arrow-datafusion?rev=9ea7dc6036a7b1d28c7450db4f26720b732a50de#9ea7dc6036a7b1d28c7450db4f26720b732a50de"
+dependencies = [
+ "ahash",
+ "arrow",
+ "datafusion-common",
+ "datafusion-expr",
+ "hashbrown 0.12.1",
+ "sqlparser",
+ "tokio",
+]
+
 [[package]]
 name = "digest"
 version = "0.10.3"
@@ -1526,14 +1549,15 @@ dependencies = [
 
 [[package]]
 name = "parquet"
-version = "13.0.0"
+version = "14.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6c6d737baed48775e87a69aa262f1fa2f1d6bd074dedbe9cac244b9aabf2a0b4"
+checksum = "ba1185ee1da5091e40b86519265a44d2704e3916ff867059c915141cab14d413"
 dependencies = [
  "arrow",
  "base64",
  "brotli",
  "byteorder",
+ "bytes",
  "chrono",
  "flate2",
  "lz4",
diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml
index b75bf716..d677ab5f 100644
--- a/ballista-cli/Cargo.toml
+++ b/ballista-cli/Cargo.toml
@@ -32,8 +32,8 @@ readme = "README.md"
 arrow = { version = "14.0.0" }
 ballista = { path = "../ballista/rust/client", version = "0.7.0" }
 clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "844bcda2664a04685b865afe7ff159c0648d2860" }
-datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "844bcda2664a04685b865afe7ff159c0648d2860" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "9ea7dc6036a7b1d28c7450db4f26720b732a50de" }
+datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "9ea7dc6036a7b1d28c7450db4f26720b732a50de" }
 dirs = "4.0.0"
 env_logger = "0.9"
 mimalloc = { version = "0.1", default-features = false }
diff --git a/ballista/rust/client/Cargo.toml b/ballista/rust/client/Cargo.toml
index 7c7b6729..41c6d622 100644
--- a/ballista/rust/client/Cargo.toml
+++ b/ballista/rust/client/Cargo.toml
@@ -32,7 +32,7 @@ ballista-core = { path = "../core", version = "0.7.0" }
 ballista-executor = { path = "../executor", version = "0.7.0", optional = true }
 ballista-scheduler = { path = "../scheduler", version = "0.7.0", optional = true }
 
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "844bcda2664a04685b865afe7ff159c0648d2860" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "9ea7dc6036a7b1d28c7450db4f26720b732a50de" }
 futures = "0.3"
 log = "0.4"
 parking_lot = "0.12"
diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml
index ff42abfa..e3ae3e27 100644
--- a/ballista/rust/core/Cargo.toml
+++ b/ballista/rust/core/Cargo.toml
@@ -39,8 +39,8 @@ arrow-flight = { version = "14.0.0" }
 async-trait = "0.1.41"
 chrono = { version = "0.4", default-features = false }
 clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "844bcda2664a04685b865afe7ff159c0648d2860" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "844bcda2664a04685b865afe7ff159c0648d2860" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "9ea7dc6036a7b1d28c7450db4f26720b732a50de" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "9ea7dc6036a7b1d28c7450db4f26720b732a50de" }
 futures = "0.3"
 hashbrown = "0.12"
 
diff --git a/ballista/rust/executor/Cargo.toml b/ballista/rust/executor/Cargo.toml
index 7a3f91c3..9d182263 100644
--- a/ballista/rust/executor/Cargo.toml
+++ b/ballista/rust/executor/Cargo.toml
@@ -40,7 +40,7 @@ async-trait = "0.1.41"
 ballista-core = { path = "../core", version = "0.7.0" }
 chrono = { version = "0.4", default-features = false }
 configure_me = "0.4.0"
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "844bcda2664a04685b865afe7ff159c0648d2860" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "9ea7dc6036a7b1d28c7450db4f26720b732a50de" }
 env_logger = "0.9"
 futures = "0.3"
 hyper = "0.14.4"
diff --git a/ballista/rust/scheduler/Cargo.toml b/ballista/rust/scheduler/Cargo.toml
index e4570360..861489b9 100644
--- a/ballista/rust/scheduler/Cargo.toml
+++ b/ballista/rust/scheduler/Cargo.toml
@@ -41,7 +41,7 @@ async-trait = "0.1.41"
 ballista-core = { path = "../core", version = "0.7.0" }
 clap = { version = "3", features = ["derive", "cargo"] }
 configure_me = "0.4.0"
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "844bcda2664a04685b865afe7ff159c0648d2860" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "9ea7dc6036a7b1d28c7450db4f26720b732a50de" }
 env_logger = "0.9"
 etcd-client = { version = "0.9", optional = true }
 futures = "0.3"
diff --git a/ballista/rust/scheduler/src/scheduler_server/grpc.rs b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
index 10be4786..9216155d 100644
--- a/ballista/rust/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
@@ -38,7 +38,7 @@ use datafusion::datafusion_data_access::object_store::{
 };
 use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::file_format::FileFormat;
-use futures::StreamExt;
+use futures::TryStreamExt;
 use log::{debug, error, info, trace, warn};
 use rand::{distributions::Alphanumeric, thread_rng, Rng};
 use std::convert::TryInto;
@@ -281,7 +281,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
         request: Request<GetFileMetadataParams>,
     ) -> std::result::Result<Response<GetFileMetadataResult>, tonic::Status> {
         // TODO support multiple object stores
-        let obj_store = LocalFileSystem {};
+        let obj_store = Arc::new(LocalFileSystem {}) as Arc<dyn ObjectStore>;
         // TODO shouldn't this take a ListingOption object as input?
 
         let GetFileMetadataParams { path, file_type } = request.into_inner();
@@ -300,19 +300,22 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
             )),
         }?;
 
-        let file_metas = obj_store.list_file(&path).await.map_err(|e| {
-            let msg = format!("Error listing files: {}", e);
-            error!("{}", msg);
-            tonic::Status::internal(msg)
-        })?;
-
-        let obj_readers = file_metas.map(move |f| obj_store.file_reader(f?.sized_file));
+        let file_metas: Vec<_> = obj_store
+            .list_file(&path)
+            .await
+            .map_err(|e| {
+                let msg = format!("Error listing files: {}", e);
+                error!("{}", msg);
+                tonic::Status::internal(msg)
+            })?
+            .try_collect()
+            .await?;
 
         let schema = file_format
-            .infer_schema(Box::pin(obj_readers))
+            .infer_schema(&obj_store, &file_metas)
             .await
             .map_err(|e| {
-                let msg = format!("Error infering schema: {}", e);
+                let msg = format!("Error inferring schema: {}", e);
                 error!("{}", msg);
                 tonic::Status::internal(msg)
             })?;
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 287ed0dd..2dd12088 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -33,7 +33,7 @@ snmalloc = ["snmalloc-rs"]
 
 [dependencies]
 ballista = { path = "../ballista/rust/client" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "844bcda2664a04685b865afe7ff159c0648d2860" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "9ea7dc6036a7b1d28c7450db4f26720b732a50de" }
 env_logger = "0.9"
 futures = "0.3"
 mimalloc = { version = "0.1", optional = true, default-features = false }
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index 6f634aed..bc815020 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -35,7 +35,7 @@ required-features = ["ballista/standalone"]
 
 [dependencies]
 ballista = { path = "../ballista/rust/client", version = "0.7.0" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "844bcda2664a04685b865afe7ff159c0648d2860" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "9ea7dc6036a7b1d28c7450db4f26720b732a50de" }
 futures = "0.3"
 num_cpus = "1.13.0"
 prost = "0.10"