You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text rendering).
Posted to commits@beam.apache.org by jo...@apache.org on 2023/02/27 16:24:34 UTC
[beam] branch master updated: One formatter for Timestamp fields in Storage writes (#25472)
This is an automated email from the ASF dual-hosted git repository.
johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8ec5673ac1c One formatter for Timestamp fields in Storage writes (#25472)
8ec5673ac1c is described below
commit 8ec5673ac1cc2813455dc6d554a1c7a2d1808559
Author: Ahmed Abualsaud <65...@users.noreply.github.com>
AuthorDate: Mon Feb 27 19:24:27 2023 +0300
One formatter for Timestamp fields in Storage writes (#25472)
* Handle timestamps with a single formatter; handle the "UTC" suffix
* Make the timestamp formatter handle a zone-region suffix (e.g. "America/New_York")
---
.../io/gcp/bigquery/TableRowToStorageApiProto.java | 39 +++++++++++---
.../bigquery/TableRowToStorageApiProtoTest.java | 60 +++++++++++++++++++---
2 files changed, 86 insertions(+), 13 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
index 95a8027516d..49f6d436594 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
@@ -39,6 +39,7 @@ import com.google.protobuf.Message;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;
+import java.time.DateTimeException;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
@@ -80,11 +81,32 @@ public class TableRowToStorageApiProto {
private static final DateTimeFormatter DATETIME_SPACE_FORMATTER =
new DateTimeFormatterBuilder()
.append(DateTimeFormatter.ISO_LOCAL_DATE)
+ .optionalStart()
.appendLiteral(' ')
+ .optionalEnd()
+ .optionalStart()
+ .appendLiteral('T')
+ .optionalEnd()
.append(DateTimeFormatter.ISO_LOCAL_TIME)
.toFormatter()
.withZone(ZoneOffset.UTC);
+ private static final DateTimeFormatter TIMESTAMP_FORMATTER =
+ new DateTimeFormatterBuilder()
+ // 'yyyy-MM-dd(T| )HH:mm:ss.SSSSSSSSS'
+ .append(DATETIME_SPACE_FORMATTER)
+ // 'yyyy-MM-dd(T| )HH:mm:ss.SSSSSSSSS(+HH:MM:ss|Z)'
+ .optionalStart()
+ .appendOffsetId()
+ .optionalEnd()
+ // 'yyyy-MM-dd(T| )HH:mm:ss.SSSSSSSSS [time_zone]', time_zone -> UTC, Asia/Kolkata, etc
+ // if both an offset and a time zone are provided, the offset takes precedence
+ .optionalStart()
+ .appendLiteral(' ')
+ .parseCaseSensitive()
+ .appendZoneRegionId()
+ .toFormatter();
+
abstract static class SchemaConversionException extends Exception {
SchemaConversionException(String msg) {
super(msg);
@@ -737,18 +759,21 @@ public class TableRowToStorageApiProto {
case TIMESTAMP:
if (value instanceof String) {
try {
- // '2011-12-03T10:15:30+01:00' '2011-12-03T10:15:30'
+ // '2011-12-03T10:15:30Z', '2011-12-03 10:15:30+05:00'
+ // '2011-12-03 10:15:30 UTC', '2011-12-03T10:15:30 America/New_York'
return ChronoUnit.MICROS.between(
- Instant.EPOCH, Instant.from(DateTimeFormatter.ISO_DATE_TIME.parse((String) value)));
- } catch (DateTimeParseException e) {
+ Instant.EPOCH, Instant.from(TIMESTAMP_FORMATTER.parse((String) value)));
+ } catch (DateTimeException e) {
try {
+ // for backwards compatibility, default time zone is UTC for values with no time-zone
+ // '2011-12-03T10:15:30'
+ return ChronoUnit.MICROS.between(
+ Instant.EPOCH,
+ Instant.from(TIMESTAMP_FORMATTER.withZone(ZoneOffset.UTC).parse((String) value)));
+ } catch (DateTimeParseException err) {
// "12345667"
return ChronoUnit.MICROS.between(
Instant.EPOCH, Instant.ofEpochMilli(Long.parseLong((String) value)));
- } catch (NumberFormatException e2) {
- // "yyyy-MM-dd HH:mm:ss.SSSSSS"
- return ChronoUnit.MICROS.between(
- Instant.EPOCH, Instant.from(DATETIME_SPACE_FORMATTER.parse((String) value)));
}
}
} else if (value instanceof Instant) {
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
index d143315ee59..c0e970bab84 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
@@ -94,6 +94,12 @@ public class TableRowToStorageApiProtoTest {
.add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampISOValue"))
.add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampValueLong"))
.add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampValueSpace"))
+ .add(
+ new TableFieldSchema().setType("TIMESTAMP").setName("timestampValueSpaceUtc"))
+ .add(
+ new TableFieldSchema()
+ .setType("TIMESTAMP")
+ .setName("timestampValueZoneRegion"))
.add(
new TableFieldSchema()
.setType("TIMESTAMP")
@@ -133,6 +139,12 @@ public class TableRowToStorageApiProtoTest {
.add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampISOValue"))
.add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampValueLong"))
.add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampValueSpace"))
+ .add(
+ new TableFieldSchema().setType("TIMESTAMP").setName("timestampValueSpaceUtc"))
+ .add(
+ new TableFieldSchema()
+ .setType("TIMESTAMP")
+ .setName("timestampValueZoneRegion"))
.add(
new TableFieldSchema()
.setType("TIMESTAMP")
@@ -295,25 +307,39 @@ public class TableRowToStorageApiProtoTest {
.build())
.addField(
FieldDescriptorProto.newBuilder()
- .setName("timestampvaluespacemilli")
+ .setName("timestampvaluespaceutc")
.setNumber(22)
.setType(Type.TYPE_INT64)
.setLabel(Label.LABEL_OPTIONAL)
.build())
.addField(
FieldDescriptorProto.newBuilder()
- .setName("timestampvaluespacetrailingzero")
+ .setName("timestampvaluezoneregion")
.setNumber(23)
.setType(Type.TYPE_INT64)
.setLabel(Label.LABEL_OPTIONAL)
.build())
.addField(
FieldDescriptorProto.newBuilder()
- .setName("datetimevaluespace")
+ .setName("timestampvaluespacemilli")
.setNumber(24)
.setType(Type.TYPE_INT64)
.setLabel(Label.LABEL_OPTIONAL)
.build())
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("timestampvaluespacetrailingzero")
+ .setNumber(25)
+ .setType(Type.TYPE_INT64)
+ .setLabel(Label.LABEL_OPTIONAL)
+ .build())
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("datetimevaluespace")
+ .setNumber(26)
+ .setType(Type.TYPE_INT64)
+ .setLabel(Label.LABEL_OPTIONAL)
+ .build())
.build();
private static final DescriptorProto BASE_TABLE_SCHEMA_NO_F_PROTO =
@@ -460,25 +486,39 @@ public class TableRowToStorageApiProtoTest {
.build())
.addField(
FieldDescriptorProto.newBuilder()
- .setName("timestampvaluespacemilli")
+ .setName("timestampvaluespaceutc")
.setNumber(21)
.setType(Type.TYPE_INT64)
.setLabel(Label.LABEL_OPTIONAL)
.build())
.addField(
FieldDescriptorProto.newBuilder()
- .setName("timestampvaluespacetrailingzero")
+ .setName("timestampvaluezoneregion")
.setNumber(22)
.setType(Type.TYPE_INT64)
.setLabel(Label.LABEL_OPTIONAL)
.build())
.addField(
FieldDescriptorProto.newBuilder()
- .setName("datetimevaluespace")
+ .setName("timestampvaluespacemilli")
.setNumber(23)
.setType(Type.TYPE_INT64)
.setLabel(Label.LABEL_OPTIONAL)
.build())
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("timestampvaluespacetrailingzero")
+ .setNumber(24)
+ .setType(Type.TYPE_INT64)
+ .setLabel(Label.LABEL_OPTIONAL)
+ .build())
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("datetimevaluespace")
+ .setNumber(25)
+ .setType(Type.TYPE_INT64)
+ .setLabel(Label.LABEL_OPTIONAL)
+ .build())
.build();
private static final TableSchema NESTED_TABLE_SCHEMA =
new TableSchema()
@@ -621,6 +661,8 @@ public class TableRowToStorageApiProtoTest {
new TableCell().setV("1970-01-01T00:00:00.000+01:00"),
new TableCell().setV("1234567"),
new TableCell().setV("1970-01-01 00:00:00.000343"),
+ new TableCell().setV("1970-01-01 00:00:00.000343 UTC"),
+ new TableCell().setV("1970-01-01 00:00:00.123456 America/New_York"),
new TableCell().setV("1970-01-01 00:00:00.123"),
new TableCell().setV("1970-01-01 00:00:00.1230"),
new TableCell().setV("2019-08-16 00:52:07.123456")));
@@ -650,6 +692,8 @@ public class TableRowToStorageApiProtoTest {
.set("timestampValueLong", "1234567")
// UTC time for backwards compatibility
.set("timestampValueSpace", "1970-01-01 00:00:00.000343")
+ .set("timestampValueSpaceUtc", "1970-01-01 00:00:00.000343 UTC")
+ .set("timestampValueZoneRegion", "1970-01-01 00:00:00.123456 America/New_York")
.set("timestampValueSpaceMilli", "1970-01-01 00:00:00.123")
.set("timestampValueSpaceTrailingZero", "1970-01-01 00:00:00.1230")
.set("datetimeValueSpace", "2019-08-16 00:52:07.123456");
@@ -686,6 +730,8 @@ public class TableRowToStorageApiProtoTest {
.put("timestampisovalue", -3600000000L)
.put("timestampvaluelong", 1234567000L)
.put("timestampvaluespace", 343L)
+ .put("timestampvaluespaceutc", 343L)
+ .put("timestampvaluezoneregion", 18000123456L)
.put("timestampvaluespacemilli", 123000L)
.put("timestampvaluespacetrailingzero", 123000L)
.put("datetimevaluespace", 142111881387172416L)
@@ -722,6 +768,8 @@ public class TableRowToStorageApiProtoTest {
.put("timestampisovalue", -3600000000L)
.put("timestampvaluelong", 1234567000L)
.put("timestampvaluespace", 343L)
+ .put("timestampvaluespaceutc", 343L)
+ .put("timestampvaluezoneregion", 18000123456L)
.put("timestampvaluespacemilli", 123000L)
.put("timestampvaluespacetrailingzero", 123000L)
.put("datetimevaluespace", 142111881387172416L)