You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/11/16 11:11:24 UTC

[arrow-rs] branch master updated: Fix csv writing of timestamps to show timezone. (#849)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new bfc95be  Fix csv writing of timestamps to show timezone. (#849)
bfc95be is described below

commit bfc95be66545b17c7c13ec50ccdf4588d38e4698
Author: Navin <na...@novemberkilo.com>
AuthorDate: Tue Nov 16 22:11:19 2021 +1100

    Fix csv writing of timestamps to show timezone. (#849)
    
    * Write timestamps (in csvs) with timezone.
    
    * More tests and more verbose naming.
    
    * Please linter.
    
    * Please clippy.
    
    * Cleanup based on review feedback.
---
 arrow/src/compute/kernels/temporal.rs |  98 +++++++++++
 arrow/src/csv/writer.rs               | 305 +++++++++++++++++++++++++---------
 2 files changed, 320 insertions(+), 83 deletions(-)

diff --git a/arrow/src/compute/kernels/temporal.rs b/arrow/src/compute/kernels/temporal.rs
index 24559b0..269f5cb 100644
--- a/arrow/src/compute/kernels/temporal.rs
+++ b/arrow/src/compute/kernels/temporal.rs
@@ -95,6 +95,28 @@ pub fn using_chrono_tz(tz: &str) -> Option<FixedOffset> {
         .ok()
 }
 
+/// Without the `chrono-tz` feature zones cannot be resolved; always returns UTC (+00:00).
+#[cfg(not(feature = "chrono-tz"))]
+pub fn using_chrono_tz_and_utc_naive_date_time(
+    _tz: &str,
+    _utc: chrono::NaiveDateTime,
+) -> Option<FixedOffset> {
+    Some(FixedOffset::east(0))
+}
+/// Parses `tz` as a time zone name and returns the fixed offset in effect at the UTC instant
+/// `utc`, or `None` if `tz` does not parse. The offset can vary with daylight saving, e.g.
+/// Australia/Sydney is +10:00 or +11:00.
+#[cfg(feature = "chrono-tz")]
+pub fn using_chrono_tz_and_utc_naive_date_time(
+    tz: &str,
+    utc: chrono::NaiveDateTime,
+) -> Option<FixedOffset> {
+    use chrono::{Offset, TimeZone};
+    tz.parse::<chrono_tz::Tz>()
+        .map(|tz| tz.offset_from_utc_datetime(&utc).fix())
+        .ok()
+}
+
 /// Extracts the hours of a given temporal array as an array of integers
 pub fn hour<T>(array: &PrimitiveArray<T>) -> Result<Int32Array>
 where
@@ -202,6 +224,8 @@ where
 #[cfg(test)]
 mod tests {
     use super::*;
+    #[cfg(feature = "chrono-tz")]
+    use chrono::NaiveDate;
 
     #[test]
     fn test_temporal_array_date64_hour() {
@@ -435,4 +459,78 @@ mod tests {
         ));
         assert!(matches!(hour(&a), Err(ArrowError::ComputeError(_))))
     }
+
+    #[cfg(feature = "chrono-tz")]
+    #[test]
+    fn test_using_chrono_tz() {
+        // Each zone name paired with the fixed offset `using_chrono_tz` should
+        // resolve it to.
+        let cases = [
+            ("Australia/Sydney", FixedOffset::east(10 * 60 * 60)),
+            ("America/New_York", FixedOffset::west(5 * 60 * 60)),
+        ];
+
+        for (zone, expected) in cases.iter() {
+            let resolved = using_chrono_tz(zone);
+            assert_eq!(resolved, Some(*expected));
+        }
+    }
+
+    #[cfg(feature = "chrono-tz")]
+    #[test]
+    fn test_using_chrono_tz_and_utc_naive_date_time() {
+        let sydney_tz = "Australia/Sydney";
+        let sydney_offset_without_dst = FixedOffset::east(10 * 60 * 60);
+        let sydney_offset_with_dst = FixedOffset::east(11 * 60 * 60);
+        // Daylight savings ends
+        // When local daylight time was about to reach
+        // Sunday, 4 April 2021, 3:00:00 am clocks were turned backward 1 hour to
+        // Sunday, 4 April 2021, 2:00:00 am local standard time instead.
+
+        // Daylight savings starts
+        // When local standard time was about to reach
+        // Sunday, 3 October 2021, 2:00:00 am clocks were turned forward 1 hour to
+        // Sunday, 3 October 2021, 3:00:00 am local daylight time instead.
+
+        // Sydney 2021-04-04T02:30:00+11:00 is 2021-04-03T15:30:00Z
+        let utc_just_before_sydney_dst_ends =
+            NaiveDate::from_ymd(2021, 4, 3).and_hms_nano(15, 30, 0, 0);
+        assert_eq!(
+            using_chrono_tz_and_utc_naive_date_time(
+                sydney_tz,
+                utc_just_before_sydney_dst_ends
+            ),
+            Some(sydney_offset_with_dst)
+        );
+        // Sydney 2021-04-04T02:30:00+10:00 is 2021-04-03T16:30:00Z
+        let utc_just_after_sydney_dst_ends =
+            NaiveDate::from_ymd(2021, 4, 3).and_hms_nano(16, 30, 0, 0);
+        assert_eq!(
+            using_chrono_tz_and_utc_naive_date_time(
+                sydney_tz,
+                utc_just_after_sydney_dst_ends
+            ),
+            Some(sydney_offset_without_dst)
+        );
+        // Sydney 2021-10-03T01:30:00+10:00 is 2021-10-02T15:30:00Z
+        let utc_just_before_sydney_dst_starts =
+            NaiveDate::from_ymd(2021, 10, 2).and_hms_nano(15, 30, 0, 0);
+        assert_eq!(
+            using_chrono_tz_and_utc_naive_date_time(
+                sydney_tz,
+                utc_just_before_sydney_dst_starts
+            ),
+            Some(sydney_offset_without_dst)
+        );
+        // Sydney 2021-10-03T03:30:00+11:00 is 2021-10-02T16:30:00Z
+        let utc_just_after_sydney_dst_starts =
+            NaiveDate::from_ymd(2021, 10, 2).and_hms_nano(16, 30, 0, 0);
+        assert_eq!(
+            using_chrono_tz_and_utc_naive_date_time(
+                sydney_tz,
+                utc_just_after_sydney_dst_starts
+            ),
+            Some(sydney_offset_with_dst)
+        );
+    }
 }
diff --git a/arrow/src/csv/writer.rs b/arrow/src/csv/writer.rs
index 50fd9ef..44959e6 100644
--- a/arrow/src/csv/writer.rs
+++ b/arrow/src/csv/writer.rs
@@ -67,6 +67,11 @@
 
 use std::io::Write;
 
+#[cfg(feature = "chrono-tz")]
+use crate::compute::kernels::temporal::using_chrono_tz_and_utc_naive_date_time;
+#[cfg(feature = "chrono-tz")]
+use chrono::{DateTime, Utc};
+
 use crate::datatypes::*;
 use crate::error::{ArrowError, Result};
 use crate::record_batch::RecordBatch;
@@ -75,6 +80,7 @@ use crate::{array::*, util::serialization::lexical_to_string};
 const DEFAULT_DATE_FORMAT: &str = "%F";
 const DEFAULT_TIME_FORMAT: &str = "%T";
 const DEFAULT_TIMESTAMP_FORMAT: &str = "%FT%H:%M:%S.%9f";
+const DEFAULT_TIMESTAMP_TZ_FORMAT: &str = "%FT%H:%M:%S.%9f%:z";
 
 fn write_primitive_value<T>(array: &ArrayRef, i: usize) -> String
 where
@@ -100,6 +106,8 @@ pub struct Writer<W: Write> {
     datetime_format: String,
     /// The timestamp format for timestamp arrays
     timestamp_format: String,
+    /// The timestamp format for timestamp (with timezone) arrays
+    timestamp_tz_format: String,
     /// The time format for time arrays
     time_format: String,
     /// Is the beginning-of-writer
@@ -120,6 +128,7 @@ impl<W: Write> Writer<W> {
             datetime_format: DEFAULT_TIMESTAMP_FORMAT.to_string(),
             time_format: DEFAULT_TIME_FORMAT.to_string(),
             timestamp_format: DEFAULT_TIMESTAMP_FORMAT.to_string(),
+            timestamp_tz_format: DEFAULT_TIMESTAMP_TZ_FORMAT.to_string(),
             beginning: true,
         }
     }
@@ -213,35 +222,8 @@ impl<W: Write> Writer<W> {
                         .format(&self.time_format)
                         .to_string()
                 }
-                DataType::Timestamp(time_unit, _) => {
-                    use TimeUnit::*;
-                    let datetime = match time_unit {
-                        Second => col
-                            .as_any()
-                            .downcast_ref::<TimestampSecondArray>()
-                            .unwrap()
-                            .value_as_datetime(row_index)
-                            .unwrap(),
-                        Millisecond => col
-                            .as_any()
-                            .downcast_ref::<TimestampMillisecondArray>()
-                            .unwrap()
-                            .value_as_datetime(row_index)
-                            .unwrap(),
-                        Microsecond => col
-                            .as_any()
-                            .downcast_ref::<TimestampMicrosecondArray>()
-                            .unwrap()
-                            .value_as_datetime(row_index)
-                            .unwrap(),
-                        Nanosecond => col
-                            .as_any()
-                            .downcast_ref::<TimestampNanosecondArray>()
-                            .unwrap()
-                            .value_as_datetime(row_index)
-                            .unwrap(),
-                    };
-                    format!("{}", datetime.format(&self.timestamp_format))
+                DataType::Timestamp(time_unit, time_zone) => {
+                    self.handle_timestamp(time_unit, time_zone, row_index, col)?
                 }
                 DataType::Decimal(..) => make_string_from_decimal(col, row_index)?,
                 t => {
@@ -258,6 +240,102 @@ impl<W: Write> Writer<W> {
         Ok(())
     }
 
+    /// Formats the timestamp at `row_index` of `col` with `timestamp_format`.
+    /// Without the `chrono-tz` feature the column's time zone is not consulted,
+    /// so the naive date-time is written with no offset.
+    #[cfg(not(feature = "chrono-tz"))]
+    fn handle_timestamp(
+        &self,
+        time_unit: &TimeUnit,
+        _time_zone: &Option<String>,
+        row_index: usize,
+        col: &ArrayRef,
+    ) -> Result<String> {
+        let erased = col.as_any();
+        // Downcast to the concrete array type for the column's time unit.
+        // NOTE(review): the trailing `unwrap` panics if `value_as_datetime`
+        // yields no value for this row.
+        let naive = match time_unit {
+            TimeUnit::Second => erased
+                .downcast_ref::<TimestampSecondArray>()
+                .unwrap()
+                .value_as_datetime(row_index),
+            TimeUnit::Millisecond => erased
+                .downcast_ref::<TimestampMillisecondArray>()
+                .unwrap()
+                .value_as_datetime(row_index),
+            TimeUnit::Microsecond => erased
+                .downcast_ref::<TimestampMicrosecondArray>()
+                .unwrap()
+                .value_as_datetime(row_index),
+            TimeUnit::Nanosecond => erased
+                .downcast_ref::<TimestampNanosecondArray>()
+                .unwrap()
+                .value_as_datetime(row_index),
+        }
+        .unwrap();
+        let rendered = naive.format(&self.timestamp_format);
+        Ok(rendered.to_string())
+    }
+
+    /// Formats the timestamp at `row_index` of `col` with `timestamp_tz_format`.
+    /// The stored value is interpreted as UTC and shown in the column's time zone
+    /// (UTC when none is set), using the offset in effect at that instant.
+    /// Returns `ArrowError::ComputeError` naming the zone if it cannot be parsed.
+    #[cfg(feature = "chrono-tz")]
+    fn handle_timestamp(
+        &self,
+        time_unit: &TimeUnit,
+        time_zone: &Option<String>,
+        row_index: usize,
+        col: &ArrayRef,
+    ) -> Result<String> {
+        use TimeUnit::*;
+
+        let datetime = match time_unit {
+            Second => col
+                .as_any()
+                .downcast_ref::<TimestampSecondArray>()
+                .unwrap()
+                .value_as_datetime(row_index)
+                .unwrap(),
+            Millisecond => col
+                .as_any()
+                .downcast_ref::<TimestampMillisecondArray>()
+                .unwrap()
+                .value_as_datetime(row_index)
+                .unwrap(),
+            Microsecond => col
+                .as_any()
+                .downcast_ref::<TimestampMicrosecondArray>()
+                .unwrap()
+                .value_as_datetime(row_index)
+                .unwrap(),
+            Nanosecond => col
+                .as_any()
+                .downcast_ref::<TimestampNanosecondArray>()
+                .unwrap()
+                .value_as_datetime(row_index)
+                .unwrap(),
+        };
+        // A column without an explicit time zone is written as UTC (+00:00).
+        let tzs = time_zone.as_deref().unwrap_or("UTC");
+
+        match using_chrono_tz_and_utc_naive_date_time(tzs, datetime) {
+            Some(tz) => {
+                let utc_time = DateTime::<Utc>::from_utc(datetime, Utc);
+                Ok(utc_time
+                    .with_timezone(&tz)
+                    .format(&self.timestamp_tz_format)
+                    .to_string())
+            }
+            None => Err(ArrowError::ComputeError(format!(
+                "Unable to parse timezone: {}",
+                tzs
+            ))),
+        }
+    }
+
     /// Write a vector of record batches to a writable object
     pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
         let num_columns = batch.num_columns();
@@ -311,6 +389,8 @@ pub struct WriterBuilder {
     datetime_format: Option<String>,
     /// Optional timestamp format for timestamp arrays
     timestamp_format: Option<String>,
+    /// Optional timestamp format for timestamp with timezone arrays
+    timestamp_tz_format: Option<String>,
     /// Optional time format for time arrays
     time_format: Option<String>,
 }
@@ -324,6 +404,7 @@ impl Default for WriterBuilder {
             datetime_format: Some(DEFAULT_TIMESTAMP_FORMAT.to_string()),
             time_format: Some(DEFAULT_TIME_FORMAT.to_string()),
             timestamp_format: Some(DEFAULT_TIMESTAMP_FORMAT.to_string()),
+            timestamp_tz_format: Some(DEFAULT_TIMESTAMP_TZ_FORMAT.to_string()),
         }
     }
 }
@@ -406,6 +487,9 @@ impl WriterBuilder {
             timestamp_format: self
                 .timestamp_format
                 .unwrap_or_else(|| DEFAULT_TIMESTAMP_FORMAT.to_string()),
+            timestamp_tz_format: self
+                .timestamp_tz_format
+                .unwrap_or_else(|| DEFAULT_TIMESTAMP_TZ_FORMAT.to_string()),
             beginning: true,
         }
     }
