You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@beam.apache.org by re...@apache.org on 2022/04/30 05:03:05 UTC
[beam] branch master updated: Merge pull request #17404: [BEAM-13990] support date and timestamp fields
This is an automated email from the ASF dual-hosted git repository.
reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 58b4d762eec Merge pull request #17404: [BEAM-13990] support date and timestamp fields
58b4d762eec is described below
commit 58b4d762eece66774a5df6ca54e6f91c49057c9b
Author: Reuven Lax <re...@google.com>
AuthorDate: Fri Apr 29 22:02:56 2022 -0700
Merge pull request #17404: [BEAM-13990] support date and timestamp fields
---
.../org/apache/beam/gradle/BeamModulePlugin.groovy | 1 +
sdks/java/io/google-cloud-platform/build.gradle | 2 +
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 2 +
.../StorageApiDynamicDestinationsTableRow.java | 13 +-
.../bigquery/StorageApiWriteUnshardedRecords.java | 4 +
.../bigquery/StorageApiWritesShardedRecords.java | 4 +
.../sdk/io/gcp/bigquery/TableRowJsonCoder.java | 8 +-
.../io/gcp/bigquery/TableRowToStorageApiProto.java | 339 ++++++++++++-----
.../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java | 8 -
.../gcp/bigquery/TableRowToStorageApiProtoIT.java | 407 +++++++++++++++++++++
.../bigquery/TableRowToStorageApiProtoTest.java | 140 ++++---
.../beam/sdk/io/gcp/spanner/StructUtilsTest.java | 3 +-
12 files changed, 777 insertions(+), 154 deletions(-)
diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 5d5b1e9377f..0f0c11dc2cb 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -631,6 +631,7 @@ class BeamModulePlugin implements Plugin<Project> {
jackson_dataformat_xml : "com.fasterxml.jackson.dataformat:jackson-dataformat-xml:$jackson_version",
jackson_dataformat_yaml : "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:$jackson_version",
jackson_datatype_joda : "com.fasterxml.jackson.datatype:jackson-datatype-joda:$jackson_version",
+ jackson_datatype_jsr310 : "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:$jackson_version",
jackson_module_scala_2_11 : "com.fasterxml.jackson.module:jackson-module-scala_2.11:$jackson_version",
jackson_module_scala_2_12 : "com.fasterxml.jackson.module:jackson-module-scala_2.12:$jackson_version",
// Swap to use the officially published version of 0.4.x once available
diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle
index 8d1d0a1776d..cd71fc35e21 100644
--- a/sdks/java/io/google-cloud-platform/build.gradle
+++ b/sdks/java/io/google-cloud-platform/build.gradle
@@ -116,6 +116,8 @@ dependencies {
implementation library.java.http_core
implementation library.java.jackson_core
implementation library.java.jackson_databind
+ implementation library.java.jackson_datatype_joda
+ implementation library.java.jackson_datatype_jsr310
implementation library.java.joda_time
implementation library.java.junit
implementation library.java.netty_handler
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 54f485a9d71..2a3e595adc1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -3085,6 +3085,8 @@ public class BigQueryIO {
CreateTables.clearCreatedTables();
TwoLevelMessageConverterCache.clear();
StorageApiDynamicDestinationsTableRow.clearSchemaCache();
+ StorageApiWriteUnshardedRecords.clearCache();
+ StorageApiWritesShardedRecords.clearCache();
}
/////////////////////////////////////////////////////////////////////////////
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
index c48dbe0dedb..3c9f676cfad 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
@@ -70,6 +70,7 @@ public class StorageApiDynamicDestinationsTableRow<T, DestinationT>
DestinationT destination, DatasetService datasetService) throws Exception {
return new MessageConverter<T>() {
TableSchema tableSchema;
+ TableRowToStorageApiProto.SchemaInformation schemaInformation;
Descriptor descriptor;
long descriptorHash;
@@ -103,7 +104,8 @@ public class StorageApiDynamicDestinationsTableRow<T, DestinationT>
MoreObjects.firstNonNull(
SCHEMA_CACHE.putSchemaIfAbsent(tableReference, tableSchema), tableSchema);
}
-
+ schemaInformation =
+ TableRowToStorageApiProto.SchemaInformation.fromTableSchema(tableSchema);
descriptor = TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema);
descriptorHash = BigQueryUtils.hashSchemaDescriptorDeterministic(descriptor);
}
@@ -138,6 +140,8 @@ public class StorageApiDynamicDestinationsTableRow<T, DestinationT>
}
synchronized (this) {
tableSchema = newSchema;
+ schemaInformation =
+ TableRowToStorageApiProto.SchemaInformation.fromTableSchema(tableSchema);
descriptor = TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema);
long newHash = BigQueryUtils.hashSchemaDescriptorDeterministic(descriptor);
if (descriptorHash != newHash) {
@@ -154,16 +158,21 @@ public class StorageApiDynamicDestinationsTableRow<T, DestinationT>
public StorageApiWritePayload toMessage(T element) throws Exception {
int attempt = 0;
do {
+ TableRowToStorageApiProto.SchemaInformation localSchemaInformation;
Descriptor localDescriptor;
long localDescriptorHash;
synchronized (this) {
+ localSchemaInformation = schemaInformation;
localDescriptor = descriptor;
localDescriptorHash = descriptorHash;
}
try {
Message msg =
TableRowToStorageApiProto.messageFromTableRow(
- localDescriptor, formatFunction.apply(element), ignoreUnknownValues);
+ localSchemaInformation,
+ localDescriptor,
+ formatFunction.apply(element),
+ ignoreUnknownValues);
return new AutoValue_StorageApiWritePayload(msg.toByteArray(), localDescriptorHash);
} catch (SchemaTooNarrowException e) {
if (attempt > schemaUpdateRetries) {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index f8619d5fd62..1751799dd51 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -87,6 +87,10 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
})
.build();
+ static void clearCache() {
+ APPEND_CLIENTS.invalidateAll();
+ }
+
// Run a closure asynchronously, ignoring failures.
private interface ThrowingRunnable {
void run() throws Exception;
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index 787cb0c2d62..c0f60f34c61 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -119,6 +119,10 @@ public class StorageApiWritesShardedRecords<DestinationT, ElementT>
})
.build();
+ static void clearCache() {
+ APPEND_CLIENTS.invalidateAll();
+ }
+
// Run a closure asynchronously, ignoring failures.
private interface ThrowingRunnable {
void run() throws Exception;
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
index 7e82eec212d..9b80c0b552b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
@@ -19,6 +19,8 @@ package org.apache.beam.sdk.io.gcp.bigquery;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.joda.JodaModule;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.google.api.services.bigquery.model.TableRow;
import java.io.IOException;
import java.io.InputStream;
@@ -68,7 +70,11 @@ public class TableRowJsonCoder extends AtomicCoder<TableRow> {
// FAIL_ON_EMPTY_BEANS is disabled in order to handle null values in
// TableRow.
private static final ObjectMapper MAPPER =
- new ObjectMapper().disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
+ new ObjectMapper()
+ .registerModule(new JavaTimeModule())
+ .registerModule(new JodaModule())
+ .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
+ .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
private static final TableRowJsonCoder INSTANCE = new TableRowJsonCoder();
private static final TypeDescriptor<TableRow> TYPE_DESCRIPTOR = new TypeDescriptor<TableRow>() {};
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
index 2f241a434a8..750542b7121 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
@@ -22,6 +22,7 @@ import static java.util.stream.Collectors.toList;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.bigquery.storage.v1.BigDecimalByteStringEncoder;
import com.google.protobuf.ByteString;
import com.google.protobuf.DescriptorProtos.DescriptorProto;
import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
@@ -34,19 +35,28 @@ import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.Descriptors.FileDescriptor;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Message;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.format.DateTimeParseException;
+import java.time.temporal.ChronoUnit;
import java.util.AbstractMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
-import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.BaseEncoding;
+import org.joda.time.Days;
/**
* Utility methods for converting JSON {@link TableRow} objects to dynamic protocol message, for use
@@ -71,6 +81,58 @@ public class TableRowToStorageApiProto {
}
}
+ static class SchemaInformation {
+ private final TableFieldSchema tableFieldSchema;
+ private final List<SchemaInformation> subFields;
+ private final Map<String, SchemaInformation> subFieldsByName;
+
+ private SchemaInformation(TableFieldSchema tableFieldSchema) {
+ this.tableFieldSchema = tableFieldSchema;
+ this.subFields = Lists.newArrayList();
+ this.subFieldsByName = Maps.newHashMap();
+ if (tableFieldSchema.getFields() != null) {
+ for (TableFieldSchema field : tableFieldSchema.getFields()) {
+ SchemaInformation schemaInformation = new SchemaInformation(field);
+ subFields.add(schemaInformation);
+ subFieldsByName.put(field.getName(), schemaInformation);
+ }
+ }
+ }
+
+ public String getName() {
+ return tableFieldSchema.getName();
+ }
+
+ public String getType() {
+ return tableFieldSchema.getType();
+ }
+
+ public SchemaInformation getSchemaForField(String name) {
+ SchemaInformation schemaInformation = subFieldsByName.get(name);
+ if (schemaInformation == null) {
+ throw new RuntimeException("Schema field not found: " + name);
+ }
+ return schemaInformation;
+ }
+
+ public SchemaInformation getSchemaForField(int i) {
+ SchemaInformation schemaInformation = subFields.get(i);
+ if (schemaInformation == null) {
+ throw new RuntimeException("Schema field not found: " + i);
+ }
+ return schemaInformation;
+ }
+
+ static SchemaInformation fromTableSchema(TableSchema tableSchema) {
+ TableFieldSchema rootSchema =
+ new TableFieldSchema()
+ .setName("__root__")
+ .setType("RECORD")
+ .setFields(tableSchema.getFields());
+ return new SchemaInformation(rootSchema);
+ }
+ }
+
static final Map<String, Type> PRIMITIVE_TYPES =
ImmutableMap.<String, Type>builder()
.put("INT64", Type.TYPE_INT64)
@@ -81,13 +143,13 @@ public class TableRowToStorageApiProto {
.put("BOOL", Type.TYPE_BOOL)
.put("BOOLEAN", Type.TYPE_BOOL)
.put("BYTES", Type.TYPE_BYTES)
- .put("NUMERIC", Type.TYPE_STRING) // Pass through the JSON encoding.
- .put("BIGNUMERIC", Type.TYPE_STRING) // Pass through the JSON encoding.
+ .put("NUMERIC", Type.TYPE_BYTES) // Pass through the JSON encoding.
+ .put("BIGNUMERIC", Type.TYPE_BYTES) // Pass through the JSON encoding.
.put("GEOGRAPHY", Type.TYPE_STRING) // Pass through the JSON encoding.
- .put("DATE", Type.TYPE_STRING) // Pass through the JSON encoding.
- .put("TIME", Type.TYPE_STRING) // Pass through the JSON encoding.
- .put("DATETIME", Type.TYPE_STRING) // Pass through the JSON encoding.
- .put("TIMESTAMP", Type.TYPE_STRING) // Pass through the JSON encoding.
+ .put("DATE", Type.TYPE_INT32)
+ .put("TIME", Type.TYPE_INT64)
+ .put("DATETIME", Type.TYPE_INT64)
+ .put("TIMESTAMP", Type.TYPE_INT64)
.put("JSON", Type.TYPE_STRING)
.build();
@@ -107,7 +169,10 @@ public class TableRowToStorageApiProto {
}
public static DynamicMessage messageFromMap(
- Descriptor descriptor, AbstractMap<String, Object> map, boolean ignoreUnknownValues)
+ SchemaInformation schemaInformation,
+ Descriptor descriptor,
+ AbstractMap<String, Object> map,
+ boolean ignoreUnknownValues)
throws SchemaConversionException {
DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
for (Map.Entry<String, Object> entry : map.entrySet()) {
@@ -121,9 +186,12 @@ public class TableRowToStorageApiProto {
"TableRow contained unexpected field with name " + entry.getKey());
}
}
+ SchemaInformation fieldSchemaInformation =
+ schemaInformation.getSchemaForField(entry.getKey());
@Nullable
Object value =
- messageValueFromFieldValue(fieldDescriptor, entry.getValue(), ignoreUnknownValues);
+ messageValueFromFieldValue(
+ fieldSchemaInformation, fieldDescriptor, entry.getValue(), ignoreUnknownValues);
if (value != null) {
builder.setField(fieldDescriptor, value);
}
@@ -136,7 +204,10 @@ public class TableRowToStorageApiProto {
* using the BigQuery Storage API.
*/
public static DynamicMessage messageFromTableRow(
- Descriptor descriptor, TableRow tableRow, boolean ignoreUnkownValues)
+ SchemaInformation schemaInformation,
+ Descriptor descriptor,
+ TableRow tableRow,
+ boolean ignoreUnkownValues)
throws SchemaConversionException {
@Nullable Object fValue = tableRow.get("f");
if (fValue instanceof List) {
@@ -153,9 +224,11 @@ public class TableRowToStorageApiProto {
for (int i = 0; i < cellsToProcess; ++i) {
AbstractMap<String, Object> cell = cells.get(i);
FieldDescriptor fieldDescriptor = descriptor.getFields().get(i);
+ SchemaInformation fieldSchemaInformation = schemaInformation.getSchemaForField(i);
@Nullable
Object value =
- messageValueFromFieldValue(fieldDescriptor, cell.get("v"), ignoreUnkownValues);
+ messageValueFromFieldValue(
+ fieldSchemaInformation, fieldDescriptor, cell.get("v"), ignoreUnkownValues);
if (value != null) {
builder.setField(fieldDescriptor, value);
}
@@ -163,7 +236,7 @@ public class TableRowToStorageApiProto {
return builder.build();
} else {
- return messageFromMap(descriptor, tableRow, ignoreUnkownValues);
+ return messageFromMap(schemaInformation, descriptor, tableRow, ignoreUnkownValues);
}
}
@@ -218,125 +291,191 @@ public class TableRowToStorageApiProto {
}
@Nullable
+ @SuppressWarnings({"nullness"})
private static Object messageValueFromFieldValue(
- FieldDescriptor fieldDescriptor, @Nullable Object bqValue, boolean ignoreUnknownValues)
+ SchemaInformation schemaInformation,
+ FieldDescriptor fieldDescriptor,
+ @Nullable Object bqValue,
+ boolean ignoreUnknownValues)
throws SchemaConversionException {
if (bqValue == null) {
if (fieldDescriptor.isOptional()) {
return null;
} else if (fieldDescriptor.isRepeated()) {
return Collections.emptyList();
- }
- {
+ } else {
throw new IllegalArgumentException(
"Received null value for non-nullable field " + fieldDescriptor.getName());
}
}
- return toProtoValue(
- fieldDescriptor, bqValue, fieldDescriptor.isRepeated(), ignoreUnknownValues);
- }
-
- private static final Map<FieldDescriptor.Type, Function<String, Object>>
- JSON_PROTO_STRING_PARSERS =
- ImmutableMap.<FieldDescriptor.Type, Function<String, Object>>builder()
- .put(FieldDescriptor.Type.INT32, Integer::valueOf)
- .put(FieldDescriptor.Type.INT64, Long::valueOf)
- .put(FieldDescriptor.Type.FLOAT, Float::valueOf)
- .put(FieldDescriptor.Type.DOUBLE, Double::valueOf)
- .put(FieldDescriptor.Type.BOOL, Boolean::valueOf)
- .put(FieldDescriptor.Type.STRING, str -> str)
- .put(
- FieldDescriptor.Type.BYTES,
- b64 -> ByteString.copyFrom(BaseEncoding.base64().decode(b64)))
- .build();
- @Nullable
- @SuppressWarnings({"nullness"})
- @VisibleForTesting
- static Object toProtoValue(
- FieldDescriptor fieldDescriptor,
- Object jsonBQValue,
- boolean isRepeated,
- boolean ignoreUnknownValues)
- throws SchemaConversionException {
- if (isRepeated) {
- List<Object> listValue = (List<Object>) jsonBQValue;
+ if (fieldDescriptor.isRepeated()) {
+ List<Object> listValue = (List<Object>) bqValue;
List<Object> protoList = Lists.newArrayListWithCapacity(listValue.size());
- for (Object o : listValue) {
- protoList.add(toProtoValue(fieldDescriptor, o, false, ignoreUnknownValues));
+ for (@Nullable Object o : listValue) {
+ if (o != null) { // repeated field cannot contain null.
+ protoList.add(
+ singularFieldToProtoValue(
+ schemaInformation, fieldDescriptor, o, ignoreUnknownValues));
+ }
}
return protoList;
}
-
- if (fieldDescriptor.getType() == FieldDescriptor.Type.MESSAGE) {
- if (jsonBQValue instanceof TableRow) {
- TableRow tableRow = (TableRow) jsonBQValue;
- return messageFromTableRow(fieldDescriptor.getMessageType(), tableRow, ignoreUnknownValues);
- } else if (jsonBQValue instanceof AbstractMap) {
- // This will handle nested rows.
- AbstractMap<String, Object> map = ((AbstractMap<String, Object>) jsonBQValue);
- return messageFromMap(fieldDescriptor.getMessageType(), map, ignoreUnknownValues);
- } else {
- throw new RuntimeException("Unexpected value " + jsonBQValue + " Expected a JSON map.");
- }
- }
- @Nullable Object scalarValue = scalarToProtoValue(fieldDescriptor, jsonBQValue);
- if (scalarValue == null) {
- return toProtoValue(fieldDescriptor, jsonBQValue.toString(), isRepeated, ignoreUnknownValues);
- } else {
- return scalarValue;
- }
+ return singularFieldToProtoValue(
+ schemaInformation, fieldDescriptor, bqValue, ignoreUnknownValues);
}
@VisibleForTesting
@Nullable
- static Object scalarToProtoValue(FieldDescriptor fieldDescriptor, Object jsonBQValue) {
- if (jsonBQValue instanceof String) {
- Function<String, Object> mapper = JSON_PROTO_STRING_PARSERS.get(fieldDescriptor.getType());
- if (mapper == null) {
- throw new UnsupportedOperationException(
- "Converting BigQuery type '"
- + jsonBQValue.getClass()
- + "' to '"
- + fieldDescriptor
- + "' is not supported");
- }
- return mapper.apply((String) jsonBQValue);
- }
-
- switch (fieldDescriptor.getType()) {
- case BOOL:
- if (jsonBQValue instanceof Boolean) {
- return jsonBQValue;
+ static Object singularFieldToProtoValue(
+ SchemaInformation schemaInformation,
+ FieldDescriptor fieldDescriptor,
+ Object value,
+ boolean ignoreUnknownValues)
+ throws SchemaConversionException {
+ switch (schemaInformation.getType()) {
+ case "INT64":
+ case "INTEGER":
+ if (value instanceof String) {
+ return Long.valueOf((String) value);
+ } else if (value instanceof Integer || value instanceof Long) {
+ return ((Number) value).longValue();
}
break;
- case BYTES:
+ case "FLOAT64":
+ case "FLOAT":
+ if (value instanceof String) {
+ return Double.valueOf((String) value);
+ } else if (value instanceof Double || value instanceof Float) {
+ return ((Number) value).doubleValue();
+ }
break;
- case INT64:
- if (jsonBQValue instanceof Integer) {
- return Long.valueOf((Integer) jsonBQValue);
- } else if (jsonBQValue instanceof Long) {
- return jsonBQValue;
+ case "BOOLEAN":
+ case "BOOL":
+ if (value instanceof String) {
+ return Boolean.valueOf((String) value);
+ } else if (value instanceof Boolean) {
+ return value;
}
break;
- case INT32:
- if (jsonBQValue instanceof Integer) {
- return jsonBQValue;
+ case "BYTES":
+ if (value instanceof String) {
+ return ByteString.copyFrom(BaseEncoding.base64().decode((String) value));
+ } else if (value instanceof byte[]) {
+ return ByteString.copyFrom((byte[]) value);
+ } else if (value instanceof ByteString) {
+ return value;
}
break;
- case STRING:
+ case "TIMESTAMP":
+ if (value instanceof String) {
+ try {
+ return ChronoUnit.MICROS.between(Instant.EPOCH, Instant.parse((String) value));
+ } catch (DateTimeParseException e) {
+ return ChronoUnit.MICROS.between(
+ Instant.EPOCH, Instant.ofEpochMilli(Long.parseLong((String) value)));
+ }
+ } else if (value instanceof Instant) {
+ return ChronoUnit.MICROS.between(Instant.EPOCH, (Instant) value);
+ } else if (value instanceof org.joda.time.Instant) {
+ // joda instant precision is millisecond
+ return ((org.joda.time.Instant) value).getMillis() * 1000L;
+ } else if (value instanceof Integer || value instanceof Long) {
+ return ((Number) value).longValue();
+ } else if (value instanceof Double || value instanceof Float) {
+ // assume value represents number of seconds since epoch
+ return BigDecimal.valueOf(((Number) value).doubleValue())
+ .scaleByPowerOfTen(6)
+ .setScale(0, RoundingMode.HALF_UP)
+ .longValue();
+ }
break;
- case DOUBLE:
- if (jsonBQValue instanceof Double) {
- return jsonBQValue;
- } else if (jsonBQValue instanceof Float) {
- return Double.valueOf((Float) jsonBQValue);
+ case "DATE":
+ if (value instanceof String) {
+ return ((Long) LocalDate.parse((String) value).toEpochDay()).intValue();
+ } else if (value instanceof LocalDate) {
+ return ((Long) ((LocalDate) value).toEpochDay()).intValue();
+ } else if (value instanceof org.joda.time.LocalDate) {
+ return Days.daysBetween(
+ org.joda.time.Instant.EPOCH.toDateTime().toLocalDate(),
+ (org.joda.time.LocalDate) value)
+ .getDays();
+ } else if (value instanceof Integer || value instanceof Long) {
+ return ((Number) value).intValue();
+ }
+ break;
+ case "NUMERIC":
+ if (value instanceof String) {
+ return BigDecimalByteStringEncoder.encodeToNumericByteString(
+ new BigDecimal((String) value));
+ } else if (value instanceof BigDecimal) {
+ return BigDecimalByteStringEncoder.encodeToNumericByteString(((BigDecimal) value));
+ } else if (value instanceof Double || value instanceof Float) {
+ return BigDecimalByteStringEncoder.encodeToNumericByteString(
+ BigDecimal.valueOf(((Number) value).doubleValue()));
+ }
+ break;
+ case "BIGNUMERIC":
+ if (value instanceof String) {
+ return BigDecimalByteStringEncoder.encodeToBigNumericByteString(
+ new BigDecimal((String) value));
+ } else if (value instanceof BigDecimal) {
+ return BigDecimalByteStringEncoder.encodeToBigNumericByteString(((BigDecimal) value));
+ } else if (value instanceof Double || value instanceof Float) {
+ return BigDecimalByteStringEncoder.encodeToBigNumericByteString(
+ BigDecimal.valueOf(((Number) value).doubleValue()));
+ }
+ break;
+ case "DATETIME":
+ if (value instanceof String) {
+ return CivilTimeEncoder.encodePacked64DatetimeMicros(LocalDateTime.parse((String) value));
+ } else if (value instanceof Number) {
+ return ((Number) value).longValue();
+ } else if (value instanceof LocalDateTime) {
+ return CivilTimeEncoder.encodePacked64DatetimeMicros((LocalDateTime) value);
+ } else if (value instanceof org.joda.time.LocalDateTime) {
+ return CivilTimeEncoder.encodePacked64DatetimeMicros((org.joda.time.LocalDateTime) value);
+ }
+ break;
+ case "TIME":
+ if (value instanceof String) {
+ return CivilTimeEncoder.encodePacked64TimeMicros(LocalTime.parse((String) value));
+ } else if (value instanceof Number) {
+ return ((Number) value).longValue();
+ } else if (value instanceof LocalTime) {
+ return CivilTimeEncoder.encodePacked64TimeMicros((LocalTime) value);
+ } else if (value instanceof org.joda.time.LocalTime) {
+ return CivilTimeEncoder.encodePacked64TimeMicros((org.joda.time.LocalTime) value);
+ }
+ break;
+ case "STRING":
+ case "JSON":
+ case "GEOGRAPHY":
+ return value.toString();
+ case "STRUCT":
+ case "RECORD":
+ if (value instanceof TableRow) {
+ TableRow tableRow = (TableRow) value;
+ return messageFromTableRow(
+ schemaInformation, fieldDescriptor.getMessageType(), tableRow, ignoreUnknownValues);
+ } else if (value instanceof AbstractMap) {
+ // This will handle nested rows.
+ AbstractMap<String, Object> map = ((AbstractMap<String, Object>) value);
+ return messageFromMap(
+ schemaInformation, fieldDescriptor.getMessageType(), map, ignoreUnknownValues);
}
break;
- default:
- throw new RuntimeException("Unsupported proto type " + fieldDescriptor.getType());
}
- return null;
+
+ throw new RuntimeException(
+ "Unexpected value :"
+ + value
+ + ", type: "
+ + value.getClass()
+ + ". Table field name: "
+ + schemaInformation.getName()
+ + ", type: "
+ + schemaInformation.getType());
}
@VisibleForTesting
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index 35dc57218f1..50f1bde09b9 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -523,14 +523,6 @@ public class BigQueryIOWriteTest implements Serializable {
testTimePartitioning(method);
}
- @Test
- public void testTimePartitioningStorageApi() throws Exception {
- if (!useStorageApi) {
- return;
- }
- testTimePartitioning(Method.STORAGE_WRITE_API);
- }
-
@Test
public void testClusteringStorageApi() throws Exception {
if (useStorageApi) {
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoIT.java
new file mode 100644
index 00000000000..34962a90a57
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoIT.java
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.BaseEncoding;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Integration tests for {@link TableRowToStorageApiProto} via the Storage Write API. */
+@RunWith(JUnit4.class)
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public class TableRowToStorageApiProtoIT {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TableRowToStorageApiProtoIT.class);
+  private static final BigqueryClient BQ_CLIENT = new BigqueryClient("TableRowToStorageApiProtoIT");
+  private static final String PROJECT =
+      TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
+  private static final String BIG_QUERY_DATASET_ID =
+      "table_row_to_storage_api_proto_" + System.nanoTime();
+
+  private static final TableSchema BASE_TABLE_SCHEMA =
+      new TableSchema()
+          .setFields(
+              ImmutableList.<TableFieldSchema>builder()
+                  .add(new TableFieldSchema().setType("STRING").setName("stringValue"))
+                  .add(new TableFieldSchema().setType("BYTES").setName("bytesValue"))
+                  .add(new TableFieldSchema().setType("INT64").setName("int64Value"))
+                  .add(new TableFieldSchema().setType("INTEGER").setName("intValue"))
+                  .add(new TableFieldSchema().setType("FLOAT64").setName("float64Value"))
+                  .add(new TableFieldSchema().setType("FLOAT").setName("floatValue"))
+                  .add(new TableFieldSchema().setType("BOOL").setName("boolValue"))
+                  .add(new TableFieldSchema().setType("BOOLEAN").setName("booleanValue"))
+                  .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampValue"))
+                  .add(new TableFieldSchema().setType("TIME").setName("timeValue"))
+                  .add(new TableFieldSchema().setType("DATETIME").setName("datetimeValue"))
+                  .add(new TableFieldSchema().setType("DATE").setName("dateValue"))
+                  .add(new TableFieldSchema().setType("NUMERIC").setName("numericValue"))
+                  .add(new TableFieldSchema().setType("BIGNUMERIC").setName("bigNumericValue"))
+                  .add(
+                      new TableFieldSchema()
+                          .setType("BYTES")
+                          .setMode("REPEATED")
+                          .setName("arrayValue"))
+                  .build());
+
+  private static final List<Object> REPEATED_BYTES =
+      ImmutableList.of(
+          BaseEncoding.base64().encode("hello".getBytes(StandardCharsets.UTF_8)),
+          "goodbye".getBytes(StandardCharsets.UTF_8),
+          "solong".getBytes(StandardCharsets.UTF_8));
+
+  private static final TableRow BASE_TABLE_ROW =
+      new TableRow()
+          .set("stringValue", "string")
+          .set(
+              "bytesValue", BaseEncoding.base64().encode("string".getBytes(StandardCharsets.UTF_8)))
+          .set("int64Value", "42")
+          .set("intValue", "43")
+          .set("float64Value", "2.8168")
+          .set("floatValue", "2.817")
+          .set("boolValue", "true")
+          .set("booleanValue", "true")
+          .set("timestampValue", "1970-01-01T00:00:00.000043Z")
+          .set("timeValue", "00:52:07.123456")
+          .set("datetimeValue", "2019-08-16T00:52:07.123456")
+          .set("dateValue", "2019-08-16")
+          .set("numericValue", "23.4")
+          .set("bigNumericValue", "23334.4")
+          .set("arrayValue", REPEATED_BYTES);
+
+  private static final TableRow BASE_TABLE_ROW_JODA_TIME =
+      new TableRow()
+          .set("stringValue", "string")
+          .set("bytesValue", "string".getBytes(StandardCharsets.UTF_8))
+          .set("int64Value", 42)
+          .set("intValue", 43)
+          .set("float64Value", 2.8168f)
+          .set("floatValue", 2.817f)
+          .set("boolValue", true)
+          .set("booleanValue", true)
+          .set("timestampValue", org.joda.time.Instant.parse("1970-01-01T00:00:00.0043Z"))
+          .set("timeValue", org.joda.time.LocalTime.parse("00:52:07.123456"))
+          .set("datetimeValue", org.joda.time.LocalDateTime.parse("2019-08-16T00:52:07.123456"))
+          .set("dateValue", org.joda.time.LocalDate.parse("2019-08-16"))
+          .set("numericValue", new BigDecimal("23.4"))
+          .set("bigNumericValue", "23334.4")
+          .set("arrayValue", REPEATED_BYTES);
+
+  private static final TableRow BASE_TABLE_ROW_JAVA_TIME =
+      new TableRow()
+          .set("stringValue", "string")
+          .set("bytesValue", "string".getBytes(StandardCharsets.UTF_8))
+          .set("int64Value", 42)
+          .set("intValue", 43)
+          .set("float64Value", 2.8168f)
+          .set("floatValue", 2.817f)
+          .set("boolValue", true)
+          .set("booleanValue", true)
+          .set("timestampValue", Instant.parse("1970-01-01T00:00:00.000043Z"))
+          .set("timeValue", LocalTime.parse("00:52:07.123456"))
+          .set("datetimeValue", LocalDateTime.parse("2019-08-16T00:52:07.123456"))
+          .set("dateValue", LocalDate.parse("2019-08-16"))
+          .set("numericValue", new BigDecimal("23.4"))
+          .set("bigNumericValue", "23334.4")
+          .set("arrayValue", REPEATED_BYTES);
+
+  private static final TableRow BASE_TABLE_ROW_NUM_TIME =
+      new TableRow()
+          .set("stringValue", "string")
+          .set("bytesValue", "string".getBytes(StandardCharsets.UTF_8))
+          .set("int64Value", 42)
+          .set("intValue", 43)
+          .set("float64Value", 2.8168f)
+          .set("floatValue", 2.817f)
+          .set("boolValue", true)
+          .set("booleanValue", true)
+          .set("timestampValue", 43)
+          .set("timeValue", 3497124416L)
+          .set("datetimeValue", 142111881387172416L)
+          .set("dateValue", 18124)
+          .set("numericValue", new BigDecimal("23.4"))
+          .set("bigNumericValue", "23334.4")
+          .set("arrayValue", REPEATED_BYTES);
+
+  private static final TableRow BASE_TABLE_ROW_FLOATS =
+      new TableRow()
+          .set("stringValue", "string")
+          .set("bytesValue", "string".getBytes(StandardCharsets.UTF_8))
+          .set("int64Value", 42)
+          .set("intValue", 43)
+          .set("float64Value", 2.8168f)
+          .set("floatValue", 2.817f)
+          .set("boolValue", true)
+          .set("booleanValue", true)
+          .set("timestampValue", 43)
+          .set("timeValue", 3497124416L)
+          .set("datetimeValue", 142111881387172416D)
+          .set("dateValue", 18124)
+          .set("numericValue", 23.4)
+          .set("bigNumericValue", "23334.4")
+          .set("arrayValue", REPEATED_BYTES);
+
+  private static final TableRow BASE_TABLE_ROW_NULL =
+      new TableRow()
+          // "stringValue" is intentionally not set at all: an absent field must be accepted too.
+          .set("bytesValue", null)
+          .set("int64Value", null)
+          .set("intValue", null)
+          .set("float64Value", null)
+          .set("floatValue", null)
+          .set("boolValue", null)
+          .set("booleanValue", null)
+          .set("timestampValue", null)
+          .set("timeValue", null)
+          .set("datetimeValue", null)
+          .set("dateValue", null)
+          .set("numericValue", null)
+          .set("arrayValue", null);
+
+  private static final List<Object> REPEATED_BYTES_EXPECTED =
+      ImmutableList.of(
+          BaseEncoding.base64().encode("hello".getBytes(StandardCharsets.UTF_8)),
+          BaseEncoding.base64().encode("goodbye".getBytes(StandardCharsets.UTF_8)),
+          BaseEncoding.base64().encode("solong".getBytes(StandardCharsets.UTF_8)));
+
+  private static final TableRow BASE_TABLE_ROW_EXPECTED =
+      new TableRow()
+          .set("stringValue", "string")
+          .set(
+              "bytesValue", BaseEncoding.base64().encode("string".getBytes(StandardCharsets.UTF_8)))
+          .set("int64Value", "42")
+          .set("intValue", "43")
+          .set("float64Value", 2.8168)
+          .set("floatValue", 2.817)
+          .set("boolValue", true)
+          .set("booleanValue", true)
+          .set("timestampValue", "4.3E-5")
+          .set("timeValue", "00:52:07.123456")
+          .set("datetimeValue", "2019-08-16T00:52:07.123456")
+          .set("dateValue", "2019-08-16")
+          .set("numericValue", "23.4")
+          .set("bigNumericValue", "23334.4")
+          .set("arrayValue", REPEATED_BYTES_EXPECTED);
+
+  // Joda-Time only has millisecond precision, so sub-millisecond digits are truncated.
+  private static final TableRow BASE_TABLE_ROW_JODA_EXPECTED =
+      new TableRow()
+          .set("stringValue", "string")
+          .set(
+              "bytesValue", BaseEncoding.base64().encode("string".getBytes(StandardCharsets.UTF_8)))
+          .set("int64Value", "42")
+          .set("intValue", "43")
+          .set("float64Value", 2.8168)
+          .set("floatValue", 2.817)
+          .set("boolValue", true)
+          .set("booleanValue", true)
+          .set("timestampValue", "0.004")
+          .set("timeValue", "00:52:07.123000")
+          .set("datetimeValue", "2019-08-16T00:52:07.123000")
+          .set("dateValue", "2019-08-16")
+          .set("numericValue", "23.4")
+          .set("bigNumericValue", "23334.4")
+          .set("arrayValue", REPEATED_BYTES_EXPECTED);
+
+  private static final TableRow BASE_TABLE_ROW_NUM_EXPECTED =
+      new TableRow()
+          .set("stringValue", "string")
+          .set(
+              "bytesValue", BaseEncoding.base64().encode("string".getBytes(StandardCharsets.UTF_8)))
+          .set("int64Value", "42")
+          .set("intValue", "43")
+          .set("float64Value", 2.8168)
+          .set("floatValue", 2.817)
+          .set("boolValue", true)
+          .set("booleanValue", true)
+          .set("timestampValue", "4.3E-5")
+          .set("timeValue", "00:52:07.123456")
+          .set("datetimeValue", "2019-08-16T00:52:07.123456")
+          .set("dateValue", "2019-08-16")
+          .set("numericValue", "23.4")
+          .set("bigNumericValue", "23334.4")
+          .set("arrayValue", REPEATED_BYTES_EXPECTED);
+
+  private static final TableRow BASE_TABLE_ROW_FLOATS_EXPECTED =
+      new TableRow()
+          .set("stringValue", "string")
+          .set(
+              "bytesValue", BaseEncoding.base64().encode("string".getBytes(StandardCharsets.UTF_8)))
+          .set("int64Value", "42")
+          .set("intValue", "43")
+          .set("float64Value", 2.8168)
+          .set("floatValue", 2.817)
+          .set("boolValue", true)
+          .set("booleanValue", true)
+          .set("timestampValue", "4.3E-5")
+          .set("timeValue", "00:52:07.123456")
+          .set("datetimeValue", "2019-08-16T00:52:07.123456")
+          .set("dateValue", "2019-08-16")
+          .set("numericValue", "23.4")
+          .set("bigNumericValue", "23334.4")
+          .set("arrayValue", REPEATED_BYTES_EXPECTED);
+
+  // Only non-null values are read back; a null arrayValue comes back as an empty list.
+  private static final TableRow BASE_TABLE_ROW_NULL_EXPECTED =
+      new TableRow().set("arrayValue", ImmutableList.of());
+
+  private static final TableSchema NESTED_TABLE_SCHEMA =
+      new TableSchema()
+          .setFields(
+              ImmutableList.<TableFieldSchema>builder()
+                  .add(
+                      new TableFieldSchema()
+                          .setType("STRUCT")
+                          .setName("nestedValue1")
+                          .setMode("REQUIRED")
+                          .setFields(BASE_TABLE_SCHEMA.getFields()))
+                  .add(
+                      new TableFieldSchema()
+                          .setType("RECORD")
+                          .setName("nestedValue2")
+                          .setMode("REPEATED")
+                          .setFields(BASE_TABLE_SCHEMA.getFields()))
+                  .add(
+                      new TableFieldSchema()
+                          .setType("RECORD")
+                          .setName("nestedValue3")
+                          .setMode("NULLABLE")
+                          .setFields(BASE_TABLE_SCHEMA.getFields()))
+                  .build());
+
+  @BeforeClass
+  public static void setUpTestEnvironment() throws IOException, InterruptedException {
+    // One BigQuery dataset is shared by all test cases; each test creates its own table in it.
+    BQ_CLIENT.createNewDataset(PROJECT, BIG_QUERY_DATASET_ID);
+  }
+
+  @AfterClass
+  public static void cleanup() {
+    LOG.info("Start to clean up tables and datasets.");
+    BQ_CLIENT.deleteDataset(PROJECT, BIG_QUERY_DATASET_ID);
+  }
+
+  @Test
+  public void testBaseTableRow() throws IOException, InterruptedException {
+    String tableSpec = createTable(BASE_TABLE_SCHEMA);
+
+    runPipeline(tableSpec, Collections.singleton(BASE_TABLE_ROW));
+
+    List<TableRow> actualTableRows =
+        BQ_CLIENT.queryUnflattened(String.format("SELECT * FROM [%s]", tableSpec), PROJECT, true);
+
+    assertEquals(1, actualTableRows.size());
+    assertEquals(BASE_TABLE_ROW_EXPECTED, actualTableRows.get(0));
+  }
+
+  @Test
+  public void testNestedRichTypesAndNull() throws IOException, InterruptedException {
+    String tableSpec = createTable(NESTED_TABLE_SCHEMA);
+    TableRow tableRow =
+        new TableRow()
+            .set("nestedValue1", BASE_TABLE_ROW)
+            .set(
+                "nestedValue2",
+                Arrays.asList(
+                    BASE_TABLE_ROW_JAVA_TIME,
+                    BASE_TABLE_ROW_JODA_TIME,
+                    BASE_TABLE_ROW_NUM_TIME,
+                    BASE_TABLE_ROW_FLOATS,
+                    BASE_TABLE_ROW_NULL))
+            .set("nestedValue3", null);
+
+    runPipeline(tableSpec, Collections.singleton(tableRow));
+
+    List<TableRow> actualTableRows =
+        BQ_CLIENT.queryUnflattened(String.format("SELECT * FROM [%s]", tableSpec), PROJECT, true);
+
+    assertEquals(1, actualTableRows.size());
+    assertEquals(BASE_TABLE_ROW_EXPECTED, actualTableRows.get(0).get("nestedValue1"));
+    LOG.info("ACTUAL {}", actualTableRows.get(0).get("nestedValue2"));
+    assertEquals(
+        ImmutableList.of(
+            BASE_TABLE_ROW_EXPECTED,
+            BASE_TABLE_ROW_JODA_EXPECTED,
+            BASE_TABLE_ROW_NUM_EXPECTED,
+            BASE_TABLE_ROW_FLOATS_EXPECTED,
+            BASE_TABLE_ROW_NULL_EXPECTED),
+        actualTableRows.get(0).get("nestedValue2"));
+    assertNull(actualTableRows.get(0).get("nestedValue3"));
+  }
+
+  private static String createTable(TableSchema tableSchema)
+      throws IOException, InterruptedException {
+    String table = "table" + System.nanoTime();
+    BQ_CLIENT.deleteTable(PROJECT, BIG_QUERY_DATASET_ID, table);
+    BQ_CLIENT.createNewTable(
+        PROJECT,
+        BIG_QUERY_DATASET_ID,
+        new Table()
+            .setSchema(tableSchema)
+            .setTableReference(
+                new TableReference()
+                    .setTableId(table)
+                    .setDatasetId(BIG_QUERY_DATASET_ID)
+                    .setProjectId(PROJECT)));
+    return PROJECT + ":" + BIG_QUERY_DATASET_ID + "." + table;
+  }
+
+  private static void runPipeline(String tableSpec, Iterable<TableRow> tableRows) {
+    Pipeline p = Pipeline.create(TestPipeline.testingPipelineOptions());
+    p.apply("Create test cases", Create.of(tableRows))
+        .apply(
+            "Write using Storage Write API",
+            BigQueryIO.<TableRow>write()
+                .to(tableSpec)
+                .withFormatFunction(SerializableFunctions.identity())
+                .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
+                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));
+    p.run().waitUntilFinish();
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
index 483cddc08c5..459fed3a168 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
@@ -24,6 +24,7 @@ import com.google.api.services.bigquery.model.TableCell;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.bigquery.storage.v1.BigDecimalByteStringEncoder;
import com.google.protobuf.ByteString;
import com.google.protobuf.DescriptorProtos.DescriptorProto;
import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
@@ -32,7 +33,9 @@ import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.DynamicMessage;
+import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
+import java.time.LocalDate;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -73,9 +76,10 @@ public class TableRowToStorageApiProtoTest {
.add(new TableFieldSchema().setType("DATETIME").setName("datetimeValue"))
.add(new TableFieldSchema().setType("DATE").setName("dateValue"))
.add(new TableFieldSchema().setType("NUMERIC").setName("numericValue"))
+ .add(new TableFieldSchema().setType("BIGNUMERIC").setName("bigNumericValue"))
.add(
new TableFieldSchema()
- .setType("STRING")
+ .setType("BYTES")
.setMode("REPEATED")
.setName("arrayValue"))
.build());
@@ -97,9 +101,10 @@ public class TableRowToStorageApiProtoTest {
.add(new TableFieldSchema().setType("DATETIME").setName("datetimeValue"))
.add(new TableFieldSchema().setType("DATE").setName("dateValue"))
.add(new TableFieldSchema().setType("NUMERIC").setName("numericValue"))
+ .add(new TableFieldSchema().setType("BIGNUMERIC").setName("bigNumericValue"))
.add(
new TableFieldSchema()
- .setType("STRING")
+ .setType("BYTES")
.setMode("REPEATED")
.setName("arrayValue"))
.build());
@@ -173,42 +178,49 @@ public class TableRowToStorageApiProtoTest {
FieldDescriptorProto.newBuilder()
.setName("timestampvalue")
.setNumber(10)
- .setType(Type.TYPE_STRING)
+ .setType(Type.TYPE_INT64)
.setLabel(Label.LABEL_OPTIONAL)
.build())
.addField(
FieldDescriptorProto.newBuilder()
.setName("timevalue")
.setNumber(11)
- .setType(Type.TYPE_STRING)
+ .setType(Type.TYPE_INT64)
.setLabel(Label.LABEL_OPTIONAL)
.build())
.addField(
FieldDescriptorProto.newBuilder()
.setName("datetimevalue")
.setNumber(12)
- .setType(Type.TYPE_STRING)
+ .setType(Type.TYPE_INT64)
.setLabel(Label.LABEL_OPTIONAL)
.build())
.addField(
FieldDescriptorProto.newBuilder()
.setName("datevalue")
.setNumber(13)
- .setType(Type.TYPE_STRING)
+ .setType(Type.TYPE_INT32)
.setLabel(Label.LABEL_OPTIONAL)
.build())
.addField(
FieldDescriptorProto.newBuilder()
.setName("numericvalue")
.setNumber(14)
- .setType(Type.TYPE_STRING)
+ .setType(Type.TYPE_BYTES)
.setLabel(Label.LABEL_OPTIONAL)
.build())
.addField(
FieldDescriptorProto.newBuilder()
- .setName("arrayvalue")
+ .setName("bignumericvalue")
.setNumber(15)
- .setType(Type.TYPE_STRING)
+ .setType(Type.TYPE_BYTES)
+ .setLabel(Label.LABEL_OPTIONAL)
+ .build())
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("arrayvalue")
+ .setNumber(16)
+ .setType(Type.TYPE_BYTES)
.setLabel(Label.LABEL_REPEATED)
.build())
.build();
@@ -275,42 +287,49 @@ public class TableRowToStorageApiProtoTest {
FieldDescriptorProto.newBuilder()
.setName("timestampvalue")
.setNumber(9)
- .setType(Type.TYPE_STRING)
+ .setType(Type.TYPE_INT64)
.setLabel(Label.LABEL_OPTIONAL)
.build())
.addField(
FieldDescriptorProto.newBuilder()
.setName("timevalue")
.setNumber(10)
- .setType(Type.TYPE_STRING)
+ .setType(Type.TYPE_INT64)
.setLabel(Label.LABEL_OPTIONAL)
.build())
.addField(
FieldDescriptorProto.newBuilder()
.setName("datetimevalue")
.setNumber(11)
- .setType(Type.TYPE_STRING)
+ .setType(Type.TYPE_INT64)
.setLabel(Label.LABEL_OPTIONAL)
.build())
.addField(
FieldDescriptorProto.newBuilder()
.setName("datevalue")
.setNumber(2)
- .setType(Type.TYPE_STRING)
+ .setType(Type.TYPE_INT32)
.setLabel(Label.LABEL_OPTIONAL)
.build())
.addField(
FieldDescriptorProto.newBuilder()
.setName("numericvalue")
.setNumber(13)
- .setType(Type.TYPE_STRING)
+ .setType(Type.TYPE_BYTES)
.setLabel(Label.LABEL_OPTIONAL)
.build())
.addField(
FieldDescriptorProto.newBuilder()
- .setName("arrayvalue")
+ .setName("bignumericvalue")
.setNumber(14)
- .setType(Type.TYPE_STRING)
+ .setType(Type.TYPE_BYTES)
+ .setLabel(Label.LABEL_OPTIONAL)
+ .build())
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("arrayvalue")
+ .setNumber(15)
+ .setType(Type.TYPE_BYTES)
.setLabel(Label.LABEL_REPEATED)
.build())
.build();
@@ -414,6 +433,18 @@ public class TableRowToStorageApiProtoTest {
assertEquals(expectedBaseTypesNoF, nestedTypesNoF2);
}
+ private static final List<Object> REPEATED_BYTES =
+ ImmutableList.of(
+ BaseEncoding.base64().encode("hello".getBytes(StandardCharsets.UTF_8)), // base64 String
+ "goodbye".getBytes(StandardCharsets.UTF_8), // raw byte[]
+ ByteString.copyFrom("solong".getBytes(StandardCharsets.UTF_8))); // ByteString
+ // Expected proto values for REPEATED_BYTES: each input form above as a ByteString.
+ private static final List<Object> EXPECTED_PROTO_REPEATED_BYTES =
+ ImmutableList.of(
+ ByteString.copyFrom("hello".getBytes(StandardCharsets.UTF_8)),
+ ByteString.copyFrom("goodbye".getBytes(StandardCharsets.UTF_8)),
+ ByteString.copyFrom("solong".getBytes(StandardCharsets.UTF_8)));
+
private static final TableRow BASE_TABLE_ROW =
new TableRow()
.setF(
@@ -429,12 +460,13 @@ public class TableRowToStorageApiProtoTest {
new TableCell().setV("2.817"),
new TableCell().setV("true"),
new TableCell().setV("true"),
- new TableCell().setV("43"),
- new TableCell().setV("00:52:07[.123]|[.123456] UTC"),
- new TableCell().setV("2019-08-16 00:52:07[.123]|[.123456] UTC"),
+ new TableCell().setV("1970-01-01T00:00:00.000043Z"),
+ new TableCell().setV("00:52:07.123456"),
+ new TableCell().setV("2019-08-16T00:52:07.123456"),
new TableCell().setV("2019-08-16"),
new TableCell().setV("23.4"),
- new TableCell().setV(ImmutableList.of("hello", "goodbye"))));
+ new TableCell().setV("2312345.4"),
+ new TableCell().setV(REPEATED_BYTES)));
private static final TableRow BASE_TABLE_ROW_NO_F =
new TableRow()
@@ -447,12 +479,13 @@ public class TableRowToStorageApiProtoTest {
.set("floatValue", "2.817")
.set("boolValue", "true")
.set("booleanValue", "true")
- .set("timestampValue", "43")
- .set("timeValue", "00:52:07[.123]|[.123456] UTC")
- .set("datetimeValue", "2019-08-16 00:52:07[.123]|[.123456] UTC")
+ .set("timestampValue", "1970-01-01T00:00:00.000043Z")
+ .set("timeValue", "00:52:07.123456")
+ .set("datetimeValue", "2019-08-16T00:52:07.123456")
.set("dateValue", "2019-08-16")
.set("numericValue", "23.4")
- .set("arrayValue", ImmutableList.of("hello", "goodbye"));
+ .set("bigNumericValue", "2312345.4")
+ .set("arrayValue", REPEATED_BYTES);
private static final Map<String, Object> BASE_ROW_EXPECTED_PROTO_VALUES =
ImmutableMap.<String, Object>builder()
@@ -465,12 +498,17 @@ public class TableRowToStorageApiProtoTest {
.put("floatvalue", (double) 2.817)
.put("boolvalue", true)
.put("booleanvalue", true)
- .put("timestampvalue", "43")
- .put("timevalue", "00:52:07[.123]|[.123456] UTC")
- .put("datetimevalue", "2019-08-16 00:52:07[.123]|[.123456] UTC")
- .put("datevalue", "2019-08-16")
- .put("numericvalue", "23.4")
- .put("arrayvalue", ImmutableList.of("hello", "goodbye"))
+ .put("timestampvalue", 43L)
+ .put("timevalue", 3497124416L)
+ .put("datetimevalue", 142111881387172416L)
+ .put("datevalue", (int) LocalDate.of(2019, 8, 16).toEpochDay())
+ .put(
+ "numericvalue",
+ BigDecimalByteStringEncoder.encodeToNumericByteString(new BigDecimal("23.4")))
+ .put(
+ "bignumericvalue",
+ BigDecimalByteStringEncoder.encodeToBigNumericByteString(new BigDecimal("2312345.4")))
+ .put("arrayvalue", EXPECTED_PROTO_REPEATED_BYTES)
.build();
private static final Map<String, Object> BASE_ROW_NO_F_EXPECTED_PROTO_VALUES =
@@ -483,12 +521,17 @@ public class TableRowToStorageApiProtoTest {
.put("floatvalue", (double) 2.817)
.put("boolvalue", true)
.put("booleanvalue", true)
- .put("timestampvalue", "43")
- .put("timevalue", "00:52:07[.123]|[.123456] UTC")
- .put("datetimevalue", "2019-08-16 00:52:07[.123]|[.123456] UTC")
- .put("datevalue", "2019-08-16")
- .put("numericvalue", "23.4")
- .put("arrayvalue", ImmutableList.of("hello", "goodbye"))
+ .put("timestampvalue", 43L)
+ .put("timevalue", 3497124416L)
+ .put("datetimevalue", 142111881387172416L)
+ .put("datevalue", (int) LocalDate.parse("2019-08-16").toEpochDay())
+ .put(
+ "numericvalue",
+ BigDecimalByteStringEncoder.encodeToNumericByteString(new BigDecimal("23.4")))
+ .put(
+ "bignumericvalue",
+ BigDecimalByteStringEncoder.encodeToBigNumericByteString(new BigDecimal("2312345.4")))
+ .put("arrayvalue", EXPECTED_PROTO_REPEATED_BYTES)
.build();
private void assertBaseRecord(DynamicMessage msg, boolean withF) {
@@ -506,12 +549,16 @@ public class TableRowToStorageApiProtoTest {
new TableRow()
.set("nestedValue1", BASE_TABLE_ROW)
.set("nestedValue2", BASE_TABLE_ROW)
- .set("nestedvalueNoF1", BASE_TABLE_ROW_NO_F)
- .set("nestedvalueNoF2", BASE_TABLE_ROW_NO_F);
+ .set("nestedValueNoF1", BASE_TABLE_ROW_NO_F)
+ .set("nestedValueNoF2", BASE_TABLE_ROW_NO_F);
Descriptor descriptor =
TableRowToStorageApiProto.getDescriptorFromTableSchema(NESTED_TABLE_SCHEMA);
- DynamicMessage msg = TableRowToStorageApiProto.messageFromTableRow(descriptor, tableRow, false);
+ TableRowToStorageApiProto.SchemaInformation schemaInformation =
+ TableRowToStorageApiProto.SchemaInformation.fromTableSchema(NESTED_TABLE_SCHEMA);
+ DynamicMessage msg =
+ TableRowToStorageApiProto.messageFromTableRow(
+ schemaInformation, descriptor, tableRow, false);
assertEquals(4, msg.getAllFields().size());
Map<String, FieldDescriptor> fieldDescriptors =
@@ -527,8 +574,11 @@ public class TableRowToStorageApiProtoTest {
public void testMessageWithFFromTableRow() throws Exception {
Descriptor descriptor =
TableRowToStorageApiProto.getDescriptorFromTableSchema(BASE_TABLE_SCHEMA);
+ TableRowToStorageApiProto.SchemaInformation schemaInformation =
+ TableRowToStorageApiProto.SchemaInformation.fromTableSchema(BASE_TABLE_SCHEMA);
DynamicMessage msg =
- TableRowToStorageApiProto.messageFromTableRow(descriptor, BASE_TABLE_ROW, false);
+ TableRowToStorageApiProto.messageFromTableRow(
+ schemaInformation, descriptor, BASE_TABLE_ROW, false);
assertBaseRecord(msg, true);
}
@@ -567,8 +617,11 @@ public class TableRowToStorageApiProtoTest {
.set("repeatednof2", ImmutableList.of(BASE_TABLE_ROW_NO_F, BASE_TABLE_ROW_NO_F));
Descriptor descriptor =
TableRowToStorageApiProto.getDescriptorFromTableSchema(REPEATED_MESSAGE_SCHEMA);
+ TableRowToStorageApiProto.SchemaInformation schemaInformation =
+ TableRowToStorageApiProto.SchemaInformation.fromTableSchema(REPEATED_MESSAGE_SCHEMA);
DynamicMessage msg =
- TableRowToStorageApiProto.messageFromTableRow(descriptor, repeatedRow, false);
+ TableRowToStorageApiProto.messageFromTableRow(
+ schemaInformation, descriptor, repeatedRow, false);
assertEquals(4, msg.getAllFields().size());
Map<String, FieldDescriptor> fieldDescriptors =
@@ -609,8 +662,11 @@ public class TableRowToStorageApiProtoTest {
.set("repeatednof2", null);
Descriptor descriptor =
TableRowToStorageApiProto.getDescriptorFromTableSchema(REPEATED_MESSAGE_SCHEMA);
+ TableRowToStorageApiProto.SchemaInformation schemaInformation =
+ TableRowToStorageApiProto.SchemaInformation.fromTableSchema(REPEATED_MESSAGE_SCHEMA);
DynamicMessage msg =
- TableRowToStorageApiProto.messageFromTableRow(descriptor, repeatedRow, false);
+ TableRowToStorageApiProto.messageFromTableRow(
+ schemaInformation, descriptor, repeatedRow, false);
Map<String, FieldDescriptor> fieldDescriptors =
descriptor.getFields().stream()
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/StructUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/StructUtilsTest.java
index 577a470cdcd..d58ad0b6b53 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/StructUtilsTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/StructUtilsTest.java
@@ -69,7 +69,8 @@ public class StructUtilsTest {
Struct struct = Struct.newBuilder().set("f_int64").to("string_value").build();
Exception exception =
assertThrows(ClassCastException.class, () -> StructUtils.structToBeamRow(struct, schema));
- checkMessage("java.lang.String cannot be cast to java.lang.Long", exception.getMessage());
+ checkMessage("java.lang.String cannot be cast to", exception.getMessage());
+ checkMessage("java.lang.Long", exception.getMessage());
}
@Test