You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2019/03/13 22:01:10 UTC
[beam] branch master updated: [BEAM-2969] Handle negative AVRO timestamps
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 3d2b1c2 [BEAM-2969] Handle negative AVRO timestamps
new 803395c Merge pull request #8046 from udim/bigquery-negative-timestamps
3d2b1c2 is described below
commit 3d2b1c22b6124c15d50cdfd9257f9ed62427abc1
Author: Udi Meiri <eh...@google.com>
AuthorDate: Tue Mar 12 18:43:06 2019 -0700
[BEAM-2969] Handle negative AVRO timestamps
from BigQuery batch loads.
Also simplifies the code a bit. A non-zero microsecond part is now always
printed with six digits, so it may have trailing zeroes.
---
.../sdk/io/gcp/bigquery/BigQueryAvroUtils.java | 41 +++++++++-------------
.../sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java | 35 +++++++++++++++++-
2 files changed, 50 insertions(+), 26 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
index 9238e77..52fd54d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
@@ -44,6 +44,7 @@ import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableCollection;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMultimap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.io.BaseEncoding;
@@ -89,31 +90,23 @@ class BigQueryAvroUtils {
private static final DateTimeFormatter DATE_AND_SECONDS_FORMATTER =
DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withZoneUTC();
- private static String formatTimestamp(String timestamp) {
- // timestamp is in "seconds since epoch" format, with scientific notation.
- // e.g., "1.45206229112345E9" to mean "2016-01-06 06:38:11.123456 UTC".
+ @VisibleForTesting
+ static String formatTimestamp(Long timestampMicro) {
+ // timestampMicro is in "microseconds since epoch" format,
+ // e.g., 1452062291123456L means "2016-01-06 06:38:11.123456 UTC".
// Separate into seconds and microseconds.
- double timestampDoubleMicros = Double.parseDouble(timestamp) * 1000000;
- long timestampMicros = (long) timestampDoubleMicros;
- long seconds = timestampMicros / 1000000;
- int micros = (int) (timestampMicros % 1000000);
- String dayAndTime = DATE_AND_SECONDS_FORMATTER.print(seconds * 1000);
+ long timestampSec = timestampMicro / 1_000_000;
+ long micros = timestampMicro % 1_000_000;
+ if (micros < 0) {
+ micros += 1_000_000;
+ timestampSec -= 1;
+ }
+ String dayAndTime = DATE_AND_SECONDS_FORMATTER.print(timestampSec * 1000);
- // No sub-second component.
if (micros == 0) {
return String.format("%s UTC", dayAndTime);
}
-
- // Sub-second component.
- int digits = 6;
- int subsecond = micros;
- while (subsecond % 10 == 0) {
- digits--;
- subsecond /= 10;
- }
- String formatString = String.format("%%0%dd", digits);
- String fractionalSeconds = String.format(formatString, subsecond);
- return String.format("%s.%s UTC", dayAndTime, fractionalSeconds);
+ return String.format("%s.%06d UTC", dayAndTime, micros);
}
/**
@@ -311,12 +304,10 @@ class BigQueryAvroUtils {
verify(v instanceof Boolean, "Expected Boolean, got %s", v.getClass());
return v;
case "TIMESTAMP":
- // TIMESTAMP data types are represented as Avro LONG types. They are converted back to
- // Strings with variable precision (up to six digits) to match the JSON files exported by
- // BigQuery.
+ // TIMESTAMP data types are represented as Avro LONG types, microseconds since the epoch.
+ // Values may be negative since BigQuery timestamps start at 0001-01-01 00:00:00 UTC.
verify(v instanceof Long, "Expected Long, got %s", v.getClass());
- double doubleValue = ((Long) v) / 1_000_000.0;
- return formatTimestamp(Double.toString(doubleValue));
+ return formatTimestamp((Long) v);
case "RECORD":
verify(v instanceof GenericRecord, "Expected GenericRecord, got %s", v.getClass());
return convertGenericRecordToTableRow((GenericRecord) v, fieldSchema.getFields());
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
index 38782ca..c645ee0 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
@@ -17,9 +17,9 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
@@ -271,6 +271,39 @@ public class BigQueryAvroUtilsTest {
(Object) null))))));
}
+ @Test
+ public void testFormatTimestamp() {
+ assertThat(
+ BigQueryAvroUtils.formatTimestamp(1452062291123456L),
+ equalTo("2016-01-06 06:38:11.123456 UTC"));
+ }
+
+ @Test
+ public void testFormatTimestampLeadingZeroesOnMicros() {
+ assertThat(
+ BigQueryAvroUtils.formatTimestamp(1452062291000456L),
+ equalTo("2016-01-06 06:38:11.000456 UTC"));
+ }
+
+ @Test
+ public void testFormatTimestampTrailingZeroesOnMicros() {
+ assertThat(
+ BigQueryAvroUtils.formatTimestamp(1452062291123000L),
+ equalTo("2016-01-06 06:38:11.123000 UTC"));
+ }
+
+ @Test
+ public void testFormatTimestampNegative() {
+ assertThat(BigQueryAvroUtils.formatTimestamp(-1L), equalTo("1969-12-31 23:59:59.999999 UTC"));
+ assertThat(
+ BigQueryAvroUtils.formatTimestamp(-100_000L), equalTo("1969-12-31 23:59:59.900000 UTC"));
+ assertThat(BigQueryAvroUtils.formatTimestamp(-1_000_000L), equalTo("1969-12-31 23:59:59 UTC"));
+ // No leap seconds before 1972. 477 leap years from 1 through 1969.
+ assertThat(
+ BigQueryAvroUtils.formatTimestamp(-(1969L * 365 + 477) * 86400 * 1_000_000),
+ equalTo("0001-01-01 00:00:00 UTC"));
+ }
+
/** Pojo class used as the record type in tests. */
@DefaultCoder(AvroCoder.class)
@SuppressWarnings("unused") // Used by Avro reflection.