You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/03/13 22:01:10 UTC

[beam] branch master updated: [BEAM-2969] Handle negative AVRO timestamps

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d2b1c2  [BEAM-2969] Handle negative AVRO timestamps
     new 803395c  Merge pull request #8046 from udim/bigquery-negative-timestamps
3d2b1c2 is described below

commit 3d2b1c22b6124c15d50cdfd9257f9ed62427abc1
Author: Udi Meiri <eh...@google.com>
AuthorDate: Tue Mar 12 18:43:06 2019 -0700

    [BEAM-2969] Handle negative AVRO timestamps
    
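    Negative TIMESTAMP values (instants before 1970-01-01 00:00:00 UTC)
    read from BigQuery batch loads are now formatted correctly.
    
    Also simplifies the formatting code: when a fractional-seconds part is
    present, it is now always printed with six (microsecond) digits, so it
    may have trailing zeroes.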
---
 .../sdk/io/gcp/bigquery/BigQueryAvroUtils.java     | 41 +++++++++-------------
 .../sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java | 35 +++++++++++++++++-
 2 files changed, 50 insertions(+), 26 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
index 9238e77..52fd54d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
@@ -44,6 +44,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableCollection;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMultimap;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.io.BaseEncoding;
@@ -89,31 +90,23 @@ class BigQueryAvroUtils {
   private static final DateTimeFormatter DATE_AND_SECONDS_FORMATTER =
       DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withZoneUTC();
 
-  private static String formatTimestamp(String timestamp) {
-    // timestamp is in "seconds since epoch" format, with scientific notation.
-    // e.g., "1.45206229112345E9" to mean "2016-01-06 06:38:11.123456 UTC".
+  @VisibleForTesting
+  static String formatTimestamp(Long timestampMicro) {
+    // timestampMicro is in "microseconds since epoch" format,
+    // e.g., 1452062291123456L means "2016-01-06 06:38:11.123456 UTC".
     // Separate into seconds and microseconds.
-    double timestampDoubleMicros = Double.parseDouble(timestamp) * 1000000;
-    long timestampMicros = (long) timestampDoubleMicros;
-    long seconds = timestampMicros / 1000000;
-    int micros = (int) (timestampMicros % 1000000);
-    String dayAndTime = DATE_AND_SECONDS_FORMATTER.print(seconds * 1000);
+    long timestampSec = timestampMicro / 1_000_000;
+    long micros = timestampMicro % 1_000_000;
+    if (micros < 0) {
+      micros += 1_000_000;
+      timestampSec -= 1;
+    }
+    String dayAndTime = DATE_AND_SECONDS_FORMATTER.print(timestampSec * 1000);
 
-    // No sub-second component.
     if (micros == 0) {
       return String.format("%s UTC", dayAndTime);
     }
-
-    // Sub-second component.
-    int digits = 6;
-    int subsecond = micros;
-    while (subsecond % 10 == 0) {
-      digits--;
-      subsecond /= 10;
-    }
-    String formatString = String.format("%%0%dd", digits);
-    String fractionalSeconds = String.format(formatString, subsecond);
-    return String.format("%s.%s UTC", dayAndTime, fractionalSeconds);
+    return String.format("%s.%06d UTC", dayAndTime, micros);
   }
 
   /**
@@ -311,12 +304,10 @@ class BigQueryAvroUtils {
         verify(v instanceof Boolean, "Expected Boolean, got %s", v.getClass());
         return v;
       case "TIMESTAMP":
-        // TIMESTAMP data types are represented as Avro LONG types. They are converted back to
-        // Strings with variable precision (up to six digits) to match the JSON files exported by
-        // BigQuery.
+        // TIMESTAMP data types are represented as Avro LONG types, microseconds since the epoch.
+        // Values may be negative since BigQuery timestamps start at 0001-01-01 00:00:00 UTC.
         verify(v instanceof Long, "Expected Long, got %s", v.getClass());
-        double doubleValue = ((Long) v) / 1_000_000.0;
-        return formatTimestamp(Double.toString(doubleValue));
+        return formatTimestamp((Long) v);
       case "RECORD":
         verify(v instanceof GenericRecord, "Expected GenericRecord, got %s", v.getClass());
         return convertGenericRecordToTableRow((GenericRecord) v, fieldSchema.getFields());
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
index 38782ca..c645ee0 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
@@ -17,9 +17,9 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
 
 import com.google.api.services.bigquery.model.TableFieldSchema;
 import com.google.api.services.bigquery.model.TableRow;
@@ -271,6 +271,39 @@ public class BigQueryAvroUtilsTest {
                             (Object) null))))));
   }
 
+  @Test
+  public void testFormatTimestamp() {
+    assertThat(
+        BigQueryAvroUtils.formatTimestamp(1452062291123456L),
+        equalTo("2016-01-06 06:38:11.123456 UTC"));
+  }
+
+  @Test
+  public void testFormatTimestampLeadingZeroesOnMicros() {
+    assertThat(
+        BigQueryAvroUtils.formatTimestamp(1452062291000456L),
+        equalTo("2016-01-06 06:38:11.000456 UTC"));
+  }
+
+  @Test
+  public void testFormatTimestampTrailingZeroesOnMicros() {
+    assertThat(
+        BigQueryAvroUtils.formatTimestamp(1452062291123000L),
+        equalTo("2016-01-06 06:38:11.123000 UTC"));
+  }
+
+  @Test
+  public void testFormatTimestampNegative() {
+    assertThat(BigQueryAvroUtils.formatTimestamp(-1L), equalTo("1969-12-31 23:59:59.999999 UTC"));
+    assertThat(
+        BigQueryAvroUtils.formatTimestamp(-100_000L), equalTo("1969-12-31 23:59:59.900000 UTC"));
+    assertThat(BigQueryAvroUtils.formatTimestamp(-1_000_000L), equalTo("1969-12-31 23:59:59 UTC"));
+    // No leap seconds before 1972. 477 leap years from 1 through 1969.
+    assertThat(
+        BigQueryAvroUtils.formatTimestamp(-(1969L * 365 + 477) * 86400 * 1_000_000),
+        equalTo("0001-01-01 00:00:00 UTC"));
+  }
+
   /** Pojo class used as the record type in tests. */
   @DefaultCoder(AvroCoder.class)
   @SuppressWarnings("unused") // Used by Avro reflection.