You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@beam.apache.org by jo...@apache.org on 2023/02/27 16:24:34 UTC

[beam] branch master updated: One formatter for Timestamp fields in Storage writes (#25472)

This is an automated email from the ASF dual-hosted git repository.

johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ec5673ac1c One formatter for Timestamp fields in Storage writes (#25472)
8ec5673ac1c is described below

commit 8ec5673ac1cc2813455dc6d554a1c7a2d1808559
Author: Ahmed Abualsaud <65...@users.noreply.github.com>
AuthorDate: Mon Feb 27 19:24:27 2023 +0300

    One formatter for Timestamp fields in Storage writes (#25472)
    
    * handle timestamps with one formatter; handle UTC suffix
    
    * timestamp formatter that handles zone region suffix
---
 .../io/gcp/bigquery/TableRowToStorageApiProto.java | 39 +++++++++++---
 .../bigquery/TableRowToStorageApiProtoTest.java    | 60 +++++++++++++++++++---
 2 files changed, 86 insertions(+), 13 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
index 95a8027516d..49f6d436594 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
@@ -39,6 +39,7 @@ import com.google.protobuf.Message;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.math.RoundingMode;
+import java.time.DateTimeException;
 import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
@@ -80,11 +81,32 @@ public class TableRowToStorageApiProto {
   private static final DateTimeFormatter DATETIME_SPACE_FORMATTER =
       new DateTimeFormatterBuilder()
           .append(DateTimeFormatter.ISO_LOCAL_DATE)
+          .optionalStart() // optional ' ' separator between date and time
           .appendLiteral(' ')
+          .optionalEnd()
+          .optionalStart() // optional 'T' separator, independent of ' ' (neither/both also parse)
           .appendLiteral('T')
+          .optionalEnd()
           .append(DateTimeFormatter.ISO_LOCAL_TIME)
           .toFormatter()
           .withZone(ZoneOffset.UTC);
 
+  private static final DateTimeFormatter TIMESTAMP_FORMATTER =
+      new DateTimeFormatterBuilder()
+          // 'yyyy-MM-dd[ ][T]HH:mm[:ss[.SSSSSSSSS]]' (no zone; override zone is not carried over)
+          .append(DATETIME_SPACE_FORMATTER)
+          // optional offset id: 'Z' or '(+|-)HH:MM[:ss]', e.g. '2011-12-03 10:15:30+05:00'
+          .optionalStart()
+          .appendOffsetId()
+          .optionalEnd()
+          // optional ' [time_zone]' suffix, time_zone -> UTC, Asia/Kolkata, etc (case-sensitive)
+          // if both an offset and a time zone are provided, the offset takes precedence
+          .optionalStart()
+          .appendLiteral(' ')
+          .parseCaseSensitive()
+          .appendZoneRegionId() // this optional section is closed implicitly by toFormatter()
+          .toFormatter();
+
   abstract static class SchemaConversionException extends Exception {
     SchemaConversionException(String msg) {
       super(msg);
@@ -737,18 +759,21 @@ public class TableRowToStorageApiProto {
       case TIMESTAMP:
         if (value instanceof String) {
           try {
-            // '2011-12-03T10:15:30+01:00' '2011-12-03T10:15:30'
+            // '2011-12-03T10:15:30Z', '2011-12-03 10:15:30+05:00'
+            // '2011-12-03 10:15:30 UTC', '2011-12-03T10:15:30 America/New_York'
             return ChronoUnit.MICROS.between(
-                Instant.EPOCH, Instant.from(DateTimeFormatter.ISO_DATE_TIME.parse((String) value)));
-          } catch (DateTimeParseException e) {
+                Instant.EPOCH, Instant.from(TIMESTAMP_FORMATTER.parse((String) value)));
+          } catch (DateTimeException e) {
             try {
+              // for backwards compatibility, default time zone is UTC for values with no time-zone
+              // '2011-12-03T10:15:30'
+              return ChronoUnit.MICROS.between(
+                  Instant.EPOCH,
+                  Instant.from(TIMESTAMP_FORMATTER.withZone(ZoneOffset.UTC).parse((String) value)));
+            } catch (DateTimeParseException err) {
               // "12345667"
               return ChronoUnit.MICROS.between(
                   Instant.EPOCH, Instant.ofEpochMilli(Long.parseLong((String) value)));
-            } catch (NumberFormatException e2) {
-              // "yyyy-MM-dd HH:mm:ss.SSSSSS"
-              return ChronoUnit.MICROS.between(
-                  Instant.EPOCH, Instant.from(DATETIME_SPACE_FORMATTER.parse((String) value)));
             }
           }
         } else if (value instanceof Instant) {
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
index d143315ee59..c0e970bab84 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
@@ -94,6 +94,12 @@ public class TableRowToStorageApiProtoTest {
                   .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampISOValue"))
                   .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampValueLong"))
                   .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampValueSpace"))
+                  .add(
+                      new TableFieldSchema().setType("TIMESTAMP").setName("timestampValueSpaceUtc"))
+                  .add(
+                      new TableFieldSchema()
+                          .setType("TIMESTAMP")
+                          .setName("timestampValueZoneRegion"))
                   .add(
                       new TableFieldSchema()
                           .setType("TIMESTAMP")
@@ -133,6 +139,12 @@ public class TableRowToStorageApiProtoTest {
                   .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampISOValue"))
                   .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampValueLong"))
                   .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampValueSpace"))
+                  .add(
+                      new TableFieldSchema().setType("TIMESTAMP").setName("timestampValueSpaceUtc"))
+                  .add(
+                      new TableFieldSchema()
+                          .setType("TIMESTAMP")
+                          .setName("timestampValueZoneRegion"))
                   .add(
                       new TableFieldSchema()
                           .setType("TIMESTAMP")
@@ -295,25 +307,39 @@ public class TableRowToStorageApiProtoTest {
                   .build())
           .addField(
               FieldDescriptorProto.newBuilder()
-                  .setName("timestampvaluespacemilli")
+                  .setName("timestampvaluespaceutc")
                   .setNumber(22)
                   .setType(Type.TYPE_INT64)
                   .setLabel(Label.LABEL_OPTIONAL)
                   .build())
           .addField(
               FieldDescriptorProto.newBuilder()
-                  .setName("timestampvaluespacetrailingzero")
+                  .setName("timestampvaluezoneregion")
                   .setNumber(23)
                   .setType(Type.TYPE_INT64)
                   .setLabel(Label.LABEL_OPTIONAL)
                   .build())
           .addField(
               FieldDescriptorProto.newBuilder()
-                  .setName("datetimevaluespace")
+                  .setName("timestampvaluespacemilli")
                   .setNumber(24)
                   .setType(Type.TYPE_INT64)
                   .setLabel(Label.LABEL_OPTIONAL)
                   .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("timestampvaluespacetrailingzero")
+                  .setNumber(25)
+                  .setType(Type.TYPE_INT64)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("datetimevaluespace")
+                  .setNumber(26)
+                  .setType(Type.TYPE_INT64)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
           .build();
 
   private static final DescriptorProto BASE_TABLE_SCHEMA_NO_F_PROTO =
@@ -460,25 +486,39 @@ public class TableRowToStorageApiProtoTest {
                   .build())
           .addField(
               FieldDescriptorProto.newBuilder()
-                  .setName("timestampvaluespacemilli")
+                  .setName("timestampvaluespaceutc")
                   .setNumber(21)
                   .setType(Type.TYPE_INT64)
                   .setLabel(Label.LABEL_OPTIONAL)
                   .build())
           .addField(
               FieldDescriptorProto.newBuilder()
-                  .setName("timestampvaluespacetrailingzero")
+                  .setName("timestampvaluezoneregion")
                   .setNumber(22)
                   .setType(Type.TYPE_INT64)
                   .setLabel(Label.LABEL_OPTIONAL)
                   .build())
           .addField(
               FieldDescriptorProto.newBuilder()
-                  .setName("datetimevaluespace")
+                  .setName("timestampvaluespacemilli")
                   .setNumber(23)
                   .setType(Type.TYPE_INT64)
                   .setLabel(Label.LABEL_OPTIONAL)
                   .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("timestampvaluespacetrailingzero")
+                  .setNumber(24)
+                  .setType(Type.TYPE_INT64)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("datetimevaluespace")
+                  .setNumber(25)
+                  .setType(Type.TYPE_INT64)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
           .build();
   private static final TableSchema NESTED_TABLE_SCHEMA =
       new TableSchema()
@@ -621,6 +661,8 @@ public class TableRowToStorageApiProtoTest {
                   new TableCell().setV("1970-01-01T00:00:00.000+01:00"),
                   new TableCell().setV("1234567"),
                   new TableCell().setV("1970-01-01 00:00:00.000343"),
+                  new TableCell().setV("1970-01-01 00:00:00.000343 UTC"),
+                  new TableCell().setV("1970-01-01 00:00:00.123456 America/New_York"),
                   new TableCell().setV("1970-01-01 00:00:00.123"),
                   new TableCell().setV("1970-01-01 00:00:00.1230"),
                   new TableCell().setV("2019-08-16 00:52:07.123456")));
@@ -650,6 +692,8 @@ public class TableRowToStorageApiProtoTest {
           .set("timestampValueLong", "1234567")
           // UTC time for backwards compatibility
           .set("timestampValueSpace", "1970-01-01 00:00:00.000343")
+          .set("timestampValueSpaceUtc", "1970-01-01 00:00:00.000343 UTC")
+          .set("timestampValueZoneRegion", "1970-01-01 00:00:00.123456 America/New_York")
           .set("timestampValueSpaceMilli", "1970-01-01 00:00:00.123")
           .set("timestampValueSpaceTrailingZero", "1970-01-01 00:00:00.1230")
           .set("datetimeValueSpace", "2019-08-16 00:52:07.123456");
@@ -686,6 +730,8 @@ public class TableRowToStorageApiProtoTest {
           .put("timestampisovalue", -3600000000L)
           .put("timestampvaluelong", 1234567000L)
           .put("timestampvaluespace", 343L)
+          .put("timestampvaluespaceutc", 343L)
+          .put("timestampvaluezoneregion", 18000123456L)
           .put("timestampvaluespacemilli", 123000L)
           .put("timestampvaluespacetrailingzero", 123000L)
           .put("datetimevaluespace", 142111881387172416L)
@@ -722,6 +768,8 @@ public class TableRowToStorageApiProtoTest {
           .put("timestampisovalue", -3600000000L)
           .put("timestampvaluelong", 1234567000L)
           .put("timestampvaluespace", 343L)
+          .put("timestampvaluespaceutc", 343L)
+          .put("timestampvaluezoneregion", 18000123456L)
           .put("timestampvaluespacemilli", 123000L)
           .put("timestampvaluespacetrailingzero", 123000L)
           .put("datetimevaluespace", 142111881387172416L)