@@ -417,6 +501,7 @@ mod tests {
 
     use crate::csv::Reader;
     use crate::datatypes::{Field, Schema};
+    #[cfg(feature = "chrono-tz")]
     use crate::util::string_writer::StringWriter;
     use crate::util::test_util::get_temp_file;
     use std::fs::File;
@@ -485,7 +570,16 @@ mod tests {
         let mut buffer: Vec<u8> = vec![];
         file.read_to_end(&mut buffer).unwrap();
 
-        assert_eq!(
+        let expected = if cfg!(feature = "chrono-tz") {
+            r#"c1,c2,c3,c4,c5,c6,c7
+Lorem ipsum dolor sit amet,123.564532,3,true,,00:20:34,cupcakes
+consectetur adipiscing elit,,2,false,2019-04-18T10:54:47.378000000+00:00,06:51:20,cupcakes
+sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555000000+00:00,23:46:03,foo
+Lorem ipsum dolor sit amet,123.564532,3,true,,00:20:34,cupcakes
+consectetur adipiscing elit,,2,false,2019-04-18T10:54:47.378000000+00:00,06:51:20,cupcakes
+sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555000000+00:00,23:46:03,foo
+"#
+        } else {
             r#"c1,c2,c3,c4,c5,c6,c7
 Lorem ipsum dolor sit amet,123.564532,3,true,,00:20:34,cupcakes
 consectetur adipiscing elit,,2,false,2019-04-18T10:54:47.378000000,06:51:20,cupcakes
@@ -494,9 +588,8 @@ Lorem ipsum dolor sit amet,123.564532,3,true,,00:20:34,cupcakes
 consectetur adipiscing elit,,2,false,2019-04-18T10:54:47.378000000,06:51:20,cupcakes
 sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555000000,23:46:03,foo
 "#
-            .to_string(),
-            String::from_utf8(buffer).unwrap()
-        );
+        };
+        assert_eq!(expected.to_string(), String::from_utf8(buffer).unwrap());
     }
 
     #[test]
@@ -559,73 +652,53 @@ sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555000000,23:46:03,foo
         );
     }
 
