You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/07/20 11:26:16 UTC

[arrow-rs] branch active_release updated: Bump prost and tonic (#560) (#565)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch active_release
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/active_release by this push:
     new 2998db4  Bump prost and tonic (#560) (#565)
2998db4 is described below

commit 2998db4a07b5f9cd8da292d7e7e990a19b0b7535
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Tue Jul 20 07:26:09 2021 -0400

    Bump prost and tonic (#560) (#565)
    
    * Update tonic and prost
    
    Signed-off-by: Chojan Shang <ps...@outlook.com>
    
    * Make filter::prep_null_mask_filter public
    
    Signed-off-by: Chojan Shang <ps...@outlook.com>
    
    * Bump prost and tonic in integration-testing
    
    Signed-off-by: Chojan Shang <ps...@outlook.com>
    
    Co-authored-by: Chojan Shang <ps...@ritelabs.net>
---
 arrow-flight/Cargo.toml                   |   8 +-
 arrow-flight/src/arrow.flight.protocol.rs | 210 +++++++++++++++++-------------
 arrow/src/compute/kernels/filter.rs       |   2 +-
 integration-testing/Cargo.toml            |   4 +-
 4 files changed, 128 insertions(+), 96 deletions(-)

diff --git a/arrow-flight/Cargo.toml b/arrow-flight/Cargo.toml
index c61d8fd..48c3d06 100644
--- a/arrow-flight/Cargo.toml
+++ b/arrow-flight/Cargo.toml
@@ -28,17 +28,17 @@ license = "Apache-2.0"
 [dependencies]
 arrow = { path = "../arrow", version = "5.0.0" }
 base64 = "0.13"
-tonic = "0.4"
+tonic = "0.5"
 bytes = "1"
-prost = "0.7"
-prost-derive = "0.7"
+prost = "0.8"
+prost-derive = "0.8"
 tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread"] }
 
 [dev-dependencies]
 futures = { version = "0.3", default-features = false, features = ["alloc"]}
 
 [build-dependencies]
-tonic-build = "0.4"
+tonic-build = "0.5"
 # Pin specific version of the tonic-build dependencies to avoid auto-generated
 # (and checked in) arrow.flight.protocol.rs from changing
 proc-macro2 = "=1.0.27"
diff --git a/arrow-flight/src/arrow.flight.protocol.rs b/arrow-flight/src/arrow.flight.protocol.rs
index 5fce526..79c6a94 100644
--- a/arrow-flight/src/arrow.flight.protocol.rs
+++ b/arrow-flight/src/arrow.flight.protocol.rs
@@ -204,13 +204,14 @@ pub struct PutResult {
 }
 #[doc = r" Generated client implementations."]
 pub mod flight_service_client {
-    #![allow(unused_variables, dead_code, missing_docs)]
+    #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
     use tonic::codegen::*;
     #[doc = ""]
     #[doc = " A flight service is an endpoint for retrieving or storing Arrow data. A"]
     #[doc = " flight service can expose one or more predefined endpoints that can be"]
     #[doc = " accessed using the Arrow Flight Protocol. Additionally, a flight service"]
     #[doc = " can expose a set of actions that are available."]
+    #[derive(Debug, Clone)]
     pub struct FlightServiceClient<T> {
         inner: tonic::client::Grpc<T>,
     }
@@ -228,20 +229,43 @@ pub mod flight_service_client {
     impl<T> FlightServiceClient<T>
     where
         T: tonic::client::GrpcService<tonic::body::BoxBody>,
-        T::ResponseBody: Body + HttpBody + Send + 'static,
+        T::ResponseBody: Body + Send + Sync + 'static,
         T::Error: Into<StdError>,
-        <T::ResponseBody as HttpBody>::Error: Into<StdError> + Send,
+        <T::ResponseBody as Body>::Error: Into<StdError> + Send,
     {
         pub fn new(inner: T) -> Self {
             let inner = tonic::client::Grpc::new(inner);
             Self { inner }
         }
-        pub fn with_interceptor(
+        pub fn with_interceptor<F>(
             inner: T,
-            interceptor: impl Into<tonic::Interceptor>,
-        ) -> Self {
-            let inner = tonic::client::Grpc::with_interceptor(inner, interceptor);
-            Self { inner }
+            interceptor: F,
+        ) -> FlightServiceClient<InterceptedService<T, F>>
+        where
+            F: FnMut(tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status>,
+            T: tonic::codegen::Service<
+                http::Request<tonic::body::BoxBody>,
+                Response = http::Response<
+                    <T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
+                >,
+            >,
+            <T as tonic::codegen::Service<http::Request<tonic::body::BoxBody>>>::Error:
+                Into<StdError> + Send + Sync,
+        {
+            FlightServiceClient::new(InterceptedService::new(inner, interceptor))
+        }
+        #[doc = r" Compress requests with `gzip`."]
+        #[doc = r""]
+        #[doc = r" This requires the server to support it otherwise it might respond with an"]
+        #[doc = r" error."]
+        pub fn send_gzip(mut self) -> Self {
+            self.inner = self.inner.send_gzip();
+            self
+        }
+        #[doc = r" Enable decompressing responses with `gzip`."]
+        pub fn accept_gzip(mut self) -> Self {
+            self.inner = self.inner.accept_gzip();
+            self
         }
         #[doc = ""]
         #[doc = " Handshake between client and server. Depending on the server, the"]
@@ -478,22 +502,10 @@ pub mod flight_service_client {
                 .await
         }
     }
-    impl<T: Clone> Clone for FlightServiceClient<T> {
-        fn clone(&self) -> Self {
-            Self {
-                inner: self.inner.clone(),
-            }
-        }
-    }
-    impl<T> std::fmt::Debug for FlightServiceClient<T> {
-        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-            write!(f, "FlightServiceClient {{ ... }}")
-        }
-    }
 }
 #[doc = r" Generated server implementations."]
 pub mod flight_service_server {
-    #![allow(unused_variables, dead_code, missing_docs)]
+    #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
     use tonic::codegen::*;
     #[doc = "Generated trait containing gRPC methods that should be implemented for use with FlightServiceServer."]
     #[async_trait]
@@ -635,27 +647,34 @@ pub mod flight_service_server {
     #[derive(Debug)]
     pub struct FlightServiceServer<T: FlightService> {
         inner: _Inner<T>,
+        accept_compression_encodings: (),
+        send_compression_encodings: (),
     }
-    struct _Inner<T>(Arc<T>, Option<tonic::Interceptor>);
+    struct _Inner<T>(Arc<T>);
     impl<T: FlightService> FlightServiceServer<T> {
         pub fn new(inner: T) -> Self {
             let inner = Arc::new(inner);
-            let inner = _Inner(inner, None);
-            Self { inner }
+            let inner = _Inner(inner);
+            Self {
+                inner,
+                accept_compression_encodings: Default::default(),
+                send_compression_encodings: Default::default(),
+            }
         }
-        pub fn with_interceptor(
+        pub fn with_interceptor<F>(
             inner: T,
-            interceptor: impl Into<tonic::Interceptor>,
-        ) -> Self {
-            let inner = Arc::new(inner);
-            let inner = _Inner(inner, Some(interceptor.into()));
-            Self { inner }
+            interceptor: F,
+        ) -> InterceptedService<Self, F>
+        where
+            F: FnMut(tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status>,
+        {
+            InterceptedService::new(Self::new(inner), interceptor)
         }
     }
-    impl<T, B> Service<http::Request<B>> for FlightServiceServer<T>
+    impl<T, B> tonic::codegen::Service<http::Request<B>> for FlightServiceServer<T>
     where
         T: FlightService,
-        B: HttpBody + Send + Sync + 'static,
+        B: Body + Send + Sync + 'static,
         B::Error: Into<StdError> + Send + 'static,
     {
         type Response = http::Response<tonic::body::BoxBody>;
@@ -691,17 +710,18 @@ pub mod flight_service_server {
                             Box::pin(fut)
                         }
                     }
+                    let accept_compression_encodings = self.accept_compression_encodings;
+                    let send_compression_encodings = self.send_compression_encodings;
                     let inner = self.inner.clone();
                     let fut = async move {
-                        let interceptor = inner.1;
                         let inner = inner.0;
                         let method = HandshakeSvc(inner);
                         let codec = tonic::codec::ProstCodec::default();
-                        let mut grpc = if let Some(interceptor) = interceptor {
-                            tonic::server::Grpc::with_interceptor(codec, interceptor)
-                        } else {
-                            tonic::server::Grpc::new(codec)
-                        };
+                        let mut grpc = tonic::server::Grpc::new(codec)
+                            .apply_compression_config(
+                                accept_compression_encodings,
+                                send_compression_encodings,
+                            );
                         let res = grpc.streaming(method, req).await;
                         Ok(res)
                     };
@@ -729,17 +749,18 @@ pub mod flight_service_server {
                             Box::pin(fut)
                         }
                     }
+                    let accept_compression_encodings = self.accept_compression_encodings;
+                    let send_compression_encodings = self.send_compression_encodings;
                     let inner = self.inner.clone();
                     let fut = async move {
-                        let interceptor = inner.1;
                         let inner = inner.0;
                         let method = ListFlightsSvc(inner);
                         let codec = tonic::codec::ProstCodec::default();
-                        let mut grpc = if let Some(interceptor) = interceptor {
-                            tonic::server::Grpc::with_interceptor(codec, interceptor)
-                        } else {
-                            tonic::server::Grpc::new(codec)
-                        };
+                        let mut grpc = tonic::server::Grpc::new(codec)
+                            .apply_compression_config(
+                                accept_compression_encodings,
+                                send_compression_encodings,
+                            );
                         let res = grpc.server_streaming(method, req).await;
                         Ok(res)
                     };
@@ -765,17 +786,18 @@ pub mod flight_service_server {
                             Box::pin(fut)
                         }
                     }
+                    let accept_compression_encodings = self.accept_compression_encodings;
+                    let send_compression_encodings = self.send_compression_encodings;
                     let inner = self.inner.clone();
                     let fut = async move {
-                        let interceptor = inner.1.clone();
                         let inner = inner.0;
                         let method = GetFlightInfoSvc(inner);
                         let codec = tonic::codec::ProstCodec::default();
-                        let mut grpc = if let Some(interceptor) = interceptor {
-                            tonic::server::Grpc::with_interceptor(codec, interceptor)
-                        } else {
-                            tonic::server::Grpc::new(codec)
-                        };
+                        let mut grpc = tonic::server::Grpc::new(codec)
+                            .apply_compression_config(
+                                accept_compression_encodings,
+                                send_compression_encodings,
+                            );
                         let res = grpc.unary(method, req).await;
                         Ok(res)
                     };
@@ -800,17 +822,18 @@ pub mod flight_service_server {
                             Box::pin(fut)
                         }
                     }
+                    let accept_compression_encodings = self.accept_compression_encodings;
+                    let send_compression_encodings = self.send_compression_encodings;
                     let inner = self.inner.clone();
                     let fut = async move {
-                        let interceptor = inner.1.clone();
                         let inner = inner.0;
                         let method = GetSchemaSvc(inner);
                         let codec = tonic::codec::ProstCodec::default();
-                        let mut grpc = if let Some(interceptor) = interceptor {
-                            tonic::server::Grpc::with_interceptor(codec, interceptor)
-                        } else {
-                            tonic::server::Grpc::new(codec)
-                        };
+                        let mut grpc = tonic::server::Grpc::new(codec)
+                            .apply_compression_config(
+                                accept_compression_encodings,
+                                send_compression_encodings,
+                            );
                         let res = grpc.unary(method, req).await;
                         Ok(res)
                     };
@@ -838,17 +861,18 @@ pub mod flight_service_server {
                             Box::pin(fut)
                         }
                     }
+                    let accept_compression_encodings = self.accept_compression_encodings;
+                    let send_compression_encodings = self.send_compression_encodings;
                     let inner = self.inner.clone();
                     let fut = async move {
-                        let interceptor = inner.1;
                         let inner = inner.0;
                         let method = DoGetSvc(inner);
                         let codec = tonic::codec::ProstCodec::default();
-                        let mut grpc = if let Some(interceptor) = interceptor {
-                            tonic::server::Grpc::with_interceptor(codec, interceptor)
-                        } else {
-                            tonic::server::Grpc::new(codec)
-                        };
+                        let mut grpc = tonic::server::Grpc::new(codec)
+                            .apply_compression_config(
+                                accept_compression_encodings,
+                                send_compression_encodings,
+                            );
                         let res = grpc.server_streaming(method, req).await;
                         Ok(res)
                     };
@@ -876,17 +900,18 @@ pub mod flight_service_server {
                             Box::pin(fut)
                         }
                     }
+                    let accept_compression_encodings = self.accept_compression_encodings;
+                    let send_compression_encodings = self.send_compression_encodings;
                     let inner = self.inner.clone();
                     let fut = async move {
-                        let interceptor = inner.1;
                         let inner = inner.0;
                         let method = DoPutSvc(inner);
                         let codec = tonic::codec::ProstCodec::default();
-                        let mut grpc = if let Some(interceptor) = interceptor {
-                            tonic::server::Grpc::with_interceptor(codec, interceptor)
-                        } else {
-                            tonic::server::Grpc::new(codec)
-                        };
+                        let mut grpc = tonic::server::Grpc::new(codec)
+                            .apply_compression_config(
+                                accept_compression_encodings,
+                                send_compression_encodings,
+                            );
                         let res = grpc.streaming(method, req).await;
                         Ok(res)
                     };
@@ -914,17 +939,18 @@ pub mod flight_service_server {
                             Box::pin(fut)
                         }
                     }
+                    let accept_compression_encodings = self.accept_compression_encodings;
+                    let send_compression_encodings = self.send_compression_encodings;
                     let inner = self.inner.clone();
                     let fut = async move {
-                        let interceptor = inner.1;
                         let inner = inner.0;
                         let method = DoExchangeSvc(inner);
                         let codec = tonic::codec::ProstCodec::default();
-                        let mut grpc = if let Some(interceptor) = interceptor {
-                            tonic::server::Grpc::with_interceptor(codec, interceptor)
-                        } else {
-                            tonic::server::Grpc::new(codec)
-                        };
+                        let mut grpc = tonic::server::Grpc::new(codec)
+                            .apply_compression_config(
+                                accept_compression_encodings,
+                                send_compression_encodings,
+                            );
                         let res = grpc.streaming(method, req).await;
                         Ok(res)
                     };
@@ -952,17 +978,18 @@ pub mod flight_service_server {
                             Box::pin(fut)
                         }
                     }
+                    let accept_compression_encodings = self.accept_compression_encodings;
+                    let send_compression_encodings = self.send_compression_encodings;
                     let inner = self.inner.clone();
                     let fut = async move {
-                        let interceptor = inner.1;
                         let inner = inner.0;
                         let method = DoActionSvc(inner);
                         let codec = tonic::codec::ProstCodec::default();
-                        let mut grpc = if let Some(interceptor) = interceptor {
-                            tonic::server::Grpc::with_interceptor(codec, interceptor)
-                        } else {
-                            tonic::server::Grpc::new(codec)
-                        };
+                        let mut grpc = tonic::server::Grpc::new(codec)
+                            .apply_compression_config(
+                                accept_compression_encodings,
+                                send_compression_encodings,
+                            );
                         let res = grpc.server_streaming(method, req).await;
                         Ok(res)
                     };
@@ -990,17 +1017,18 @@ pub mod flight_service_server {
                             Box::pin(fut)
                         }
                     }
+                    let accept_compression_encodings = self.accept_compression_encodings;
+                    let send_compression_encodings = self.send_compression_encodings;
                     let inner = self.inner.clone();
                     let fut = async move {
-                        let interceptor = inner.1;
                         let inner = inner.0;
                         let method = ListActionsSvc(inner);
                         let codec = tonic::codec::ProstCodec::default();
-                        let mut grpc = if let Some(interceptor) = interceptor {
-                            tonic::server::Grpc::with_interceptor(codec, interceptor)
-                        } else {
-                            tonic::server::Grpc::new(codec)
-                        };
+                        let mut grpc = tonic::server::Grpc::new(codec)
+                            .apply_compression_config(
+                                accept_compression_encodings,
+                                send_compression_encodings,
+                            );
                         let res = grpc.server_streaming(method, req).await;
                         Ok(res)
                     };
@@ -1011,7 +1039,7 @@ pub mod flight_service_server {
                         .status(200)
                         .header("grpc-status", "12")
                         .header("content-type", "application/grpc")
-                        .body(tonic::body::BoxBody::empty())
+                        .body(empty_body())
                         .unwrap())
                 }),
             }
