You are viewing a plain-text version of this content; the canonical link to it was a hyperlink on the original archive page and is not preserved in this text.
Posted to commits@arrow.apache.org by me...@apache.org on 2024/01/10 12:13:04 UTC
(arrow-datafusion) branch main updated: Standardize `CompressionTypeVariant` encoding in protobuf (#8785)
This is an automated email from the ASF dual-hosted git repository.
mete pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new e78f4bd928 Standardize `CompressionTypeVariant` encoding in protobuf (#8785)
e78f4bd928 is described below
commit e78f4bd9289db4ed41e445e56d59540491aeced2
Author: tushushu <33...@users.noreply.github.com>
AuthorDate: Wed Jan 10 20:12:58 2024 +0800
Standardize `CompressionTypeVariant` encoding in protobuf (#8785)
* draft
* fix err
* fix
* fix err
* Clippy + use reserved
---------
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
datafusion/proto/proto/datafusion.proto | 3 ++-
datafusion/proto/src/generated/pbjson.rs | 10 ++++++----
datafusion/proto/src/generated/prost.rs | 4 ++--
datafusion/proto/src/logical_plan/mod.rs | 17 +++++++++++++++--
datafusion/proto/src/physical_plan/from_proto.rs | 12 ++++++++++++
5 files changed, 37 insertions(+), 9 deletions(-)
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index d5f8397aa3..f4089e83c6 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -212,7 +212,8 @@ message CreateExternalTableNode {
bool if_not_exists = 7;
string delimiter = 8;
string definition = 9;
- string file_compression_type = 10;
+ reserved 10; // was string file_compression_type
+ CompressionTypeVariant file_compression_type = 17;
repeated LogicalExprNodeCollection order_exprs = 13;
bool unbounded = 14;
map<string, string> options = 11;
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 12e834d75a..4c9cbafd8f 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -4175,7 +4175,7 @@ impl serde::Serialize for CreateExternalTableNode {
if !self.definition.is_empty() {
len += 1;
}
- if !self.file_compression_type.is_empty() {
+ if self.file_compression_type != 0 {
len += 1;
}
if !self.order_exprs.is_empty() {
@@ -4221,8 +4221,10 @@ impl serde::Serialize for CreateExternalTableNode {
if !self.definition.is_empty() {
struct_ser.serialize_field("definition", &self.definition)?;
}
- if !self.file_compression_type.is_empty() {
- struct_ser.serialize_field("fileCompressionType", &self.file_compression_type)?;
+ if self.file_compression_type != 0 {
+ let v = CompressionTypeVariant::try_from(self.file_compression_type)
+ .map_err(|_| serde::ser::Error::custom(format!("Invalid variant {}", self.file_compression_type)))?;
+ struct_ser.serialize_field("fileCompressionType", &v)?;
}
if !self.order_exprs.is_empty() {
struct_ser.serialize_field("orderExprs", &self.order_exprs)?;
@@ -4420,7 +4422,7 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode {
if file_compression_type__.is_some() {
return Err(serde::de::Error::duplicate_field("fileCompressionType"));
}
- file_compression_type__ = Some(map_.next_value()?);
+ file_compression_type__ = Some(map_.next_value::<CompressionTypeVariant>()? as i32);
}
GeneratedField::OrderExprs => {
if order_exprs__.is_some() {
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 4ee0b70325..5db5f3cab7 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -349,8 +349,8 @@ pub struct CreateExternalTableNode {
pub delimiter: ::prost::alloc::string::String,
#[prost(string, tag = "9")]
pub definition: ::prost::alloc::string::String,
- #[prost(string, tag = "10")]
- pub file_compression_type: ::prost::alloc::string::String,
+ #[prost(enumeration = "CompressionTypeVariant", tag = "17")]
+ pub file_compression_type: i32,
#[prost(message, repeated, tag = "13")]
pub order_exprs: ::prost::alloc::vec::Vec<LogicalExprNodeCollection>,
#[prost(bool, tag = "14")]
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index e8a3878448..6ca95519a9 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -538,6 +538,16 @@ impl AsLogicalPlan for LogicalPlanNode {
column_defaults.insert(col_name.clone(), expr);
}
+ let file_compression_type = protobuf::CompressionTypeVariant::try_from(
+ create_extern_table.file_compression_type,
+ )
+ .map_err(|_| {
+ proto_error(format!(
+ "Unknown file compression type {}",
+ create_extern_table.file_compression_type
+ ))
+ })?;
+
Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(CreateExternalTable {
schema: pb_schema.try_into()?,
name: from_owned_table_reference(create_extern_table.name.as_ref(), "CreateExternalTable")?,
@@ -552,7 +562,7 @@ impl AsLogicalPlan for LogicalPlanNode {
.clone(),
order_exprs,
if_not_exists: create_extern_table.if_not_exists,
- file_compression_type: CompressionTypeVariant::from_str(&create_extern_table.file_compression_type).map_err(|_| DataFusionError::NotImplemented(format!("Unsupported file compression type {}", create_extern_table.file_compression_type)))?,
+ file_compression_type: file_compression_type.into(),
definition,
unbounded: create_extern_table.unbounded,
options: create_extern_table.options.clone(),
@@ -1410,6 +1420,9 @@ impl AsLogicalPlan for LogicalPlanNode {
converted_column_defaults.insert(col_name.clone(), expr.try_into()?);
}
+ let file_compression_type =
+ protobuf::CompressionTypeVariant::from(file_compression_type);
+
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::CreateExternalTable(
protobuf::CreateExternalTableNode {
@@ -1423,7 +1436,7 @@ impl AsLogicalPlan for LogicalPlanNode {
delimiter: String::from(*delimiter),
order_exprs: converted_order_exprs,
definition: definition.clone().unwrap_or_default(),
- file_compression_type: file_compression_type.to_string(),
+ file_compression_type: file_compression_type.into(),
unbounded: *unbounded,
options: options.clone(),
constraints: Some(constraints.clone().into()),
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index 23ab813ca7..193c4dfe03 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -782,6 +782,18 @@ impl From<protobuf::CompressionTypeVariant> for CompressionTypeVariant {
}
}
+impl From<CompressionTypeVariant> for protobuf::CompressionTypeVariant {
+ fn from(value: CompressionTypeVariant) -> Self {
+ match value {
+ CompressionTypeVariant::GZIP => Self::Gzip,
+ CompressionTypeVariant::BZIP2 => Self::Bzip2,
+ CompressionTypeVariant::XZ => Self::Xz,
+ CompressionTypeVariant::ZSTD => Self::Zstd,
+ CompressionTypeVariant::UNCOMPRESSED => Self::Uncompressed,
+ }
+ }
+}
+
impl TryFrom<&protobuf::FileTypeWriterOptions> for FileTypeWriterOptions {
type Error = DataFusionError;