You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/08/28 11:17:42 UTC

[arrow-rs] branch master updated: Parquet Derive: remove obscure feature flags, make chrono time emit converted type (#712)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e7c4c5  Parquet Derive: remove obscure feature flags, make chrono time emit converted type (#712)
0e7c4c5 is described below

commit 0e7c4c5f1eec0bdb6a83bdc89ba30e22a95cc158
Author: Xavier Lange <xr...@gmail.com>
AuthorDate: Sat Aug 28 07:17:39 2021 -0400

    Parquet Derive: remove obscure feature flags, make chrono time emit converted type (#712)
    
    * remove feature flags, make timestamp emit converted types
    
    * remove tracking numbers
    
    * NaiveDateTime emits converted type
    
    * formatting
    
    * formatting
---
 parquet_derive/Cargo.toml           |  5 ---
 parquet_derive/src/parquet_field.rs | 79 ++++++++++++++++++++++++++++++-------
 parquet_derive_test/Cargo.toml      |  1 +
 parquet_derive_test/src/lib.rs      |  6 ++-
 4 files changed, 70 insertions(+), 21 deletions(-)

diff --git a/parquet_derive/Cargo.toml b/parquet_derive/Cargo.toml
index c374b33..a3af8e4 100644
--- a/parquet_derive/Cargo.toml
+++ b/parquet_derive/Cargo.toml
@@ -30,11 +30,6 @@ edition = "2018"
 [lib]
 proc-macro = true
 
-[features]
-chrono = []
-bigdecimal = []
-uuid = []
-
 [dependencies]
 proc-macro2 = "1.0"
 quote = "1.0"
diff --git a/parquet_derive/src/parquet_field.rs b/parquet_derive/src/parquet_field.rs
index 6f2fa0c..36730c7 100644
--- a/parquet_derive/src/parquet_field.rs
+++ b/parquet_derive/src/parquet_field.rs
@@ -207,14 +207,25 @@ impl Field {
         };
         let logical_type = self.ty.logical_type();
         let repetition = self.ty.repetition();
-        quote! {
-            fields.push(ParquetType::primitive_type_builder(#field_name, #physical_type)
-                .with_logical_type(#logical_type)
-                .with_repetition(#repetition)
-                .build()
-                .unwrap()
-                .into()
-            );
+        let converted_type = self.ty.converted_type();
+
+        if let Some(converted_type) = converted_type {
+            quote! {
+                fields.push(ParquetType::primitive_type_builder(#field_name, #physical_type)
+                    .with_logical_type(#logical_type)
+                    .with_repetition(#repetition)
+                    .with_converted_type(#converted_type)
+                    .build().unwrap().into()
+                )
+            }
+        } else {
+            quote! {
+                fields.push(ParquetType::primitive_type_builder(#field_name, #physical_type)
+                    .with_logical_type(#logical_type)
+                    .with_repetition(#repetition)
+                    .build().unwrap().into()
+                )
+            }
         }
     }
 
@@ -534,6 +545,7 @@ impl Type {
                 })) }
             }
             "NaiveDate" => quote! { Some(LogicalType::DATE(Default::default())) },
+            "NaiveDateTime" => quote! { None },
             "f32" | "f64" => quote! { None },
             "String" | "str" => quote! { Some(LogicalType::STRING(Default::default())) },
             "Uuid" => quote! { Some(LogicalType::UUID(Default::default())) },
@@ -541,6 +553,15 @@ impl Type {
         }
     }
 
+    fn converted_type(&self) -> Option<proc_macro2::TokenStream> {
+        let last_part = self.last_part();
+
+        match last_part.trim() {
+            "NaiveDateTime" => Some(quote! { ConvertedType::TIMESTAMP_MILLIS }),
+            _ => None,
+        }
+    }
+
     fn repetition(&self) -> proc_macro2::TokenStream {
         match &self {
             Type::Option(_) => quote! { Repetition::OPTIONAL },
@@ -944,7 +965,6 @@ mod test {
     }
 
     #[test]
-    #[cfg(feature = "chrono")]
     fn test_chrono_timestamp_millis() {
         let snippet: proc_macro2::TokenStream = quote! {
           struct ATimestampStruct {
@@ -971,7 +991,11 @@ mod test {
             {
                 let definition_levels : Vec<i16> = self.iter().map(|rec| if rec.maybe_happened.is_some() { 1 } else { 0 }).collect();
                 let vals : Vec<_> = records.iter().filter_map(|rec| {
-                    rec.maybe_happened.map(|inner| {  inner.timestamp_millis()  })
+                    if let Some(inner) = rec.maybe_happened {
+                        Some(inner.timestamp_millis())
+                    } else {
+                        None
+                    }
                 }).collect();
 
                 if let parquet::column::writer::ColumnWriter::Int64ColumnWriter(ref mut typed) = column_writer {
@@ -984,7 +1008,6 @@ mod test {
     }
 
     #[test]
-    #[cfg(feature = "chrono")]
     fn test_chrono_date() {
         let snippet: proc_macro2::TokenStream = quote! {
           struct ATimestampStruct {
@@ -1011,7 +1034,11 @@ mod test {
             {
                 let definition_levels : Vec<i16> = self.iter().map(|rec| if rec.maybe_happened.is_some() { 1 } else { 0 }).collect();
                 let vals : Vec<_> = records.iter().filter_map(|rec| {
-                    rec.maybe_happened.map(|inner| {  inner.signed_duration_since(chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32  })
+                    if let Some(inner) = rec.maybe_happened {
+                        Some(inner.signed_duration_since(chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32)
+                    } else {
+                        None
+                    }
                 }).collect();
 
                 if let parquet::column::writer::ColumnWriter::Int32ColumnWriter(ref mut typed) = column_writer {
@@ -1024,10 +1051,9 @@ mod test {
     }
 
     #[test]
-    #[cfg(feature = "uuid")]
     fn test_uuid() {
         let snippet: proc_macro2::TokenStream = quote! {
-          struct ATimestampStruct {
+          struct AUuidStruct {
             unique_id: uuid::Uuid,
             maybe_unique_id: Option<&uuid::Uuid>,
           }
@@ -1051,7 +1077,11 @@ mod test {
             {
                 let definition_levels : Vec<i16> = self.iter().map(|rec| if rec.maybe_unique_id.is_some() { 1 } else { 0 }).collect();
                 let vals : Vec<_> = records.iter().filter_map(|rec| {
-                    rec.maybe_unique_id.map(|ref inner| {  (&inner.to_string()[..]).into()  })
+                    if let Some(ref inner) = rec.maybe_unique_id {
+                        Some((&inner.to_string()[..]).into())
+                    } else {
+                        None
+                    }
                 }).collect();
 
                 if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter(ref mut typed) = column_writer {
@@ -1062,4 +1092,23 @@ mod test {
             }
         }).to_string());
     }
+
+    #[test]
+    fn test_converted_type() {
+        let snippet: proc_macro2::TokenStream = quote! {
+          struct ATimeStruct {
+            time: chrono::NaiveDateTime,
+          }
+        };
+
+        let fields = extract_fields(snippet);
+
+        let time = Field::from(&fields[0]);
+
+        let converted_type = time.ty.converted_type();
+        assert_eq!(
+            converted_type.unwrap().to_string(),
+            quote! { ConvertedType::TIMESTAMP_MILLIS }.to_string()
+        );
+    }
 }
diff --git a/parquet_derive_test/Cargo.toml b/parquet_derive_test/Cargo.toml
index 2d68810..97d2dde 100644
--- a/parquet_derive_test/Cargo.toml
+++ b/parquet_derive_test/Cargo.toml
@@ -30,3 +30,4 @@ publish = false
 [dependencies]
 parquet = { path = "../parquet", version = "6.0.0-SNAPSHOT" }
 parquet_derive = { path = "../parquet_derive", version = "6.0.0-SNAPSHOT" }
+chrono = "0.4.19"
\ No newline at end of file
diff --git a/parquet_derive_test/src/lib.rs b/parquet_derive_test/src/lib.rs
index bc8e914..2b7c060 100644
--- a/parquet_derive_test/src/lib.rs
+++ b/parquet_derive_test/src/lib.rs
@@ -46,6 +46,7 @@ struct ACompleteRecord<'a> {
     pub maybe_double: Option<f64>,
     pub borrowed_maybe_a_string: &'a Option<String>,
     pub borrowed_maybe_a_str: &'a Option<&'a str>,
+    pub now: chrono::NaiveDateTime,
 }
 
 #[cfg(test)]
@@ -88,8 +89,11 @@ mod tests {
             OPTIONAL DOUBLE          maybe_double;
             OPTIONAL BINARY          borrowed_maybe_a_string (STRING);
             OPTIONAL BINARY          borrowed_maybe_a_str (STRING);
+            REQUIRED INT64           now (TIMESTAMP_MILLIS);
         }";
 
+        let schema = Arc::new(parse_message_type(schema_str).unwrap());
+
         let a_str = "hello mother".to_owned();
         let a_borrowed_string = "cool news".to_owned();
         let maybe_a_string = Some("it's true, I'm a string".to_owned());
@@ -116,9 +120,9 @@ mod tests {
             maybe_double: Some(std::f64::MAX),
             borrowed_maybe_a_string: &maybe_a_string,
             borrowed_maybe_a_str: &maybe_a_str,
+            now: chrono::Utc::now().naive_local(),
         }];
 
-        let schema = Arc::new(parse_message_type(schema_str).unwrap());
         let generated_schema = drs.as_slice().schema().unwrap();
 
         assert_eq!(&schema, &generated_schema);