@@ -1020,12 +1048,16 @@ pub mod flight_service_server {
     impl<T: FlightService> Clone for FlightServiceServer<T> {
         fn clone(&self) -> Self {
             let inner = self.inner.clone();
-            Self { inner }
+            Self {
+                inner,
+                accept_compression_encodings: self.accept_compression_encodings,
+                send_compression_encodings: self.send_compression_encodings,
+            }
         }
     }
     impl<T: FlightService> Clone for _Inner<T> {
         fn clone(&self) -> Self {
-            Self(self.0.clone(), self.1.clone())
+            Self(self.0.clone())
         }
     }
     impl<T: std::fmt::Debug> std::fmt::Debug for _Inner<T> {
diff --git a/arrow/src/compute/kernels/filter.rs b/arrow/src/compute/kernels/filter.rs
index bcef12c..65d016d 100644
--- a/arrow/src/compute/kernels/filter.rs
+++ b/arrow/src/compute/kernels/filter.rs
@@ -214,7 +214,7 @@ pub fn build_filter(filter: &BooleanArray) -> Result<Filter> {
 }
 
 /// Remove null values by do a bitmask AND operation with null bits and the boolean bits.
-fn prep_null_mask_filter(filter: &BooleanArray) -> BooleanArray {
+pub fn prep_null_mask_filter(filter: &BooleanArray) -> BooleanArray {
     let array_data = filter.data_ref();
     let null_bitmap = array_data.null_buffer().unwrap();
     let mask = filter.values();
diff --git a/integration-testing/Cargo.toml b/integration-testing/Cargo.toml
index 14049b4..19b75fb 100644
--- a/integration-testing/Cargo.toml
+++ b/integration-testing/Cargo.toml
@@ -36,10 +36,10 @@ async-trait = "0.1.41"
 clap = "2.33"
 futures = "0.3"
 hex = "0.4"
-prost = "0.7"
+prost = "0.8"
 serde = { version = "1.0", features = ["rc"] }
 serde_derive = "1.0"
 serde_json = { version = "1.0", features = ["preserve_order"] }
 tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread"] }
-tonic = "0.4"
+tonic = "0.5"
 tracing-subscriber = { version = "0.2.15", optional = true }