You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/04/27 10:29:59 UTC
[arrow-datafusion] branch master updated: Add public Serialization/Deserialization API for `Expr` to/from bytes (#2341)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 0d125356b Add public Serialization/Deserialization API for `Expr` to/from bytes (#2341)
0d125356b is described below
commit 0d125356ba482e9302cd52963980157965bce9e1
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Wed Apr 27 06:29:52 2022 -0400
Add public Serialization/Deserialization API for `Expr` to/from bytes (#2341)
* Add public Serialization/Deserialization API to/from bytes
* cleanup
* Remove Expr from prelude
* RAT, add Bytes API
* Rename to `bytes`
* Break out registry into its own module
* Add more tests
---
datafusion/proto/src/bytes/mod.rs | 168 +++++++++++++++++++++++++++++++++
datafusion/proto/src/bytes/registry.rs | 46 +++++++++
datafusion/proto/src/lib.rs | 1 +
3 files changed, 215 insertions(+)
diff --git a/datafusion/proto/src/bytes/mod.rs b/datafusion/proto/src/bytes/mod.rs
new file mode 100644
index 000000000..67bf65d6a
--- /dev/null
+++ b/datafusion/proto/src/bytes/mod.rs
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Serialization / Deserialization to Bytes
+use crate::{from_proto::parse_expr, protobuf};
+use datafusion::{
+ common::{DataFusionError, Result},
+ logical_plan::{Expr, FunctionRegistry},
+};
+use prost::{bytes::BytesMut, Message};
+
+// Reexport Bytes which appears in the API
+pub use prost::bytes::Bytes;
+
+mod registry;
+
+/// Encodes something (such as [`Expr`]) to/from a stream of
+/// bytes.
+///
+/// ```
+/// use datafusion::prelude::*;
+/// use datafusion::logical_plan::Expr;
+/// use datafusion_proto::bytes::Serializeable;
+///
+/// // Create a new `Expr` a < 5
+/// let expr = col("a").lt(lit(5i32));
+///
+/// // Convert it to an opaque form
+/// let bytes = expr.to_bytes().unwrap();
+///
+/// // Decode bytes from somewhere (over network, etc.)
+/// let decoded_expr = Expr::from_bytes(&bytes).unwrap();
+/// assert_eq!(expr, decoded_expr);
+/// ```
+pub trait Serializeable: Sized {
+    /// Convert `self` to an opaque byte stream
+    fn to_bytes(&self) -> Result<Bytes>;
+
+    /// Convert `bytes` (the output of [`Self::to_bytes`]) back into an
+    /// object. This will error if the serialized bytes contain any
+    /// user defined functions, in which case use
+    /// [`Self::from_bytes_with_registry`]
+    fn from_bytes(bytes: &[u8]) -> Result<Self> {
+        // `NoRegistry` rejects every user defined function lookup
+        Self::from_bytes_with_registry(bytes, &registry::NoRegistry {})
+    }
+
+    /// Convert `bytes` (the output of [`Self::to_bytes`]) back into an
+    /// object, resolving user defined functions with the specified
+    /// `registry`
+    fn from_bytes_with_registry(
+        bytes: &[u8],
+        registry: &dyn FunctionRegistry,
+    ) -> Result<Self>;
+}
+
+impl Serializeable for Expr {
+    /// Encode this expression as protobuf bytes.
+    ///
+    /// The expression is first converted to a `protobuf::LogicalExprNode`
+    /// and that message is then encoded into a byte buffer. A failure in
+    /// either step is reported as a `DataFusionError::Plan`.
+    fn to_bytes(&self) -> Result<Bytes> {
+        let node: protobuf::LogicalExprNode = match self.try_into() {
+            Ok(node) => node,
+            Err(e) => {
+                return Err(DataFusionError::Plan(format!(
+                    "Error encoding expr as protobuf: {}",
+                    e
+                )))
+            }
+        };
+
+        let mut encoded = BytesMut::new();
+        match node.encode(&mut encoded) {
+            Ok(()) => Ok(encoded.into()),
+            Err(e) => Err(DataFusionError::Plan(format!(
+                "Error encoding protobuf as bytes: {}",
+                e
+            ))),
+        }
+    }
+
+    /// Decode `bytes` into a `protobuf::LogicalExprNode` and parse that
+    /// message into an `Expr`, passing `registry` to `parse_expr`.
+    ///
+    /// A failure in either step is reported as a `DataFusionError::Plan`.
+    fn from_bytes_with_registry(
+        bytes: &[u8],
+        registry: &dyn FunctionRegistry,
+    ) -> Result<Self> {
+        let node = match protobuf::LogicalExprNode::decode(bytes) {
+            Ok(node) => node,
+            Err(e) => {
+                return Err(DataFusionError::Plan(format!(
+                    "Error decoding expr as protobuf: {}",
+                    e
+                )))
+            }
+        };
+
+        parse_expr(&node, registry).map_err(|e| {
+            DataFusionError::Plan(format!("Error parsing protobuf into Expr: {}", e))
+        })
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use std::sync::Arc;
+
+    use datafusion::{
+        arrow::{array::ArrayRef, datatypes::DataType},
+        logical_expr::Volatility,
+        logical_plan::create_udf,
+        physical_plan::functions::make_scalar_function,
+        prelude::*,
+    };
+
+    /// Bytes that are not a valid protobuf message must surface the
+    /// decoding error
+    #[test]
+    #[should_panic(
+        expected = "Error decoding expr as protobuf: failed to decode Protobuf message"
+    )]
+    fn bad_decode() {
+        Expr::from_bytes(b"Leet").unwrap();
+    }
+
+    /// A UDF call round-trips when the decoder is given the context the
+    /// UDF is registered in
+    #[test]
+    fn udf_roundtrip_with_registry() {
+        let ctx = context_with_udf();
+        let original = dummy_udf_call(&ctx);
+
+        let encoded = original.to_bytes().unwrap();
+        let decoded = Expr::from_bytes_with_registry(&encoded, &ctx).unwrap();
+
+        assert_eq!(original, decoded);
+    }
+
+    /// Decoding a UDF call without a registry must fail
+    #[test]
+    #[should_panic(
+        expected = "No function registry provided to deserialize, so can not deserialize User Defined Function 'dummy'"
+    )]
+    fn udf_roundtrip_without_registry() {
+        let ctx = context_with_udf();
+        let encoded = dummy_udf_call(&ctx).to_bytes().unwrap();
+
+        // should explode
+        Expr::from_bytes(&encoded).unwrap();
+    }
+
+    /// build a call to the `dummy` UDF registered in `ctx`, applied to an
+    /// empty string literal
+    fn dummy_udf_call(ctx: &SessionContext) -> Expr {
+        ctx.udf("dummy")
+            .expect("could not find udf")
+            .call(vec![lit("")])
+    }
+
+    /// return a `SessionContext` with a `dummy` function registered as a UDF
+    fn context_with_udf() -> SessionContext {
+        // the implementation just hands back its first argument
+        let passthrough = |args: &[ArrayRef]| Ok(Arc::new(args[0].clone()) as ArrayRef);
+
+        let dummy = create_udf(
+            "dummy",
+            vec![DataType::Utf8],
+            Arc::new(DataType::Utf8),
+            Volatility::Immutable,
+            make_scalar_function(passthrough),
+        );
+
+        let mut session = SessionContext::new();
+        session.register_udf(dummy);
+        session
+    }
+}
diff --git a/datafusion/proto/src/bytes/registry.rs b/datafusion/proto/src/bytes/registry.rs
new file mode 100644
index 000000000..e49cbc629
--- /dev/null
+++ b/datafusion/proto/src/bytes/registry.rs
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{collections::HashSet, sync::Arc};
+
+use datafusion::{
+ common::{DataFusionError, Result},
+ logical_expr::{AggregateUDF, ScalarUDF},
+ logical_plan::FunctionRegistry,
+};
+
+/// A default [`FunctionRegistry`] that does not resolve any user defined
+/// functions: no names are registered and every lookup returns an error
+pub(crate) struct NoRegistry {}
+
+impl FunctionRegistry for NoRegistry {
+    /// Nothing is registered, so the set of function names is empty
+    fn udfs(&self) -> HashSet<String> {
+        HashSet::default()
+    }
+
+    /// Always fails with a `DataFusionError::Plan` naming the requested
+    /// scalar function
+    fn udf(&self, name: &str) -> Result<Arc<ScalarUDF>> {
+        let message = format!(
+            "No function registry provided to deserialize, so can not deserialize User Defined Function '{}'",
+            name
+        );
+        Err(DataFusionError::Plan(message))
+    }
+
+    /// Always fails with a `DataFusionError::Plan` naming the requested
+    /// aggregate function
+    fn udaf(&self, name: &str) -> Result<Arc<AggregateUDF>> {
+        let message = format!(
+            "No function registry provided to deserialize, so can not deserialize User Defined Aggregate Function '{}'",
+            name
+        );
+        Err(DataFusionError::Plan(message))
+    }
+}
diff --git a/datafusion/proto/src/lib.rs b/datafusion/proto/src/lib.rs
index 8228954a9..dcc18373c 100644
--- a/datafusion/proto/src/lib.rs
+++ b/datafusion/proto/src/lib.rs
@@ -21,6 +21,7 @@ pub mod protobuf {
include!(concat!(env!("OUT_DIR"), "/datafusion.rs"));
}
+pub mod bytes;
pub mod from_proto;
pub mod to_proto;