+    #[cfg(feature = "chrono-tz")]
     #[test]
-    fn test_export_csv_string() {
+    fn test_export_csv_timestamps() {
         let schema = Schema::new(vec![
-            Field::new("c1", DataType::Utf8, false),
-            Field::new("c2", DataType::Float64, true),
-            Field::new("c3", DataType::UInt32, false),
-            Field::new("c4", DataType::Boolean, true),
-            Field::new("c5", DataType::Timestamp(TimeUnit::Millisecond, None), true),
-            Field::new("c6", DataType::Time32(TimeUnit::Second), false),
-            Field::new("c7", DataType::Decimal(6, 2), false),
+            Field::new(
+                "c1",
+                DataType::Timestamp(
+                    TimeUnit::Millisecond,
+                    Some("Australia/Sydney".to_string()),
+                ),
+                true,
+            ),
+            Field::new("c2", DataType::Timestamp(TimeUnit::Millisecond, None), true),
         ]);

-        let c1 = StringArray::from(vec![
-            "Lorem ipsum dolor sit amet",
-            "consectetur adipiscing elit",
-            "sed do eiusmod tempor",
-        ]);
-        let c2 = PrimitiveArray::<Float64Type>::from(vec![
-            Some(123.564532),
-            None,
-            Some(-556132.25),
-        ]);
-        let c3 = PrimitiveArray::<UInt32Type>::from(vec![3, 2, 1]);
-        let c4 = BooleanArray::from(vec![Some(true), Some(false), None]);
-        let c5 = TimestampMillisecondArray::from_opt_vec(
-            vec![None, Some(1555584887378), Some(1555555555555)],
+        let c1 = TimestampMillisecondArray::from_opt_vec(
+            // 1555584887 converts to 2019-04-18, 20:54:47 in time zone Australia/Sydney (AEST).
+            // The offset (difference to UTC) is +10:00.
+            // 1635577147 converts to 2021-10-30 17:59:07 in time zone Australia/Sydney (AEDT)
+            // The offset (difference to UTC) is +11:00, as daylight saving is in effect then.
+            // The values below are in milliseconds; the figures above are their whole seconds.
+            vec![Some(1555584887378), Some(1635577147000)],
+            Some("Australia/Sydney".to_string()),
+        );
+        let c2 = TimestampMillisecondArray::from_opt_vec(
+            vec![Some(1555584887378), Some(1635577147000)],
             None,
         );
-        let c6 = Time32SecondArray::from(vec![1234, 24680, 85563]);
-        let mut c7_builder = DecimalBuilder::new(5, 6, 2);
-        c7_builder.append_value(12345_i128).unwrap();
-        c7_builder.append_value(-12345_i128).unwrap();
-        c7_builder.append_null().unwrap();
-        let c7 = c7_builder.finish();
-
-        let batch = RecordBatch::try_new(
-            Arc::new(schema),
-            vec![
-                Arc::new(c1),
-                Arc::new(c2),
-                Arc::new(c3),
-                Arc::new(c4),
-                Arc::new(c5),
-                Arc::new(c6),
-                Arc::new(c7),
-            ],
-        )
-        .unwrap();
+        let batch =
+            RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)])
+                .unwrap();

         let sw = StringWriter::new();
         let mut writer = Writer::new(sw);
