You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by me...@apache.org on 2024/01/10 12:13:04 UTC

(arrow-datafusion) branch main updated: Standardize `CompressionTypeVariant` encoding in protobuf (#8785)

This is an automated email from the ASF dual-hosted git repository.

mete pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new e78f4bd928 Standardize `CompressionTypeVariant` encoding in protobuf (#8785)
e78f4bd928 is described below

commit e78f4bd9289db4ed41e445e56d59540491aeced2
Author: tushushu <33...@users.noreply.github.com>
AuthorDate: Wed Jan 10 20:12:58 2024 +0800

    Standardize `CompressionTypeVariant` encoding in protobuf (#8785)
    
    * draft
    
    * fix err
    
    * fix
    
    * fix err
    
    * Clippy + use reserved
    
    ---------
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
 datafusion/proto/proto/datafusion.proto          |  3 ++-
 datafusion/proto/src/generated/pbjson.rs         | 10 ++++++----
 datafusion/proto/src/generated/prost.rs          |  4 ++--
 datafusion/proto/src/logical_plan/mod.rs         | 17 +++++++++++++++--
 datafusion/proto/src/physical_plan/from_proto.rs | 12 ++++++++++++
 5 files changed, 37 insertions(+), 9 deletions(-)

diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index d5f8397aa3..f4089e83c6 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -212,7 +212,8 @@ message CreateExternalTableNode {
   bool if_not_exists = 7;
   string delimiter = 8;
   string definition = 9;
-  string file_compression_type = 10;
+  reserved 10; // was string file_compression_type
+  CompressionTypeVariant file_compression_type = 17;
   repeated LogicalExprNodeCollection order_exprs = 13;
   bool unbounded = 14;
   map<string, string> options = 11;
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 12e834d75a..4c9cbafd8f 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -4175,7 +4175,7 @@ impl serde::Serialize for CreateExternalTableNode {
         if !self.definition.is_empty() {
             len += 1;
         }
-        if !self.file_compression_type.is_empty() {
+        if self.file_compression_type != 0 {
             len += 1;
         }
         if !self.order_exprs.is_empty() {
@@ -4221,8 +4221,10 @@ impl serde::Serialize for CreateExternalTableNode {
         if !self.definition.is_empty() {
             struct_ser.serialize_field("definition", &self.definition)?;
         }
-        if !self.file_compression_type.is_empty() {
-            struct_ser.serialize_field("fileCompressionType", &self.file_compression_type)?;
+        if self.file_compression_type != 0 {
+            let v = CompressionTypeVariant::try_from(self.file_compression_type)
+                .map_err(|_| serde::ser::Error::custom(format!("Invalid variant {}", self.file_compression_type)))?;
+            struct_ser.serialize_field("fileCompressionType", &v)?;
         }
         if !self.order_exprs.is_empty() {
             struct_ser.serialize_field("orderExprs", &self.order_exprs)?;
@@ -4420,7 +4422,7 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode {
                             if file_compression_type__.is_some() {
                                 return Err(serde::de::Error::duplicate_field("fileCompressionType"));
                             }
-                            file_compression_type__ = Some(map_.next_value()?);
+                            file_compression_type__ = Some(map_.next_value::<CompressionTypeVariant>()? as i32);
                         }
                         GeneratedField::OrderExprs => {
                             if order_exprs__.is_some() {
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 4ee0b70325..5db5f3cab7 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -349,8 +349,8 @@ pub struct CreateExternalTableNode {
     pub delimiter: ::prost::alloc::string::String,
     #[prost(string, tag = "9")]
     pub definition: ::prost::alloc::string::String,
-    #[prost(string, tag = "10")]
-    pub file_compression_type: ::prost::alloc::string::String,
+    #[prost(enumeration = "CompressionTypeVariant", tag = "17")]
+    pub file_compression_type: i32,
     #[prost(message, repeated, tag = "13")]
     pub order_exprs: ::prost::alloc::vec::Vec<LogicalExprNodeCollection>,
     #[prost(bool, tag = "14")]
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index e8a3878448..6ca95519a9 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -538,6 +538,16 @@ impl AsLogicalPlan for LogicalPlanNode {
                     column_defaults.insert(col_name.clone(), expr);
                 }
 
+                let file_compression_type = protobuf::CompressionTypeVariant::try_from(
+                    create_extern_table.file_compression_type,
+                )
+                .map_err(|_| {
+                    proto_error(format!(
+                        "Unknown file compression type {}",
+                        create_extern_table.file_compression_type
+                    ))
+                })?;
+
                 Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(CreateExternalTable {
                     schema: pb_schema.try_into()?,
                     name: from_owned_table_reference(create_extern_table.name.as_ref(), "CreateExternalTable")?,
@@ -552,7 +562,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                         .clone(),
                     order_exprs,
                     if_not_exists: create_extern_table.if_not_exists,
-                    file_compression_type: CompressionTypeVariant::from_str(&create_extern_table.file_compression_type).map_err(|_| DataFusionError::NotImplemented(format!("Unsupported file compression type {}", create_extern_table.file_compression_type)))?,
+                    file_compression_type: file_compression_type.into(),
                     definition,
                     unbounded: create_extern_table.unbounded,
                     options: create_extern_table.options.clone(),
@@ -1410,6 +1420,9 @@ impl AsLogicalPlan for LogicalPlanNode {
                     converted_column_defaults.insert(col_name.clone(), expr.try_into()?);
                 }
 
+                let file_compression_type =
+                    protobuf::CompressionTypeVariant::from(file_compression_type);
+
                 Ok(protobuf::LogicalPlanNode {
                     logical_plan_type: Some(LogicalPlanType::CreateExternalTable(
                         protobuf::CreateExternalTableNode {
@@ -1423,7 +1436,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                             delimiter: String::from(*delimiter),
                             order_exprs: converted_order_exprs,
                             definition: definition.clone().unwrap_or_default(),
-                            file_compression_type: file_compression_type.to_string(),
+                            file_compression_type: file_compression_type.into(),
                             unbounded: *unbounded,
                             options: options.clone(),
                             constraints: Some(constraints.clone().into()),
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index 23ab813ca7..193c4dfe03 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -782,6 +782,18 @@ impl From<protobuf::CompressionTypeVariant> for CompressionTypeVariant {
     }
 }
 
+impl From<CompressionTypeVariant> for protobuf::CompressionTypeVariant {
+    fn from(value: CompressionTypeVariant) -> Self {
+        match value {
+            CompressionTypeVariant::GZIP => Self::Gzip,
+            CompressionTypeVariant::BZIP2 => Self::Bzip2,
+            CompressionTypeVariant::XZ => Self::Xz,
+            CompressionTypeVariant::ZSTD => Self::Zstd,
+            CompressionTypeVariant::UNCOMPRESSED => Self::Uncompressed,
+        }
+    }
+}
+
 impl TryFrom<&protobuf::FileTypeWriterOptions> for FileTypeWriterOptions {
     type Error = DataFusionError;