You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/11/16 11:11:24 UTC
[arrow-rs] branch master updated: Fix csv writing of timestamps to show timezone. (#849)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new bfc95be Fix csv writing of timestamps to show timezone. (#849)
bfc95be is described below
commit bfc95be66545b17c7c13ec50ccdf4588d38e4698
Author: Navin <na...@novemberkilo.com>
AuthorDate: Tue Nov 16 22:11:19 2021 +1100
Fix csv writing of timestamps to show timezone. (#849)
* Write timestamps (in csvs) with timezone.
* More tests and more verbose naming.
* Please linter.
* Please clippy.
* Cleanup based on review feedback.
---
arrow/src/compute/kernels/temporal.rs | 98 +++++++++++
arrow/src/csv/writer.rs | 305 +++++++++++++++++++++++++---------
2 files changed, 320 insertions(+), 83 deletions(-)
diff --git a/arrow/src/compute/kernels/temporal.rs b/arrow/src/compute/kernels/temporal.rs
index 24559b0..269f5cb 100644
--- a/arrow/src/compute/kernels/temporal.rs
+++ b/arrow/src/compute/kernels/temporal.rs
@@ -95,6 +95,28 @@ pub fn using_chrono_tz(tz: &str) -> Option<FixedOffset> {
.ok()
}
+/// Fallback compiled when the `chrono-tz` feature is disabled.
+///
+/// Both arguments are ignored: no timezone lookup is performed and a zero
+/// offset (`FixedOffset::east(0)`) is always returned, so this never yields `None`.
+#[cfg(not(feature = "chrono-tz"))]
+pub fn using_chrono_tz_and_utc_naive_date_time(
+ _tz: &str,
+ _utc: chrono::NaiveDateTime,
+) -> Option<FixedOffset> {
+ Some(FixedOffset::east(0))
+}
+/// Resolves a timezone name (e.g. `Australia/Sydney`) to the fixed UTC offset that
+/// applies at the given UTC instant.
+///
+/// The offset is a function of time: it can differ depending on whether daylight
+/// saving is in effect, e.g. `Australia/Sydney` is +10:00 or +11:00.
+///
+/// Returns `None` when `tz` cannot be parsed as a `chrono_tz::Tz`.
+#[cfg(feature = "chrono-tz")]
+pub fn using_chrono_tz_and_utc_naive_date_time(
+    tz: &str,
+    utc: chrono::NaiveDateTime,
+) -> Option<FixedOffset> {
+    use chrono::{Offset, TimeZone};
+    let zone = tz.parse::<chrono_tz::Tz>().ok()?;
+    Some(zone.offset_from_utc_datetime(&utc).fix())
+}
+
/// Extracts the hours of a given temporal array as an array of integers
pub fn hour<T>(array: &PrimitiveArray<T>) -> Result<Int32Array>
where
@@ -202,6 +224,8 @@ where
#[cfg(test)]
mod tests {
use super::*;
+ #[cfg(feature = "chrono-tz")]
+ use chrono::NaiveDate;
#[test]
fn test_temporal_array_date64_hour() {
@@ -435,4 +459,78 @@ mod tests {
));
assert!(matches!(hour(&a), Err(ArrowError::ComputeError(_))))
}
+
+    /// `using_chrono_tz` should map each zone name to the expected fixed offset.
+    #[cfg(feature = "chrono-tz")]
+    #[test]
+    fn test_using_chrono_tz() {
+        let expectations = [
+            ("Australia/Sydney", FixedOffset::east(10 * 60 * 60)),
+            ("America/New_York", FixedOffset::west(5 * 60 * 60)),
+        ];
+        for (zone_name, expected_offset) in expectations.iter() {
+            assert_eq!(using_chrono_tz(zone_name), Some(*expected_offset));
+        }
+    }
+
+
+    /// Checks that the offset resolved for `Australia/Sydney` switches between +10:00
+    /// and +11:00 on either side of both 2021 daylight-saving transitions.
+    #[cfg(feature = "chrono-tz")]
+    #[test]
+    fn test_using_chrono_tz_and_utc_naive_date_time() {
+        let sydney_tz = "Australia/Sydney".to_string();
+        let sydney_offset_without_dst = FixedOffset::east(10 * 60 * 60);
+        let sydney_offset_with_dst = FixedOffset::east(11 * 60 * 60);
+        // Daylight savings ends
+        // When local daylight time was about to reach
+        // Sunday, 4 April 2021, 3:00:00 am clocks were turned backward 1 hour to
+        // Sunday, 4 April 2021, 2:00:00 am local standard time instead.
+
+        // Daylight savings starts
+        // When local standard time was about to reach
+        // Sunday, 3 October 2021, 2:00:00 am clocks were turned forward 1 hour to
+        // Sunday, 3 October 2021, 3:00:00 am local daylight time instead.
+
+        // Sydney 2021-04-04T02:30:00+11:00 is 2021-04-03T15:30:00Z
+        let utc_just_before_sydney_dst_ends =
+            NaiveDate::from_ymd(2021, 4, 3).and_hms_nano(15, 30, 0, 0);
+        assert_eq!(
+            using_chrono_tz_and_utc_naive_date_time(
+                &sydney_tz,
+                utc_just_before_sydney_dst_ends
+            ),
+            Some(sydney_offset_with_dst)
+        );
+        // Sydney 2021-04-04T02:30:00+10:00 is 2021-04-03T16:30:00Z
+        let utc_just_after_sydney_dst_ends =
+            NaiveDate::from_ymd(2021, 4, 3).and_hms_nano(16, 30, 0, 0);
+        assert_eq!(
+            using_chrono_tz_and_utc_naive_date_time(
+                &sydney_tz,
+                utc_just_after_sydney_dst_ends
+            ),
+            Some(sydney_offset_without_dst)
+        );
+        // Sydney 2021-10-03T01:30:00+10:00 is 2021-10-02T15:30:00Z
+        let utc_just_before_sydney_dst_starts =
+            NaiveDate::from_ymd(2021, 10, 2).and_hms_nano(15, 30, 0, 0);
+        assert_eq!(
+            using_chrono_tz_and_utc_naive_date_time(
+                &sydney_tz,
+                utc_just_before_sydney_dst_starts
+            ),
+            Some(sydney_offset_without_dst)
+        );
+        // Sydney 2021-10-03T03:30:00+11:00 is 2021-10-02T16:30:00Z
+        // (same year as the "just before" case, one hour later in UTC).
+        let utc_just_after_sydney_dst_starts =
+            NaiveDate::from_ymd(2021, 10, 2).and_hms_nano(16, 30, 0, 0);
+        assert_eq!(
+            using_chrono_tz_and_utc_naive_date_time(
+                &sydney_tz,
+                utc_just_after_sydney_dst_starts
+            ),
+            Some(sydney_offset_with_dst)
+        );
+    }
}
diff --git a/arrow/src/csv/writer.rs b/arrow/src/csv/writer.rs
index 50fd9ef..44959e6 100644
--- a/arrow/src/csv/writer.rs
+++ b/arrow/src/csv/writer.rs
@@ -67,6 +67,11 @@
use std::io::Write;
+#[cfg(feature = "chrono-tz")]
+use crate::compute::kernels::temporal::using_chrono_tz_and_utc_naive_date_time;
+#[cfg(feature = "chrono-tz")]
+use chrono::{DateTime, Utc};
+
use crate::datatypes::*;
use crate::error::{ArrowError, Result};
use crate::record_batch::RecordBatch;
@@ -75,6 +80,7 @@ use crate::{array::*, util::serialization::lexical_to_string};
const DEFAULT_DATE_FORMAT: &str = "%F";
const DEFAULT_TIME_FORMAT: &str = "%T";
const DEFAULT_TIMESTAMP_FORMAT: &str = "%FT%H:%M:%S.%9f";
+const DEFAULT_TIMESTAMP_TZ_FORMAT: &str = "%FT%H:%M:%S.%9f%:z";
fn write_primitive_value<T>(array: &ArrayRef, i: usize) -> String
where
@@ -100,6 +106,8 @@ pub struct Writer<W: Write> {
datetime_format: String,
/// The timestamp format for timestamp arrays
timestamp_format: String,
+ /// The timestamp format for timestamp (with timezone) arrays
+ timestamp_tz_format: String,
/// The time format for time arrays
time_format: String,
/// Is the beginning-of-writer
@@ -120,6 +128,7 @@ impl<W: Write> Writer<W> {
datetime_format: DEFAULT_TIMESTAMP_FORMAT.to_string(),
time_format: DEFAULT_TIME_FORMAT.to_string(),
timestamp_format: DEFAULT_TIMESTAMP_FORMAT.to_string(),
+ timestamp_tz_format: DEFAULT_TIMESTAMP_TZ_FORMAT.to_string(),
beginning: true,
}
}
@@ -213,35 +222,8 @@ impl<W: Write> Writer<W> {
.format(&self.time_format)
.to_string()
}
- DataType::Timestamp(time_unit, _) => {
- use TimeUnit::*;
- let datetime = match time_unit {
- Second => col
- .as_any()
- .downcast_ref::<TimestampSecondArray>()
- .unwrap()
- .value_as_datetime(row_index)
- .unwrap(),
- Millisecond => col
- .as_any()
- .downcast_ref::<TimestampMillisecondArray>()
- .unwrap()
- .value_as_datetime(row_index)
- .unwrap(),
- Microsecond => col
- .as_any()
- .downcast_ref::<TimestampMicrosecondArray>()
- .unwrap()
- .value_as_datetime(row_index)
- .unwrap(),
- Nanosecond => col
- .as_any()
- .downcast_ref::<TimestampNanosecondArray>()
- .unwrap()
- .value_as_datetime(row_index)
- .unwrap(),
- };
- format!("{}", datetime.format(&self.timestamp_format))
+ DataType::Timestamp(time_unit, time_zone) => {
+ self.handle_timestamp(time_unit, time_zone, row_index, col)?
}
DataType::Decimal(..) => make_string_from_decimal(col, row_index)?,
t => {
@@ -258,6 +240,102 @@ impl<W: Write> Writer<W> {
Ok(())
}
+    /// Formats the timestamp at `row_index` of `col` with `self.timestamp_format`.
+    ///
+    /// This variant is compiled when the `chrono-tz` feature is off; the
+    /// `_time_zone` argument is ignored.
+    ///
+    /// Panics if `col` is not the timestamp array type matching `time_unit`, or if
+    /// `value_as_datetime` yields `None` for the row.
+    #[cfg(not(feature = "chrono-tz"))]
+    fn handle_timestamp(
+        &self,
+        time_unit: &TimeUnit,
+        _time_zone: &Option<String>,
+        row_index: usize,
+        col: &ArrayRef,
+    ) -> Result<String> {
+        // Downcast `col` to the given concrete array type and read one value.
+        macro_rules! naive_at_row {
+            ($array:ty) => {
+                col.as_any()
+                    .downcast_ref::<$array>()
+                    .unwrap()
+                    .value_as_datetime(row_index)
+                    .unwrap()
+            };
+        }
+        let naive = match time_unit {
+            TimeUnit::Second => naive_at_row!(TimestampSecondArray),
+            TimeUnit::Millisecond => naive_at_row!(TimestampMillisecondArray),
+            TimeUnit::Microsecond => naive_at_row!(TimestampMicrosecondArray),
+            TimeUnit::Nanosecond => naive_at_row!(TimestampNanosecondArray),
+        };
+        Ok(naive.format(&self.timestamp_format).to_string())
+    }
+
+    /// Formats the timestamp at `row_index` of `col` with `self.timestamp_tz_format`,
+    /// shifted into the column's timezone.
+    ///
+    /// The stored value is treated as a UTC instant. When `time_zone` is `None` the
+    /// zone name `"UTC"` is used. The offset is resolved for that specific instant,
+    /// so daylight-saving changes are reflected per row.
+    ///
+    /// # Errors
+    /// Returns `ArrowError::ComputeError` naming the zone when it cannot be resolved
+    /// to an offset.
+    ///
+    /// # Panics
+    /// Panics if `col` is not the timestamp array type matching `time_unit`, or if
+    /// `value_as_datetime` yields `None` for the row.
+    #[cfg(feature = "chrono-tz")]
+    fn handle_timestamp(
+        &self,
+        time_unit: &TimeUnit,
+        time_zone: &Option<String>,
+        row_index: usize,
+        col: &ArrayRef,
+    ) -> Result<String> {
+        use TimeUnit::*;
+
+        let datetime = match time_unit {
+            Second => col
+                .as_any()
+                .downcast_ref::<TimestampSecondArray>()
+                .unwrap()
+                .value_as_datetime(row_index)
+                .unwrap(),
+            Millisecond => col
+                .as_any()
+                .downcast_ref::<TimestampMillisecondArray>()
+                .unwrap()
+                .value_as_datetime(row_index)
+                .unwrap(),
+            Microsecond => col
+                .as_any()
+                .downcast_ref::<TimestampMicrosecondArray>()
+                .unwrap()
+                .value_as_datetime(row_index)
+                .unwrap(),
+            Nanosecond => col
+                .as_any()
+                .downcast_ref::<TimestampNanosecondArray>()
+                .unwrap()
+                .value_as_datetime(row_index)
+                .unwrap(),
+        };
+        // Borrow the zone name instead of allocating a new String for every cell.
+        let tz_name = time_zone.as_deref().unwrap_or("UTC");
+
+        // Report the offending zone name; the lookup result itself is only ever
+        // `None` on failure and carries no diagnostic information.
+        let offset = using_chrono_tz_and_utc_naive_date_time(tz_name, datetime)
+            .ok_or_else(|| {
+                ArrowError::ComputeError(format!(
+                    "Unable to parse timezone: {}",
+                    tz_name
+                ))
+            })?;
+
+        let utc_time = DateTime::<Utc>::from_utc(datetime, Utc);
+        Ok(utc_time
+            .with_timezone(&offset)
+            .format(&self.timestamp_tz_format)
+            .to_string())
+    }
+
/// Write a vector of record batches to a writable object
pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
let num_columns = batch.num_columns();
@@ -311,6 +389,8 @@ pub struct WriterBuilder {
datetime_format: Option<String>,
/// Optional timestamp format for timestamp arrays
timestamp_format: Option<String>,
+ /// Optional timestamp format for timestamp with timezone arrays
+ timestamp_tz_format: Option<String>,
/// Optional time format for time arrays
time_format: Option<String>,
}
@@ -324,6 +404,7 @@ impl Default for WriterBuilder {
datetime_format: Some(DEFAULT_TIMESTAMP_FORMAT.to_string()),
time_format: Some(DEFAULT_TIME_FORMAT.to_string()),
timestamp_format: Some(DEFAULT_TIMESTAMP_FORMAT.to_string()),
+ timestamp_tz_format: Some(DEFAULT_TIMESTAMP_TZ_FORMAT.to_string()),
}
}
}
@@ -406,6 +487,9 @@ impl WriterBuilder {
timestamp_format: self
.timestamp_format
.unwrap_or_else(|| DEFAULT_TIMESTAMP_FORMAT.to_string()),
+ timestamp_tz_format: self
+ .timestamp_tz_format
+ .unwrap_or_else(|| DEFAULT_TIMESTAMP_TZ_FORMAT.to_string()),
beginning: true,
}
}
@@ -417,6 +501,7 @@ mod tests {
use crate::csv::Reader;
use crate::datatypes::{Field, Schema};
+ #[cfg(feature = "chrono-tz")]
use crate::util::string_writer::StringWriter;
use crate::util::test_util::get_temp_file;
use std::fs::File;
@@ -485,7 +570,16 @@ mod tests {
let mut buffer: Vec<u8> = vec![];
file.read_to_end(&mut buffer).unwrap();
- assert_eq!(
+ let expected = if cfg!(feature = "chrono-tz") {
+ r#"c1,c2,c3,c4,c5,c6,c7
+Lorem ipsum dolor sit amet,123.564532,3,true,,00:20:34,cupcakes
+consectetur adipiscing elit,,2,false,2019-04-18T10:54:47.378000000+00:00,06:51:20,cupcakes
+sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555000000+00:00,23:46:03,foo
+Lorem ipsum dolor sit amet,123.564532,3,true,,00:20:34,cupcakes
+consectetur adipiscing elit,,2,false,2019-04-18T10:54:47.378000000+00:00,06:51:20,cupcakes
+sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555000000+00:00,23:46:03,foo
+"#
+ } else {
r#"c1,c2,c3,c4,c5,c6,c7
Lorem ipsum dolor sit amet,123.564532,3,true,,00:20:34,cupcakes
consectetur adipiscing elit,,2,false,2019-04-18T10:54:47.378000000,06:51:20,cupcakes
@@ -494,9 +588,8 @@ Lorem ipsum dolor sit amet,123.564532,3,true,,00:20:34,cupcakes
consectetur adipiscing elit,,2,false,2019-04-18T10:54:47.378000000,06:51:20,cupcakes
sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555000000,23:46:03,foo
"#
- .to_string(),
- String::from_utf8(buffer).unwrap()
- );
+ };
+ assert_eq!(expected.to_string(), String::from_utf8(buffer).unwrap());
}
#[test]
@@ -559,73 +652,53 @@ sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555000000,23:46:03,foo
);
}
+ #[cfg(feature = "chrono-tz")]
#[test]
- fn test_export_csv_string() {
+ fn test_export_csv_timestamps() {
let schema = Schema::new(vec![
- Field::new("c1", DataType::Utf8, false),
- Field::new("c2", DataType::Float64, true),
- Field::new("c3", DataType::UInt32, false),
- Field::new("c4", DataType::Boolean, true),
- Field::new("c5", DataType::Timestamp(TimeUnit::Millisecond, None), true),
- Field::new("c6", DataType::Time32(TimeUnit::Second), false),
- Field::new("c7", DataType::Decimal(6, 2), false),
+ Field::new(
+ "c1",
+ DataType::Timestamp(
+ TimeUnit::Millisecond,
+ Some("Australia/Sydney".to_string()),
+ ),
+ true,
+ ),
+ Field::new("c2", DataType::Timestamp(TimeUnit::Millisecond, None), true),
]);
- let c1 = StringArray::from(vec![
- "Lorem ipsum dolor sit amet",
- "consectetur adipiscing elit",
- "sed do eiusmod tempor",
- ]);
- let c2 = PrimitiveArray::<Float64Type>::from(vec![
- Some(123.564532),
- None,
- Some(-556132.25),
- ]);
- let c3 = PrimitiveArray::<UInt32Type>::from(vec![3, 2, 1]);
- let c4 = BooleanArray::from(vec![Some(true), Some(false), None]);
- let c5 = TimestampMillisecondArray::from_opt_vec(
- vec![None, Some(1555584887378), Some(1555555555555)],
+ let c1 = TimestampMillisecondArray::from_opt_vec(
+ // 1555584887 converts to 2019-04-18, 20:54:47 in time zone Australia/Sydney (AEST).
+ // The offset (difference to UTC) is +10:00.
+ // 1635577147 converts to 2021-10-30 17:59:07 in time zone Australia/Sydney (AEDT)
+ // The offset (difference to UTC) is +11:00. Note that daylight savings is in effect on 2021-10-30.
+ //
+ vec![Some(1555584887378), Some(1635577147000)],
+ Some("Australia/Sydney".to_string()),
+ );
+ let c2 = TimestampMillisecondArray::from_opt_vec(
+ vec![Some(1555584887378), Some(1635577147000)],
None,
);
- let c6 = Time32SecondArray::from(vec![1234, 24680, 85563]);
- let mut c7_builder = DecimalBuilder::new(5, 6, 2);
- c7_builder.append_value(12345_i128).unwrap();
- c7_builder.append_value(-12345_i128).unwrap();
- c7_builder.append_null().unwrap();
- let c7 = c7_builder.finish();
-
- let batch = RecordBatch::try_new(
- Arc::new(schema),
- vec![
- Arc::new(c1),
- Arc::new(c2),
- Arc::new(c3),
- Arc::new(c4),
- Arc::new(c5),
- Arc::new(c6),
- Arc::new(c7),
- ],
- )
- .unwrap();
+ let batch =
+ RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1), Arc::new(c2)])
+ .unwrap();
let sw = StringWriter::new();
let mut writer = Writer::new(sw);
- let batches = vec![&batch, &batch];
+ let batches = vec![&batch];
for batch in batches {
writer.write(batch).unwrap();
}
- let left = "c1,c2,c3,c4,c5,c6,c7
-Lorem ipsum dolor sit amet,123.564532,3,true,,00:20:34,123.45
-consectetur adipiscing elit,,2,false,2019-04-18T10:54:47.378000000,06:51:20,-123.45
-sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555000000,23:46:03,
-Lorem ipsum dolor sit amet,123.564532,3,true,,00:20:34,123.45
-consectetur adipiscing elit,,2,false,2019-04-18T10:54:47.378000000,06:51:20,-123.45
-sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555000000,23:46:03,\n";
+ let left = "c1,c2
+2019-04-18T20:54:47.378000000+10:00,2019-04-18T10:54:47.378000000+00:00
+2021-10-30T17:59:07.000000000+11:00,2021-10-30T06:59:07.000000000+00:00\n";
let right = writer.writer.into_inner().map(|s| s.to_string());
assert_eq!(Some(left.to_string()), right.ok());
}
+ #[cfg(not(feature = "chrono-tz"))]
#[test]
fn test_conversion_consistency() {
// test if we can serialize and deserialize whilst retaining the same type information/ precision
@@ -675,4 +748,70 @@ sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555000000,23:46:03,\n";
let expected = vec![Some(3), Some(2), Some(1)];
assert_eq!(actual, expected);
}
+
+    /// Round-trips date and timestamp columns through the CSV writer and reader and
+    /// checks that the values read back equal the values written (`chrono-tz` build).
+    #[cfg(feature = "chrono-tz")]
+    #[test]
+    fn test_conversion_consistency() {
+        // test if we can serialize and deserialize whilst retaining the same type information/ precision
+
+        let schema = Schema::new(vec![
+            Field::new("c1", DataType::Date32, false),
+            Field::new("c2", DataType::Date64, false),
+            Field::new("c3", DataType::Timestamp(TimeUnit::Nanosecond, None), false),
+        ]);
+
+        let nanoseconds = vec![
+            1599566300000000000,
+            1599566200000000000,
+            1599566100000000000,
+        ];
+        let c1 = Date32Array::from(vec![3, 2, 1]);
+        let c2 = Date64Array::from(vec![3, 2, 1]);
+        let c3 = TimestampNanosecondArray::from_vec(nanoseconds.clone(), None);
+
+        let batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![Arc::new(c1), Arc::new(c2), Arc::new(c3)],
+        )
+        .unwrap();
+
+        let builder = WriterBuilder::new().has_headers(false);
+
+        let mut buf: Cursor<Vec<u8>> = Default::default();
+        // drop the writer early to release the borrow.
+        {
+            let mut writer = builder.build(&mut buf);
+            writer.write(&batch).unwrap();
+        }
+        buf.set_position(0);
+
+        let mut reader = Reader::new(
+            buf,
+            Arc::new(schema),
+            false,
+            None,
+            3,
+            // no row bounds and no projection are requested (both `None`)
+            None,
+            None,
+        );
+        let rb = reader.next().unwrap().unwrap();
+        let c1 = rb.column(0).as_any().downcast_ref::<Date32Array>().unwrap();
+        let c2 = rb.column(1).as_any().downcast_ref::<Date64Array>().unwrap();
+        let c3 = rb
+            .column(2)
+            .as_any()
+            .downcast_ref::<TimestampNanosecondArray>()
+            .unwrap();
+
+        let actual = c1.into_iter().collect::<Vec<_>>();
+        let expected = vec![Some(3), Some(2), Some(1)];
+        assert_eq!(actual, expected);
+        let actual = c2.into_iter().collect::<Vec<_>>();
+        let expected = vec![Some(3), Some(2), Some(1)];
+        assert_eq!(actual, expected);
+        let actual = c3.into_iter().collect::<Vec<_>>();
+        let expected = nanoseconds.into_iter().map(Some).collect::<Vec<_>>();
+        assert_eq!(actual, expected);
+    }
}