-        let batches = vec![&batch, &batch];
+        let batches = vec![&batch];
         for batch in batches {
             writer.write(batch).unwrap();
         }

-        let left = "c1,c2,c3,c4,c5,c6,c7
-Lorem ipsum dolor sit amet,123.564532,3,true,,00:20:34,123.45
-consectetur adipiscing elit,,2,false,2019-04-18T10:54:47.378000000,06:51:20,-123.45
-sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555000000,23:46:03,
-Lorem ipsum dolor sit amet,123.564532,3,true,,00:20:34,123.45
-consectetur adipiscing elit,,2,false,2019-04-18T10:54:47.378000000,06:51:20,-123.45
-sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555000000,23:46:03,\n";
+        let left = "c1,c2
+2019-04-18T20:54:47.378000000+10:00,2019-04-18T10:54:47.378000000+00:00
+2021-10-30T17:59:07.000000000+11:00,2021-10-30T06:59:07.000000000+00:00\n";
         let right = writer.writer.into_inner().map(|s| s.to_string());
         assert_eq!(Some(left.to_string()), right.ok());
     }
 
+    #[cfg(not(feature = "chrono-tz"))]
     #[test]
     fn test_conversion_consistency() {
         // test if we can serialize and deserialize whilst retaining the same type information/ precision
@@ -675,4 +748,70 @@ sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555000000,23:46:03,\n";
         let expected = vec![Some(3), Some(2), Some(1)];
         assert_eq!(actual, expected);
     }
