You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/07/20 11:26:16 UTC
[arrow-rs] branch active_release updated: Bump prost and tonic (#560) (#565)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch active_release
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/active_release by this push:
new 2998db4 Bump prost and tonic (#560) (#565)
2998db4 is described below
commit 2998db4a07b5f9cd8da292d7e7e990a19b0b7535
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Tue Jul 20 07:26:09 2021 -0400
Bump prost and tonic (#560) (#565)
* Update tonic and prost
Signed-off-by: Chojan Shang <ps...@outlook.com>
* Make filter::prep_null_mask_filter public
Signed-off-by: Chojan Shang <ps...@outlook.com>
* Bump prost and tonic in integration-testing
Signed-off-by: Chojan Shang <ps...@outlook.com>
Co-authored-by: Chojan Shang <ps...@ritelabs.net>
---
arrow-flight/Cargo.toml | 8 +-
arrow-flight/src/arrow.flight.protocol.rs | 210 +++++++++++++++++-------------
arrow/src/compute/kernels/filter.rs | 2 +-
integration-testing/Cargo.toml | 4 +-
4 files changed, 128 insertions(+), 96 deletions(-)
diff --git a/arrow-flight/Cargo.toml b/arrow-flight/Cargo.toml
index c61d8fd..48c3d06 100644
--- a/arrow-flight/Cargo.toml
+++ b/arrow-flight/Cargo.toml
@@ -28,17 +28,17 @@ license = "Apache-2.0"
[dependencies]
arrow = { path = "../arrow", version = "5.0.0" }
base64 = "0.13"
-tonic = "0.4"
+tonic = "0.5"
bytes = "1"
-prost = "0.7"
-prost-derive = "0.7"
+prost = "0.8"
+prost-derive = "0.8"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread"] }
[dev-dependencies]
futures = { version = "0.3", default-features = false, features = ["alloc"]}
[build-dependencies]
-tonic-build = "0.4"
+tonic-build = "0.5"
# Pin specific version of the tonic-build dependencies to avoid auto-generated
# (and checked in) arrow.flight.protocol.rs from changing
proc-macro2 = "=1.0.27"
diff --git a/arrow-flight/src/arrow.flight.protocol.rs b/arrow-flight/src/arrow.flight.protocol.rs
index 5fce526..79c6a94 100644
--- a/arrow-flight/src/arrow.flight.protocol.rs
+++ b/arrow-flight/src/arrow.flight.protocol.rs
@@ -204,13 +204,14 @@ pub struct PutResult {
}
#[doc = r" Generated client implementations."]
pub mod flight_service_client {
- #![allow(unused_variables, dead_code, missing_docs)]
+ #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
use tonic::codegen::*;
#[doc = ""]
#[doc = " A flight service is an endpoint for retrieving or storing Arrow data. A"]
#[doc = " flight service can expose one or more predefined endpoints that can be"]
#[doc = " accessed using the Arrow Flight Protocol. Additionally, a flight service"]
#[doc = " can expose a set of actions that are available."]
+ #[derive(Debug, Clone)]
pub struct FlightServiceClient<T> {
inner: tonic::client::Grpc<T>,
}
@@ -228,20 +229,43 @@ pub mod flight_service_client {
impl<T> FlightServiceClient<T>
where
T: tonic::client::GrpcService<tonic::body::BoxBody>,
- T::ResponseBody: Body + HttpBody + Send + 'static,
+ T::ResponseBody: Body + Send + Sync + 'static,
T::Error: Into<StdError>,
- <T::ResponseBody as HttpBody>::Error: Into<StdError> + Send,
+ <T::ResponseBody as Body>::Error: Into<StdError> + Send,
{
pub fn new(inner: T) -> Self {
let inner = tonic::client::Grpc::new(inner);
Self { inner }
}
- pub fn with_interceptor(
+ pub fn with_interceptor<F>(
inner: T,
- interceptor: impl Into<tonic::Interceptor>,
- ) -> Self {
- let inner = tonic::client::Grpc::with_interceptor(inner, interceptor);
- Self { inner }
+ interceptor: F,
+ ) -> FlightServiceClient<InterceptedService<T, F>>
+ where
+ F: FnMut(tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status>,
+ T: tonic::codegen::Service<
+ http::Request<tonic::body::BoxBody>,
+ Response = http::Response<
+ <T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
+ >,
+ >,
+ <T as tonic::codegen::Service<http::Request<tonic::body::BoxBody>>>::Error:
+ Into<StdError> + Send + Sync,
+ {
+ FlightServiceClient::new(InterceptedService::new(inner, interceptor))
+ }
+ #[doc = r" Compress requests with `gzip`."]
+ #[doc = r""]
+ #[doc = r" This requires the server to support it otherwise it might respond with an"]
+ #[doc = r" error."]
+ pub fn send_gzip(mut self) -> Self {
+ self.inner = self.inner.send_gzip();
+ self
+ }
+ #[doc = r" Enable decompressing responses with `gzip`."]
+ pub fn accept_gzip(mut self) -> Self {
+ self.inner = self.inner.accept_gzip();
+ self
}
#[doc = ""]
#[doc = " Handshake between client and server. Depending on the server, the"]
@@ -478,22 +502,10 @@ pub mod flight_service_client {
.await
}
}
- impl<T: Clone> Clone for FlightServiceClient<T> {
- fn clone(&self) -> Self {
- Self {
- inner: self.inner.clone(),
- }
- }
- }
- impl<T> std::fmt::Debug for FlightServiceClient<T> {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(f, "FlightServiceClient {{ ... }}")
- }
- }
}
#[doc = r" Generated server implementations."]
pub mod flight_service_server {
- #![allow(unused_variables, dead_code, missing_docs)]
+ #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
use tonic::codegen::*;
#[doc = "Generated trait containing gRPC methods that should be implemented for use with FlightServiceServer."]
#[async_trait]
@@ -635,27 +647,34 @@ pub mod flight_service_server {
#[derive(Debug)]
pub struct FlightServiceServer<T: FlightService> {
inner: _Inner<T>,
+ accept_compression_encodings: (),
+ send_compression_encodings: (),
}
- struct _Inner<T>(Arc<T>, Option<tonic::Interceptor>);
+ struct _Inner<T>(Arc<T>);
impl<T: FlightService> FlightServiceServer<T> {
pub fn new(inner: T) -> Self {
let inner = Arc::new(inner);
- let inner = _Inner(inner, None);
- Self { inner }
+ let inner = _Inner(inner);
+ Self {
+ inner,
+ accept_compression_encodings: Default::default(),
+ send_compression_encodings: Default::default(),
+ }
}
- pub fn with_interceptor(
+ pub fn with_interceptor<F>(
inner: T,
- interceptor: impl Into<tonic::Interceptor>,
- ) -> Self {
- let inner = Arc::new(inner);
- let inner = _Inner(inner, Some(interceptor.into()));
- Self { inner }
+ interceptor: F,
+ ) -> InterceptedService<Self, F>
+ where
+ F: FnMut(tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status>,
+ {
+ InterceptedService::new(Self::new(inner), interceptor)
}
}
- impl<T, B> Service<http::Request<B>> for FlightServiceServer<T>
+ impl<T, B> tonic::codegen::Service<http::Request<B>> for FlightServiceServer<T>
where
T: FlightService,
- B: HttpBody + Send + Sync + 'static,
+ B: Body + Send + Sync + 'static,
B::Error: Into<StdError> + Send + 'static,
{
type Response = http::Response<tonic::body::BoxBody>;
@@ -691,17 +710,18 @@ pub mod flight_service_server {
Box::pin(fut)
}
}
+ let accept_compression_encodings = self.accept_compression_encodings;
+ let send_compression_encodings = self.send_compression_encodings;
let inner = self.inner.clone();
let fut = async move {
- let interceptor = inner.1;
let inner = inner.0;
let method = HandshakeSvc(inner);
let codec = tonic::codec::ProstCodec::default();
- let mut grpc = if let Some(interceptor) = interceptor {
- tonic::server::Grpc::with_interceptor(codec, interceptor)
- } else {
- tonic::server::Grpc::new(codec)
- };
+ let mut grpc = tonic::server::Grpc::new(codec)
+ .apply_compression_config(
+ accept_compression_encodings,
+ send_compression_encodings,
+ );
let res = grpc.streaming(method, req).await;
Ok(res)
};
@@ -729,17 +749,18 @@ pub mod flight_service_server {
Box::pin(fut)
}
}
+ let accept_compression_encodings = self.accept_compression_encodings;
+ let send_compression_encodings = self.send_compression_encodings;
let inner = self.inner.clone();
let fut = async move {
- let interceptor = inner.1;
let inner = inner.0;
let method = ListFlightsSvc(inner);
let codec = tonic::codec::ProstCodec::default();
- let mut grpc = if let Some(interceptor) = interceptor {
- tonic::server::Grpc::with_interceptor(codec, interceptor)
- } else {
- tonic::server::Grpc::new(codec)
- };
+ let mut grpc = tonic::server::Grpc::new(codec)
+ .apply_compression_config(
+ accept_compression_encodings,
+ send_compression_encodings,
+ );
let res = grpc.server_streaming(method, req).await;
Ok(res)
};
@@ -765,17 +786,18 @@ pub mod flight_service_server {
Box::pin(fut)
}
}
+ let accept_compression_encodings = self.accept_compression_encodings;
+ let send_compression_encodings = self.send_compression_encodings;
let inner = self.inner.clone();
let fut = async move {
- let interceptor = inner.1.clone();
let inner = inner.0;
let method = GetFlightInfoSvc(inner);
let codec = tonic::codec::ProstCodec::default();
- let mut grpc = if let Some(interceptor) = interceptor {
- tonic::server::Grpc::with_interceptor(codec, interceptor)
- } else {
- tonic::server::Grpc::new(codec)
- };
+ let mut grpc = tonic::server::Grpc::new(codec)
+ .apply_compression_config(
+ accept_compression_encodings,
+ send_compression_encodings,
+ );
let res = grpc.unary(method, req).await;
Ok(res)
};
@@ -800,17 +822,18 @@ pub mod flight_service_server {
Box::pin(fut)
}
}
+ let accept_compression_encodings = self.accept_compression_encodings;
+ let send_compression_encodings = self.send_compression_encodings;
let inner = self.inner.clone();
let fut = async move {
- let interceptor = inner.1.clone();
let inner = inner.0;
let method = GetSchemaSvc(inner);
let codec = tonic::codec::ProstCodec::default();
- let mut grpc = if let Some(interceptor) = interceptor {
- tonic::server::Grpc::with_interceptor(codec, interceptor)
- } else {
- tonic::server::Grpc::new(codec)
- };
+ let mut grpc = tonic::server::Grpc::new(codec)
+ .apply_compression_config(
+ accept_compression_encodings,
+ send_compression_encodings,
+ );
let res = grpc.unary(method, req).await;
Ok(res)
};
@@ -838,17 +861,18 @@ pub mod flight_service_server {
Box::pin(fut)
}
}
+ let accept_compression_encodings = self.accept_compression_encodings;
+ let send_compression_encodings = self.send_compression_encodings;
let inner = self.inner.clone();
let fut = async move {
- let interceptor = inner.1;
let inner = inner.0;
let method = DoGetSvc(inner);
let codec = tonic::codec::ProstCodec::default();
- let mut grpc = if let Some(interceptor) = interceptor {
- tonic::server::Grpc::with_interceptor(codec, interceptor)
- } else {
- tonic::server::Grpc::new(codec)
- };
+ let mut grpc = tonic::server::Grpc::new(codec)
+ .apply_compression_config(
+ accept_compression_encodings,
+ send_compression_encodings,
+ );
let res = grpc.server_streaming(method, req).await;
Ok(res)
};
@@ -876,17 +900,18 @@ pub mod flight_service_server {
Box::pin(fut)
}
}
+ let accept_compression_encodings = self.accept_compression_encodings;
+ let send_compression_encodings = self.send_compression_encodings;
let inner = self.inner.clone();
let fut = async move {
- let interceptor = inner.1;
let inner = inner.0;
let method = DoPutSvc(inner);
let codec = tonic::codec::ProstCodec::default();
- let mut grpc = if let Some(interceptor) = interceptor {
- tonic::server::Grpc::with_interceptor(codec, interceptor)
- } else {
- tonic::server::Grpc::new(codec)
- };
+ let mut grpc = tonic::server::Grpc::new(codec)
+ .apply_compression_config(
+ accept_compression_encodings,
+ send_compression_encodings,
+ );
let res = grpc.streaming(method, req).await;
Ok(res)
};
@@ -914,17 +939,18 @@ pub mod flight_service_server {
Box::pin(fut)
}
}
+ let accept_compression_encodings = self.accept_compression_encodings;
+ let send_compression_encodings = self.send_compression_encodings;
let inner = self.inner.clone();
let fut = async move {
- let interceptor = inner.1;
let inner = inner.0;
let method = DoExchangeSvc(inner);
let codec = tonic::codec::ProstCodec::default();
- let mut grpc = if let Some(interceptor) = interceptor {
- tonic::server::Grpc::with_interceptor(codec, interceptor)
- } else {
- tonic::server::Grpc::new(codec)
- };
+ let mut grpc = tonic::server::Grpc::new(codec)
+ .apply_compression_config(
+ accept_compression_encodings,
+ send_compression_encodings,
+ );
let res = grpc.streaming(method, req).await;
Ok(res)
};
@@ -952,17 +978,18 @@ pub mod flight_service_server {
Box::pin(fut)
}
}
+ let accept_compression_encodings = self.accept_compression_encodings;
+ let send_compression_encodings = self.send_compression_encodings;
let inner = self.inner.clone();
let fut = async move {
- let interceptor = inner.1;
let inner = inner.0;
let method = DoActionSvc(inner);
let codec = tonic::codec::ProstCodec::default();
- let mut grpc = if let Some(interceptor) = interceptor {
- tonic::server::Grpc::with_interceptor(codec, interceptor)
- } else {
- tonic::server::Grpc::new(codec)
- };
+ let mut grpc = tonic::server::Grpc::new(codec)
+ .apply_compression_config(
+ accept_compression_encodings,
+ send_compression_encodings,
+ );
let res = grpc.server_streaming(method, req).await;
Ok(res)
};
@@ -990,17 +1017,18 @@ pub mod flight_service_server {
Box::pin(fut)
}
}
+ let accept_compression_encodings = self.accept_compression_encodings;
+ let send_compression_encodings = self.send_compression_encodings;
let inner = self.inner.clone();
let fut = async move {
- let interceptor = inner.1;
let inner = inner.0;
let method = ListActionsSvc(inner);
let codec = tonic::codec::ProstCodec::default();
- let mut grpc = if let Some(interceptor) = interceptor {
- tonic::server::Grpc::with_interceptor(codec, interceptor)
- } else {
- tonic::server::Grpc::new(codec)
- };
+ let mut grpc = tonic::server::Grpc::new(codec)
+ .apply_compression_config(
+ accept_compression_encodings,
+ send_compression_encodings,
+ );
let res = grpc.server_streaming(method, req).await;
Ok(res)
};
@@ -1011,7 +1039,7 @@ pub mod flight_service_server {
.status(200)
.header("grpc-status", "12")
.header("content-type", "application/grpc")
- .body(tonic::body::BoxBody::empty())
+ .body(empty_body())
.unwrap())
}),
}
@@ -1020,12 +1048,16 @@ pub mod flight_service_server {
impl<T: FlightService> Clone for FlightServiceServer<T> {
fn clone(&self) -> Self {
let inner = self.inner.clone();
- Self { inner }
+ Self {
+ inner,
+ accept_compression_encodings: self.accept_compression_encodings,
+ send_compression_encodings: self.send_compression_encodings,
+ }
}
}
impl<T: FlightService> Clone for _Inner<T> {
fn clone(&self) -> Self {
- Self(self.0.clone(), self.1.clone())
+ Self(self.0.clone())
}
}
impl<T: std::fmt::Debug> std::fmt::Debug for _Inner<T> {
diff --git a/arrow/src/compute/kernels/filter.rs b/arrow/src/compute/kernels/filter.rs
index bcef12c..65d016d 100644
--- a/arrow/src/compute/kernels/filter.rs
+++ b/arrow/src/compute/kernels/filter.rs
@@ -214,7 +214,7 @@ pub fn build_filter(filter: &BooleanArray) -> Result<Filter> {
}
/// Remove null values by do a bitmask AND operation with null bits and the boolean bits.
-fn prep_null_mask_filter(filter: &BooleanArray) -> BooleanArray {
+pub fn prep_null_mask_filter(filter: &BooleanArray) -> BooleanArray {
let array_data = filter.data_ref();
let null_bitmap = array_data.null_buffer().unwrap();
let mask = filter.values();
diff --git a/integration-testing/Cargo.toml b/integration-testing/Cargo.toml
index 14049b4..19b75fb 100644
--- a/integration-testing/Cargo.toml
+++ b/integration-testing/Cargo.toml
@@ -36,10 +36,10 @@ async-trait = "0.1.41"
clap = "2.33"
futures = "0.3"
hex = "0.4"
-prost = "0.7"
+prost = "0.8"
serde = { version = "1.0", features = ["rc"] }
serde_derive = "1.0"
serde_json = { version = "1.0", features = ["preserve_order"] }
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread"] }
-tonic = "0.4"
+tonic = "0.5"
tracing-subscriber = { version = "0.2.15", optional = true }