You are viewing a plain text version of this content. The canonical link for it is available in the original (HTML) version of this archive page.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/07/17 13:15:41 UTC

[arrow-datafusion] branch master updated: Update to use arrow 5.0 (#721)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c1f8eb  Update to use arrow 5.0 (#721)
5c1f8eb is described below

commit 5c1f8eb32c5f423d6c0e1f50350f9fe140135fdb
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Sat Jul 17 09:15:32 2021 -0400

    Update to use arrow 5.0 (#721)
    
    * Update to arrow 5.0
    
    * Update to new interfaces
---
 ballista/rust/core/Cargo.toml                  |  2 +-
 ballista/rust/executor/Cargo.toml              |  4 ++--
 ballista/rust/executor/src/flight_service.rs   |  6 ++----
 datafusion-cli/Cargo.toml                      |  2 +-
 datafusion-examples/Cargo.toml                 |  2 +-
 datafusion-examples/examples/flight_server.rs  | 11 +++--------
 datafusion/Cargo.toml                          |  6 +++---
 datafusion/src/physical_plan/hash_aggregate.rs |  4 ++--
 datafusion/src/physical_plan/mod.rs            |  5 +++--
 datafusion/src/scalar.rs                       |  2 +-
 10 files changed, 19 insertions(+), 25 deletions(-)

diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml
index 3a89c75..ce72d2f 100644
--- a/ballista/rust/core/Cargo.toml
+++ b/ballista/rust/core/Cargo.toml
@@ -42,7 +42,7 @@ tokio = "1.0"
 tonic = "0.4"
 uuid = { version = "0.8", features = ["v4"] }
 
-arrow-flight = { version = "4.0"  }
+arrow-flight = { version = "5.0"  }
 
 datafusion = { path = "../../../datafusion" }
 
diff --git a/ballista/rust/executor/Cargo.toml b/ballista/rust/executor/Cargo.toml
index 68e4920..428a5bb 100644
--- a/ballista/rust/executor/Cargo.toml
+++ b/ballista/rust/executor/Cargo.toml
@@ -43,8 +43,8 @@ tokio-stream = { version = "0.1", features = ["net"] }
 tonic = "0.4"
 uuid = { version = "0.8", features = ["v4"] }
 
-arrow = { version = "4.0"  }
-arrow-flight = { version = "4.0"  }
+arrow = { version = "5.0"  }
+arrow-flight = { version = "5.0"  }
 
 datafusion = { path = "../../../datafusion" }
 
diff --git a/ballista/rust/executor/src/flight_service.rs b/ballista/rust/executor/src/flight_service.rs
index 7325287..9a3f2d8 100644
--- a/ballista/rust/executor/src/flight_service.rs
+++ b/ballista/rust/executor/src/flight_service.rs
@@ -23,6 +23,7 @@ use std::pin::Pin;
 use std::sync::Arc;
 
 use crate::executor::Executor;
+use arrow_flight::SchemaAsIpc;
 use ballista_core::error::BallistaError;
 use ballista_core::serde::decode_protobuf;
 use ballista_core::serde::scheduler::Action as BallistaAction;
@@ -218,10 +219,7 @@ where
     T: Read + Seek,
 {
     let options = arrow::ipc::writer::IpcWriteOptions::default();
-    let schema_flight_data = arrow_flight::utils::flight_data_from_arrow_schema(
-        reader.schema().as_ref(),
-        &options,
-    );
+    let schema_flight_data = SchemaAsIpc::new(reader.schema().as_ref(), &options).into();
     send_response(&tx, Ok(schema_flight_data)).await?;
 
     for batch in reader {
diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml
index cd17b61..fda9271 100644
--- a/datafusion-cli/Cargo.toml
+++ b/datafusion-cli/Cargo.toml
@@ -31,4 +31,4 @@ clap = "2.33"
 rustyline = "8.0"
 tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
 datafusion = { path = "../datafusion" }
-arrow = { version = "4.0"  }
+arrow = { version = "5.0"  }
diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml
index 886f8f5..35aa376 100644
--- a/datafusion-examples/Cargo.toml
+++ b/datafusion-examples/Cargo.toml
@@ -29,7 +29,7 @@ publish = false
 
 
 [dev-dependencies]
-arrow-flight = { version = "4.0" }
+arrow-flight = { version = "5.0" }
 datafusion = { path = "../datafusion" }
 prost = "0.7"
 tonic = "0.4"
diff --git a/datafusion-examples/examples/flight_server.rs b/datafusion-examples/examples/flight_server.rs
index 8496bcb..138434e 100644
--- a/datafusion-examples/examples/flight_server.rs
+++ b/datafusion-examples/examples/flight_server.rs
@@ -17,6 +17,7 @@
 
 use std::pin::Pin;
 
+use arrow_flight::SchemaAsIpc;
 use futures::Stream;
 use tonic::transport::Server;
 use tonic::{Request, Response, Status, Streaming};
@@ -67,10 +68,7 @@ impl FlightService for FlightServiceImpl {
         let table = ParquetTable::try_new(&request.path[0], num_cpus::get()).unwrap();
 
         let options = datafusion::arrow::ipc::writer::IpcWriteOptions::default();
-        let schema_result = arrow_flight::utils::flight_schema_from_arrow_schema(
-            table.schema().as_ref(),
-            &options,
-        );
+        let schema_result = SchemaAsIpc::new(table.schema().as_ref(), &options).into();
 
         Ok(Response::new(schema_result))
     }
@@ -108,10 +106,7 @@ impl FlightService for FlightServiceImpl {
                 // add an initial FlightData message that sends schema
                 let options = datafusion::arrow::ipc::writer::IpcWriteOptions::default();
                 let schema_flight_data =
-                    arrow_flight::utils::flight_data_from_arrow_schema(
-                        &df.schema().clone().into(),
-                        &options,
-                    );
+                    SchemaAsIpc::new(&df.schema().clone().into(), &options).into();
 
                 let mut flights: Vec<Result<FlightData, Status>> =
                     vec![Ok(schema_flight_data)];
diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml
index 845de62..2f1e997 100644
--- a/datafusion/Cargo.toml
+++ b/datafusion/Cargo.toml
@@ -46,8 +46,8 @@ unicode_expressions = ["unicode-segmentation"]
 [dependencies]
 ahash = "0.7"
 hashbrown = "0.11"
-arrow = { version = "4.4", features = ["prettyprint"] }
-parquet = { version = "4.4", features = ["arrow"] }
+arrow = { version = "5.0", features = ["prettyprint"] }
+parquet = { version = "5.0", features = ["arrow"] }
 sqlparser = "0.9.0"
 paste = "^1.0"
 num_cpus = "1.13.0"
@@ -98,4 +98,4 @@ harness = false
 
 [[bench]]
 name = "physical_plan"
-harness = false
\ No newline at end of file
+harness = false
diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs
index b4b7c22..ae51383 100644
--- a/datafusion/src/physical_plan/hash_aggregate.rs
+++ b/datafusion/src/physical_plan/hash_aggregate.rs
@@ -508,7 +508,7 @@ fn dictionary_create_key_for_col<K: ArrowDictionaryKeyType>(
         ))
     })?;
 
-    create_key_for_col(&dict_col.values(), values_index, vec)
+    create_key_for_col(dict_col.values(), values_index, vec)
 }
 
 /// Appends a sequence of [u8] bytes for the value in `col[row]` to
@@ -1104,7 +1104,7 @@ fn dictionary_create_group_by_value<K: ArrowDictionaryKeyType>(
         ))
     })?;
 
-    create_group_by_value(&dict_col.values(), values_index)
+    create_group_by_value(dict_col.values(), values_index)
 }
 
 /// Extract the value in `col[row]` as a GroupByScalar
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index d89eb11..b3c0dd6 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -505,8 +505,9 @@ pub trait WindowExpr: Send + Sync + Debug {
                 end: num_rows,
             }])
         } else {
-            lexicographical_partition_ranges(partition_columns)
-                .map_err(DataFusionError::ArrowError)
+            Ok(lexicographical_partition_ranges(partition_columns)
+                .map_err(DataFusionError::ArrowError)?
+                .collect::<Vec<_>>())
         }
     }
 
diff --git a/datafusion/src/scalar.rs b/datafusion/src/scalar.rs
index f94a090..e7354f8 100644
--- a/datafusion/src/scalar.rs
+++ b/datafusion/src/scalar.rs
@@ -778,7 +778,7 @@ impl ScalarValue {
                 keys_col.data_type()
             ))
         })?;
-        Self::try_from_array(&dict_array.values(), values_index)
+        Self::try_from_array(dict_array.values(), values_index)
     }
 }