+
+    #[cfg(feature = "chrono-tz")]
+    #[test]
+    fn test_conversion_consistency() {
+        // Test that a CSV round trip retains type information and precision; with
+        // `chrono-tz` the c3 timestamps are written with a `+00:00` offset suffix.
+        let schema = Schema::new(vec![
+            Field::new("c1", DataType::Date32, false),
+            Field::new("c2", DataType::Date64, false),
+            Field::new("c3", DataType::Timestamp(TimeUnit::Nanosecond, None), false),
+        ]);
+
+        let nanoseconds = vec![
+            1599566300000000000,
+            1599566200000000000,
+            1599566100000000000,
+        ];
+        let c1 = Date32Array::from(vec![3, 2, 1]);
+        let c2 = Date64Array::from(vec![3, 2, 1]);
+        let c3 = TimestampNanosecondArray::from_vec(nanoseconds.clone(), None);
+
+        let batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![Arc::new(c1), Arc::new(c2), Arc::new(c3)],
+        )
+        .unwrap();
+
+        let builder = WriterBuilder::new().has_headers(false);
+
+        let mut buf: Cursor<Vec<u8>> = Default::default();
+        // drop the writer early to release the borrow.
+        {
+            let mut writer = builder.build(&mut buf);
+            writer.write(&batch).unwrap();
+        }
+        buf.set_position(0);
+
+        let mut reader = Reader::new(
+            buf,
+            Arc::new(schema),
+            false,
+            None,
+            3,
+            // presumably no row bounds and no projection: read every row and column.
+            None,
+            None,
+        );
+        let rb = reader.next().unwrap().unwrap();
+        let c1 = rb.column(0).as_any().downcast_ref::<Date32Array>().unwrap();
+        let c2 = rb.column(1).as_any().downcast_ref::<Date64Array>().unwrap();
+        let c3 = rb
+            .column(2)
+            .as_any()
+            .downcast_ref::<TimestampNanosecondArray>()
+            .unwrap();
+
+        let actual = c1.into_iter().collect::<Vec<_>>();
+        let expected = vec![Some(3), Some(2), Some(1)];
+        assert_eq!(actual, expected);
+        let actual = c2.into_iter().collect::<Vec<_>>();
+        let expected = vec![Some(3), Some(2), Some(1)];
+        assert_eq!(actual, expected);
+        let actual = c3.into_iter().collect::<Vec<_>>();
+        let expected = nanoseconds.into_iter().map(Some).collect::<Vec<_>>();
+        assert_eq!(actual, expected);
+    }
 }