You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/09/09 19:48:47 UTC

[arrow-rs] branch cherry_pick_0e7c4c5f created (now 8e51497)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a change to branch cherry_pick_0e7c4c5f
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git.


      at 8e51497  Parquet Derive: make chrono time emit converted type (#712)

This branch includes the following new commits:

     new 8e51497  Parquet Derive: make chrono time emit converted type (#712)

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[arrow-rs] 01/01: Parquet Derive: make chrono time emit converted type (#712)

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch cherry_pick_0e7c4c5f
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git

commit 8e51497e7f7a0638dbe7cca15846459f6634275b
Author: Xavier Lange <xr...@gmail.com>
AuthorDate: Sat Aug 28 07:17:39 2021 -0400

    Parquet Derive: make chrono time emit converted type (#712)
    
    * NaiveDateTime emits converted type
---
 parquet_derive/src/parquet_field.rs | 79 ++++++++++++++++++++++++++++++-------
 parquet_derive_test/src/lib.rs      |  6 ++-
 2 files changed, 69 insertions(+), 16 deletions(-)

diff --git a/parquet_derive/src/parquet_field.rs b/parquet_derive/src/parquet_field.rs
index 6f2fa0c..36730c7 100644
--- a/parquet_derive/src/parquet_field.rs
+++ b/parquet_derive/src/parquet_field.rs
@@ -207,14 +207,25 @@ impl Field {
         };
         let logical_type = self.ty.logical_type();
         let repetition = self.ty.repetition();
-        quote! {
-            fields.push(ParquetType::primitive_type_builder(#field_name, #physical_type)
-                .with_logical_type(#logical_type)
-                .with_repetition(#repetition)
-                .build()
-                .unwrap()
-                .into()
-            );
+        let converted_type = self.ty.converted_type();
+
+        if let Some(converted_type) = converted_type {
+            quote! {
+                fields.push(ParquetType::primitive_type_builder(#field_name, #physical_type)
+                    .with_logical_type(#logical_type)
+                    .with_repetition(#repetition)
+                    .with_converted_type(#converted_type)
+                    .build().unwrap().into()
+                )
+            }
+        } else {
+            quote! {
+                fields.push(ParquetType::primitive_type_builder(#field_name, #physical_type)
+                    .with_logical_type(#logical_type)
+                    .with_repetition(#repetition)
+                    .build().unwrap().into()
+                )
+            }
         }
     }
 
@@ -534,6 +545,7 @@ impl Type {
                 })) }
             }
             "NaiveDate" => quote! { Some(LogicalType::DATE(Default::default())) },
+            "NaiveDateTime" => quote! { None },
             "f32" | "f64" => quote! { None },
             "String" | "str" => quote! { Some(LogicalType::STRING(Default::default())) },
             "Uuid" => quote! { Some(LogicalType::UUID(Default::default())) },
@@ -541,6 +553,15 @@ impl Type {
         }
     }
 
+    fn converted_type(&self) -> Option<proc_macro2::TokenStream> {
+        let last_part = self.last_part();
+
+        match last_part.trim() {
+            "NaiveDateTime" => Some(quote! { ConvertedType::TIMESTAMP_MILLIS }),
+            _ => None,
+        }
+    }
+
     fn repetition(&self) -> proc_macro2::TokenStream {
         match &self {
             Type::Option(_) => quote! { Repetition::OPTIONAL },
@@ -944,7 +965,6 @@ mod test {
     }
 
     #[test]
-    #[cfg(feature = "chrono")]
     fn test_chrono_timestamp_millis() {
         let snippet: proc_macro2::TokenStream = quote! {
           struct ATimestampStruct {
@@ -971,7 +991,11 @@ mod test {
             {
                 let definition_levels : Vec<i16> = self.iter().map(|rec| if rec.maybe_happened.is_some() { 1 } else { 0 }).collect();
                 let vals : Vec<_> = records.iter().filter_map(|rec| {
-                    rec.maybe_happened.map(|inner| {  inner.timestamp_millis()  })
+                    if let Some(inner) = rec.maybe_happened {
+                        Some(inner.timestamp_millis())
+                    } else {
+                        None
+                    }
                 }).collect();
 
                 if let parquet::column::writer::ColumnWriter::Int64ColumnWriter(ref mut typed) = column_writer {
@@ -984,7 +1008,6 @@ mod test {
     }
 
     #[test]
-    #[cfg(feature = "chrono")]
     fn test_chrono_date() {
         let snippet: proc_macro2::TokenStream = quote! {
           struct ATimestampStruct {
@@ -1011,7 +1034,11 @@ mod test {
             {
                 let definition_levels : Vec<i16> = self.iter().map(|rec| if rec.maybe_happened.is_some() { 1 } else { 0 }).collect();
                 let vals : Vec<_> = records.iter().filter_map(|rec| {
-                    rec.maybe_happened.map(|inner| {  inner.signed_duration_since(chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32  })
+                    if let Some(inner) = rec.maybe_happened {
+                        Some(inner.signed_duration_since(chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32)
+                    } else {
+                        None
+                    }
                 }).collect();
 
                 if let parquet::column::writer::ColumnWriter::Int32ColumnWriter(ref mut typed) = column_writer {
@@ -1024,10 +1051,9 @@ mod test {
     }
 
     #[test]
-    #[cfg(feature = "uuid")]
     fn test_uuid() {
         let snippet: proc_macro2::TokenStream = quote! {
-          struct ATimestampStruct {
+          struct AUuidStruct {
             unique_id: uuid::Uuid,
             maybe_unique_id: Option<&uuid::Uuid>,
           }
@@ -1051,7 +1077,11 @@ mod test {
             {
                 let definition_levels : Vec<i16> = self.iter().map(|rec| if rec.maybe_unique_id.is_some() { 1 } else { 0 }).collect();
                 let vals : Vec<_> = records.iter().filter_map(|rec| {
-                    rec.maybe_unique_id.map(|ref inner| {  (&inner.to_string()[..]).into()  })
+                    if let Some(ref inner) = rec.maybe_unique_id {
+                        Some((&inner.to_string()[..]).into())
+                    } else {
+                        None
+                    }
                 }).collect();
 
                 if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter(ref mut typed) = column_writer {
@@ -1062,4 +1092,23 @@ mod test {
             }
         }).to_string());
     }
+
+    #[test]
+    fn test_converted_type() {
+        let snippet: proc_macro2::TokenStream = quote! {
+          struct ATimeStruct {
+            time: chrono::NaiveDateTime,
+          }
+        };
+
+        let fields = extract_fields(snippet);
+
+        let time = Field::from(&fields[0]);
+
+        let converted_type = time.ty.converted_type();
+        assert_eq!(
+            converted_type.unwrap().to_string(),
+            quote! { ConvertedType::TIMESTAMP_MILLIS }.to_string()
+        );
+    }
 }
diff --git a/parquet_derive_test/src/lib.rs b/parquet_derive_test/src/lib.rs
index bc8e914..2b7c060 100644
--- a/parquet_derive_test/src/lib.rs
+++ b/parquet_derive_test/src/lib.rs
@@ -46,6 +46,7 @@ struct ACompleteRecord<'a> {
     pub maybe_double: Option<f64>,
     pub borrowed_maybe_a_string: &'a Option<String>,
     pub borrowed_maybe_a_str: &'a Option<&'a str>,
+    pub now: chrono::NaiveDateTime,
 }
 
 #[cfg(test)]
@@ -88,8 +89,11 @@ mod tests {
             OPTIONAL DOUBLE          maybe_double;
             OPTIONAL BINARY          borrowed_maybe_a_string (STRING);
             OPTIONAL BINARY          borrowed_maybe_a_str (STRING);
+            REQUIRED INT64           now (TIMESTAMP_MILLIS);
         }";
 
+        let schema = Arc::new(parse_message_type(schema_str).unwrap());
+
         let a_str = "hello mother".to_owned();
         let a_borrowed_string = "cool news".to_owned();
         let maybe_a_string = Some("it's true, I'm a string".to_owned());
@@ -116,9 +120,9 @@ mod tests {
             maybe_double: Some(std::f64::MAX),
             borrowed_maybe_a_string: &maybe_a_string,
             borrowed_maybe_a_str: &maybe_a_str,
+            now: chrono::Utc::now().naive_local(),
         }];
 
-        let schema = Arc::new(parse_message_type(schema_str).unwrap());
         let generated_schema = drs.as_slice().schema().unwrap();
 
         assert_eq!(&schema, &generated_schema);