You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/07/17 13:15:41 UTC
[arrow-datafusion] branch master updated: Update to use arrow 5.0 (#721)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 5c1f8eb Update to use arrow 5.0 (#721)
5c1f8eb is described below
commit 5c1f8eb32c5f423d6c0e1f50350f9fe140135fdb
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Sat Jul 17 09:15:32 2021 -0400
Update to use arrow 5.0 (#721)
* Update to arrow 5.0
* Update to new interfaces
---
ballista/rust/core/Cargo.toml | 2 +-
ballista/rust/executor/Cargo.toml | 4 ++--
ballista/rust/executor/src/flight_service.rs | 6 ++----
datafusion-cli/Cargo.toml | 2 +-
datafusion-examples/Cargo.toml | 2 +-
datafusion-examples/examples/flight_server.rs | 11 +++--------
datafusion/Cargo.toml | 6 +++---
datafusion/src/physical_plan/hash_aggregate.rs | 4 ++--
datafusion/src/physical_plan/mod.rs | 5 +++--
datafusion/src/scalar.rs | 2 +-
10 files changed, 19 insertions(+), 25 deletions(-)
diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml
index 3a89c75..ce72d2f 100644
--- a/ballista/rust/core/Cargo.toml
+++ b/ballista/rust/core/Cargo.toml
@@ -42,7 +42,7 @@ tokio = "1.0"
tonic = "0.4"
uuid = { version = "0.8", features = ["v4"] }
-arrow-flight = { version = "4.0" }
+arrow-flight = { version = "5.0" }
datafusion = { path = "../../../datafusion" }
diff --git a/ballista/rust/executor/Cargo.toml b/ballista/rust/executor/Cargo.toml
index 68e4920..428a5bb 100644
--- a/ballista/rust/executor/Cargo.toml
+++ b/ballista/rust/executor/Cargo.toml
@@ -43,8 +43,8 @@ tokio-stream = { version = "0.1", features = ["net"] }
tonic = "0.4"
uuid = { version = "0.8", features = ["v4"] }
-arrow = { version = "4.0" }
-arrow-flight = { version = "4.0" }
+arrow = { version = "5.0" }
+arrow-flight = { version = "5.0" }
datafusion = { path = "../../../datafusion" }
diff --git a/ballista/rust/executor/src/flight_service.rs b/ballista/rust/executor/src/flight_service.rs
index 7325287..9a3f2d8 100644
--- a/ballista/rust/executor/src/flight_service.rs
+++ b/ballista/rust/executor/src/flight_service.rs
@@ -23,6 +23,7 @@ use std::pin::Pin;
use std::sync::Arc;
use crate::executor::Executor;
+use arrow_flight::SchemaAsIpc;
use ballista_core::error::BallistaError;
use ballista_core::serde::decode_protobuf;
use ballista_core::serde::scheduler::Action as BallistaAction;
@@ -218,10 +219,7 @@ where
T: Read + Seek,
{
let options = arrow::ipc::writer::IpcWriteOptions::default();
- let schema_flight_data = arrow_flight::utils::flight_data_from_arrow_schema(
- reader.schema().as_ref(),
- &options,
- );
+ let schema_flight_data = SchemaAsIpc::new(reader.schema().as_ref(), &options).into();
send_response(&tx, Ok(schema_flight_data)).await?;
for batch in reader {
diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml
index cd17b61..fda9271 100644
--- a/datafusion-cli/Cargo.toml
+++ b/datafusion-cli/Cargo.toml
@@ -31,4 +31,4 @@ clap = "2.33"
rustyline = "8.0"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
datafusion = { path = "../datafusion" }
-arrow = { version = "4.0" }
+arrow = { version = "5.0" }
diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml
index 886f8f5..35aa376 100644
--- a/datafusion-examples/Cargo.toml
+++ b/datafusion-examples/Cargo.toml
@@ -29,7 +29,7 @@ publish = false
[dev-dependencies]
-arrow-flight = { version = "4.0" }
+arrow-flight = { version = "5.0" }
datafusion = { path = "../datafusion" }
prost = "0.7"
tonic = "0.4"
diff --git a/datafusion-examples/examples/flight_server.rs b/datafusion-examples/examples/flight_server.rs
index 8496bcb..138434e 100644
--- a/datafusion-examples/examples/flight_server.rs
+++ b/datafusion-examples/examples/flight_server.rs
@@ -17,6 +17,7 @@
use std::pin::Pin;
+use arrow_flight::SchemaAsIpc;
use futures::Stream;
use tonic::transport::Server;
use tonic::{Request, Response, Status, Streaming};
@@ -67,10 +68,7 @@ impl FlightService for FlightServiceImpl {
let table = ParquetTable::try_new(&request.path[0], num_cpus::get()).unwrap();
let options = datafusion::arrow::ipc::writer::IpcWriteOptions::default();
- let schema_result = arrow_flight::utils::flight_schema_from_arrow_schema(
- table.schema().as_ref(),
- &options,
- );
+ let schema_result = SchemaAsIpc::new(table.schema().as_ref(), &options).into();
Ok(Response::new(schema_result))
}
@@ -108,10 +106,7 @@ impl FlightService for FlightServiceImpl {
// add an initial FlightData message that sends schema
let options = datafusion::arrow::ipc::writer::IpcWriteOptions::default();
let schema_flight_data =
- arrow_flight::utils::flight_data_from_arrow_schema(
- &df.schema().clone().into(),
- &options,
- );
+ SchemaAsIpc::new(&df.schema().clone().into(), &options).into();
let mut flights: Vec<Result<FlightData, Status>> =
vec![Ok(schema_flight_data)];
diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml
index 845de62..2f1e997 100644
--- a/datafusion/Cargo.toml
+++ b/datafusion/Cargo.toml
@@ -46,8 +46,8 @@ unicode_expressions = ["unicode-segmentation"]
[dependencies]
ahash = "0.7"
hashbrown = "0.11"
-arrow = { version = "4.4", features = ["prettyprint"] }
-parquet = { version = "4.4", features = ["arrow"] }
+arrow = { version = "5.0", features = ["prettyprint"] }
+parquet = { version = "5.0", features = ["arrow"] }
sqlparser = "0.9.0"
paste = "^1.0"
num_cpus = "1.13.0"
@@ -98,4 +98,4 @@ harness = false
[[bench]]
name = "physical_plan"
-harness = false
\ No newline at end of file
+harness = false
diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs
index b4b7c22..ae51383 100644
--- a/datafusion/src/physical_plan/hash_aggregate.rs
+++ b/datafusion/src/physical_plan/hash_aggregate.rs
@@ -508,7 +508,7 @@ fn dictionary_create_key_for_col<K: ArrowDictionaryKeyType>(
))
})?;
- create_key_for_col(&dict_col.values(), values_index, vec)
+ create_key_for_col(dict_col.values(), values_index, vec)
}
/// Appends a sequence of [u8] bytes for the value in `col[row]` to
@@ -1104,7 +1104,7 @@ fn dictionary_create_group_by_value<K: ArrowDictionaryKeyType>(
))
})?;
- create_group_by_value(&dict_col.values(), values_index)
+ create_group_by_value(dict_col.values(), values_index)
}
/// Extract the value in `col[row]` as a GroupByScalar
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index d89eb11..b3c0dd6 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -505,8 +505,9 @@ pub trait WindowExpr: Send + Sync + Debug {
end: num_rows,
}])
} else {
- lexicographical_partition_ranges(partition_columns)
- .map_err(DataFusionError::ArrowError)
+ Ok(lexicographical_partition_ranges(partition_columns)
+ .map_err(DataFusionError::ArrowError)?
+ .collect::<Vec<_>>())
}
}
diff --git a/datafusion/src/scalar.rs b/datafusion/src/scalar.rs
index f94a090..e7354f8 100644
--- a/datafusion/src/scalar.rs
+++ b/datafusion/src/scalar.rs
@@ -778,7 +778,7 @@ impl ScalarValue {
keys_col.data_type()
))
})?;
- Self::try_from_array(&dict_array.values(), values_index)
+ Self::try_from_array(dict_array.values(), values_index)
}
}