You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2023/01/19 19:04:30 UTC

[arrow-datafusion] branch master updated: refactor and add simple function to deserialize and serialize proto b… (#4892)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 6dce728a3 refactor and add simple function to deserialize and serialize proto b… (#4892)
6dce728a3 is described below

commit 6dce728a3c7130ca3590a16f413c7c6ccb7209b7
Author: Jeremy Dyer <jd...@gmail.com>
AuthorDate: Thu Jan 19 14:04:24 2023 -0500

    refactor and add simple function to deserialize and serialize proto b… (#4892)
    
    * refactor and add simple function to deserialize and serialize proto bytes without reading/writing to file
    
    * Resolve syntax issues causing compilation errors and modify serialize_bytes to return a Result
    
    * [review] Introduce DataFusionError on failure to encode and decode bytes
    
    * Change error type from DataFusionError::Internal -> DataFusionError::Substrait
    
    * cargo fmt fixes
---
 datafusion/substrait/src/serializer.rs | 24 ++++++++++++++++++------
 1 file changed, 18 insertions(+), 6 deletions(-)

diff --git a/datafusion/substrait/src/serializer.rs b/datafusion/substrait/src/serializer.rs
index 44d0c2594..2e09d1c9f 100644
--- a/datafusion/substrait/src/serializer.rs
+++ b/datafusion/substrait/src/serializer.rs
@@ -17,6 +17,7 @@
 
 use crate::producer;
 
+use datafusion::common::DataFusionError;
 use datafusion::error::Result;
 use datafusion::prelude::*;
 
@@ -27,15 +28,22 @@ use std::fs::OpenOptions;
 use std::io::{Read, Write};
 
 pub async fn serialize(sql: &str, ctx: &SessionContext, path: &str) -> Result<()> {
+    let protobuf_out = serialize_bytes(sql, ctx).await;
+    let mut file = OpenOptions::new().create(true).write(true).open(path)?;
+    file.write_all(&protobuf_out?)?;
+    Ok(())
+}
+
+pub async fn serialize_bytes(sql: &str, ctx: &SessionContext) -> Result<Vec<u8>> {
     let df = ctx.sql(sql).await?;
     let plan = df.into_optimized_plan()?;
     let proto = producer::to_substrait_plan(&plan)?;
 
     let mut protobuf_out = Vec::<u8>::new();
-    proto.encode(&mut protobuf_out).unwrap();
-    let mut file = OpenOptions::new().create(true).write(true).open(path)?;
-    file.write_all(&protobuf_out)?;
-    Ok(())
+    proto.encode(&mut protobuf_out).map_err(|e| {
+        DataFusionError::Substrait(format!("Failed to encode substrait plan: {}", e))
+    })?;
+    Ok(protobuf_out)
 }
 
 pub async fn deserialize(path: &str) -> Result<Box<Plan>> {
@@ -44,7 +52,11 @@ pub async fn deserialize(path: &str) -> Result<Box<Plan>> {
     let mut file = OpenOptions::new().read(true).open(path)?;
 
     file.read_to_end(&mut protobuf_in)?;
-    let proto = Message::decode(&*protobuf_in).unwrap();
+    deserialize_bytes(protobuf_in).await
+}
 
-    Ok(Box::new(proto))
+pub async fn deserialize_bytes(proto_bytes: Vec<u8>) -> Result<Box<Plan>> {
+    Ok(Box::new(Message::decode(&*proto_bytes).map_err(|e| {
+        DataFusionError::Substrait(format!("Failed to decode substrait plan: {}", e))
+    })?))
 }