You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2023/01/19 19:04:30 UTC
[arrow-datafusion] branch master updated: refactor and add simple function to deserialize and serialize proto b… (#4892)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 6dce728a3 refactor and add simple function to deserialize and serialize proto b… (#4892)
6dce728a3 is described below
commit 6dce728a3c7130ca3590a16f413c7c6ccb7209b7
Author: Jeremy Dyer <jd...@gmail.com>
AuthorDate: Thu Jan 19 14:04:24 2023 -0500
refactor and add simple function to deserialize and serialize proto b… (#4892)
* refactor and add simple functions to serialize and deserialize proto bytes without reading from or writing to a file
* Resolve syntax issues causing compilation errors and modify serialize_bytes to return a Result
* [review] Introduce DataFusionError on failure to encode and decode bytes
* Change error type from DataFusionError::Internal -> DataFusionError::Substrait
* cargo fmt fixes
---
datafusion/substrait/src/serializer.rs | 24 ++++++++++++++++++------
1 file changed, 18 insertions(+), 6 deletions(-)
diff --git a/datafusion/substrait/src/serializer.rs b/datafusion/substrait/src/serializer.rs
index 44d0c2594..2e09d1c9f 100644
--- a/datafusion/substrait/src/serializer.rs
+++ b/datafusion/substrait/src/serializer.rs
@@ -17,6 +17,7 @@
use crate::producer;
+use datafusion::common::DataFusionError;
use datafusion::error::Result;
use datafusion::prelude::*;
@@ -27,15 +28,22 @@ use std::fs::OpenOptions;
use std::io::{Read, Write};
+/// Plans `sql` against `ctx` and writes the encoded Substrait plan to `path`.
+///
+/// The plan is fully encoded before the file is opened, so a planning or encoding
+/// error returns early without creating or clobbering the output file. An existing
+/// file is truncated so no stale trailing bytes from a longer previous plan remain.
pub async fn serialize(sql: &str, ctx: &SessionContext, path: &str) -> Result<()> {
+ let protobuf_out = serialize_bytes(sql, ctx).await?;
+ let mut file = OpenOptions::new().create(true).write(true).truncate(true).open(path)?;
+ file.write_all(&protobuf_out)?;
+ Ok(())
+}
+
+/// Plans `sql` against `ctx` and returns the Substrait plan encoded as protobuf
+/// bytes, without touching the filesystem.
+///
+/// # Errors
+/// Propagates SQL planning/optimization and Substrait conversion errors, and returns
+/// `DataFusionError::Substrait` if protobuf encoding fails.
+pub async fn serialize_bytes(sql: &str, ctx: &SessionContext) -> Result<Vec<u8>> {
let df = ctx.sql(sql).await?;
let plan = df.into_optimized_plan()?;
let proto = producer::to_substrait_plan(&plan)?;
let mut protobuf_out = Vec::<u8>::new();
- proto.encode(&mut protobuf_out).unwrap();
- let mut file = OpenOptions::new().create(true).write(true).open(path)?;
- file.write_all(&protobuf_out)?;
- Ok(())
+ proto.encode(&mut protobuf_out).map_err(|e| {
+ DataFusionError::Substrait(format!("Failed to encode substrait plan: {}", e))
+ })?;
+ Ok(protobuf_out)
}
pub async fn deserialize(path: &str) -> Result<Box<Plan>> {
@@ -44,7 +52,11 @@ pub async fn deserialize(path: &str) -> Result<Box<Plan>> {
let mut file = OpenOptions::new().read(true).open(path)?;
file.read_to_end(&mut protobuf_in)?;
- let proto = Message::decode(&*protobuf_in).unwrap();
+ deserialize_bytes(protobuf_in).await
+}
- Ok(Box::new(proto))
+/// Decodes protobuf-encoded `proto_bytes` into a boxed Substrait [`Plan`],
+/// without reading from a file.
+///
+/// # Errors
+/// Returns `DataFusionError::Substrait` if the bytes cannot be decoded as a `Plan`.
+pub async fn deserialize_bytes(proto_bytes: Vec<u8>) -> Result<Box<Plan>> {
+ let decoded = Plan::decode(proto_bytes.as_slice()).map_err(|err| {
+ DataFusionError::Substrait(format!("Failed to decode substrait plan: {}", err))
+ })?;
+ Ok(Box::new(decoded))
+}