You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by re...@apache.org on 2021/03/05 16:08:38 UTC
[beam] branch master updated: Merge pull request #14136:
[BEAM-11648] Add conversion utilities for BigQuery Storage API sink
This is an automated email from the ASF dual-hosted git repository.
reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e039ca2 Merge pull request #14136: [BEAM-11648] Add conversion utilities for BigQuery Storage API sink
e039ca2 is described below
commit e039ca28d6f806f30b87cae82e6af86694c171cd
Author: reuvenlax <re...@google.com>
AuthorDate: Fri Mar 5 08:07:41 2021 -0800
Merge pull request #14136: [BEAM-11648] Add conversion utilities for BigQuery Storage API sink
---
.../io/gcp/bigquery/BeamRowToStorageApiProto.java | 336 +++++++++++
.../beam/sdk/io/gcp/bigquery/BigQueryUtils.java | 221 +------
.../beam/sdk/io/gcp/bigquery/CivilTimeEncoder.java | 648 +++++++++++++++++++++
.../io/gcp/bigquery/TableRowToStorageApiProto.java | 304 ++++++++++
.../gcp/bigquery/BeamRowToStorageApiProtoTest.java | 391 +++++++++++++
.../sdk/io/gcp/bigquery/BigQueryUtilsTest.java | 260 ---------
.../bigquery/TableRowToStorageApiProtoTest.java | 312 ++++++++++
7 files changed, 1993 insertions(+), 479 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
new file mode 100644
index 0000000..816cbe9
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.DescriptorProtos.DescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
+import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.DescriptorValidationException;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.FileDescriptor;
+import com.google.protobuf.DynamicMessage;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.LogicalType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Bytes;
+import org.joda.time.ReadableInstant;
+
+/**
+ * Utility methods for converting Beam {@link Row} objects to dynamic protocol message, for use with
+ * the Storage write API.
+ */
+public class BeamRowToStorageApiProto {
+ // Number of digits after the decimal point supported by the NUMERIC data type.
+ private static final int NUMERIC_SCALE = 9;
+ // Maximum and minimum allowed values for the NUMERIC data type.
+ private static final BigDecimal MAX_NUMERIC_VALUE =
+ new BigDecimal("99999999999999999999999999999.999999999");
+ private static final BigDecimal MIN_NUMERIC_VALUE =
+ new BigDecimal("-99999999999999999999999999999.999999999");
+
+ // TODO(reuvenlax): Support BIGNUMERIC and GEOGRAPHY types.
+ static final Map<TypeName, Type> PRIMITIVE_TYPES =
+ ImmutableMap.<TypeName, Type>builder()
+ .put(TypeName.INT16, Type.TYPE_INT32)
+ .put(TypeName.BYTE, Type.TYPE_INT32)
+ .put(TypeName.INT32, Type.TYPE_INT32)
+ .put(TypeName.INT64, Type.TYPE_INT64)
+ .put(TypeName.FLOAT, Type.TYPE_FLOAT)
+ .put(TypeName.DOUBLE, Type.TYPE_DOUBLE)
+ .put(TypeName.STRING, Type.TYPE_STRING)
+ .put(TypeName.BOOLEAN, Type.TYPE_BOOL)
+ .put(TypeName.DATETIME, Type.TYPE_INT64)
+ .put(TypeName.BYTES, Type.TYPE_BYTES)
+ .put(TypeName.DECIMAL, Type.TYPE_BYTES)
+ .build();
+
+ // A map of supported logical types to the protobuf field type.
+ static final Map<String, Type> LOGICAL_TYPES =
+ ImmutableMap.<String, Type>builder()
+ .put(SqlTypes.DATE.getIdentifier(), Type.TYPE_INT32)
+ .put(SqlTypes.TIME.getIdentifier(), Type.TYPE_INT64)
+ .put(SqlTypes.DATETIME.getIdentifier(), Type.TYPE_INT64)
+ .put(SqlTypes.TIMESTAMP.getIdentifier(), Type.TYPE_INT64)
+ .put(EnumerationType.IDENTIFIER, Type.TYPE_STRING)
+ .build();
+
+ static final Map<TypeName, Function<Object, Object>> PRIMITIVE_ENCODERS =
+ ImmutableMap.<TypeName, Function<Object, Object>>builder()
+ .put(TypeName.INT16, o -> Integer.valueOf((Short) o))
+ .put(TypeName.BYTE, o -> Integer.valueOf((Byte) o))
+ .put(TypeName.INT32, Functions.identity())
+ .put(TypeName.INT64, Functions.identity())
+ .put(TypeName.FLOAT, Function.identity())
+ .put(TypeName.DOUBLE, Function.identity())
+ .put(TypeName.STRING, Function.identity())
+ .put(TypeName.BOOLEAN, Function.identity())
+ // A Beam DATETIME is actually a timestamp, not a DateTime.
+ .put(TypeName.DATETIME, o -> ((ReadableInstant) o).getMillis() * 1000)
+ .put(TypeName.BYTES, o -> ByteString.copyFrom((byte[]) o))
+ .put(TypeName.DECIMAL, o -> serializeBigDecimalToNumeric((BigDecimal) o))
+ .build();
+
+ // A map of supported logical types to their encoding functions.
+ static final Map<String, BiFunction<LogicalType<?, ?>, Object, Object>> LOGICAL_TYPE_ENCODERS =
+ ImmutableMap.<String, BiFunction<LogicalType<?, ?>, Object, Object>>builder()
+ .put(
+ SqlTypes.DATE.getIdentifier(),
+ (logicalType, value) -> (int) ((LocalDate) value).toEpochDay())
+ .put(
+ SqlTypes.TIME.getIdentifier(),
+ (logicalType, value) -> CivilTimeEncoder.encodePacked64TimeMicros((LocalTime) value))
+ .put(
+ SqlTypes.DATETIME.getIdentifier(),
+ (logicalType, value) ->
+ CivilTimeEncoder.encodePacked64DatetimeSeconds((LocalDateTime) value))
+ .put(
+ SqlTypes.TIMESTAMP.getIdentifier(),
+ (logicalType, value) -> ((java.time.Instant) value).toEpochMilli() * 1000)
+ .put(
+ EnumerationType.IDENTIFIER,
+ (logicalType, value) ->
+ ((EnumerationType) logicalType).toString((EnumerationType.Value) value))
+ .build();
+
+ /**
+ * Given a Beam Schema, returns a protocol-buffer Descriptor that can be used to write data using
+ * the BigQuery Storage API.
+ */
+ public static Descriptor getDescriptorFromSchema(Schema schema)
+ throws DescriptorValidationException {
+ DescriptorProto descriptorProto = descriptorSchemaFromBeamSchema(schema);
+ FileDescriptorProto fileDescriptorProto =
+ FileDescriptorProto.newBuilder().addMessageType(descriptorProto).build();
+ FileDescriptor fileDescriptor =
+ FileDescriptor.buildFrom(fileDescriptorProto, new FileDescriptor[0]);
+
+ return Iterables.getOnlyElement(fileDescriptor.getMessageTypes());
+ }
+
+ /**
+ * Given a Beam {@link Row} object, returns a protocol-buffer message that can be used to write
+ * data using the BigQuery Storage streaming API.
+ */
+ public static DynamicMessage messageFromBeamRow(Descriptor descriptor, Row row) {
+ Schema beamSchema = row.getSchema();
+ DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
+ for (int i = 0; i < row.getFieldCount(); ++i) {
+ Field beamField = beamSchema.getField(i);
+ FieldDescriptor fieldDescriptor =
+ Preconditions.checkNotNull(descriptor.findFieldByName(beamField.getName().toLowerCase()));
+ @Nullable Object value = messageValueFromRowValue(fieldDescriptor, beamField, i, row);
+ if (value != null) {
+ builder.setField(fieldDescriptor, value);
+ }
+ }
+ return builder.build();
+ }
+
+ @VisibleForTesting
+ static DescriptorProto descriptorSchemaFromBeamSchema(Schema schema) {
+ Preconditions.checkState(schema.getFieldCount() > 0);
+ DescriptorProto.Builder descriptorBuilder = DescriptorProto.newBuilder();
+ // Create a unique name for the descriptor ('-' characters cannot be used).
+ descriptorBuilder.setName("D" + UUID.randomUUID().toString().replace("-", "_"));
+ int i = 1;
+ List<DescriptorProto> nestedTypes = Lists.newArrayList();
+ for (Field field : schema.getFields()) {
+ FieldDescriptorProto.Builder fieldDescriptorProtoBuilder =
+ fieldDescriptorFromBeamField(field, i++, nestedTypes);
+ descriptorBuilder.addField(fieldDescriptorProtoBuilder);
+ }
+ nestedTypes.forEach(descriptorBuilder::addNestedType);
+ return descriptorBuilder.build();
+ }
+
+ private static FieldDescriptorProto.Builder fieldDescriptorFromBeamField(
+ Field field, int fieldNumber, List<DescriptorProto> nestedTypes) {
+ FieldDescriptorProto.Builder fieldDescriptorBuilder = FieldDescriptorProto.newBuilder();
+ fieldDescriptorBuilder = fieldDescriptorBuilder.setName(field.getName().toLowerCase());
+ fieldDescriptorBuilder = fieldDescriptorBuilder.setNumber(fieldNumber);
+
+ switch (field.getType().getTypeName()) {
+ case ROW:
+ @Nullable Schema rowSchema = field.getType().getRowSchema();
+ if (rowSchema == null) {
+ throw new RuntimeException("Unexpected null schema!");
+ }
+ DescriptorProto nested = descriptorSchemaFromBeamSchema(rowSchema);
+ nestedTypes.add(nested);
+ fieldDescriptorBuilder =
+ fieldDescriptorBuilder.setType(Type.TYPE_MESSAGE).setTypeName(nested.getName());
+ break;
+ case ARRAY:
+ case ITERABLE:
+ @Nullable FieldType elementType = field.getType().getCollectionElementType();
+ if (elementType == null) {
+ throw new RuntimeException("Unexpected null element type!");
+ }
+ Preconditions.checkState(
+ !Preconditions.checkNotNull(elementType.getTypeName()).isCollectionType(),
+ "Nested arrays not supported by BigQuery.");
+ return fieldDescriptorFromBeamField(
+ Field.of(field.getName(), elementType), fieldNumber, nestedTypes)
+ .setLabel(Label.LABEL_REPEATED);
+ case LOGICAL_TYPE:
+ @Nullable LogicalType<?, ?> logicalType = field.getType().getLogicalType();
+ if (logicalType == null) {
+ throw new RuntimeException("Unexpected null logical type " + field.getType());
+ }
+ @Nullable Type type = LOGICAL_TYPES.get(logicalType.getIdentifier());
+ if (type == null) {
+ throw new RuntimeException("Unsupported logical type " + field.getType());
+ }
+ fieldDescriptorBuilder = fieldDescriptorBuilder.setType(type);
+ break;
+ case MAP:
+ throw new RuntimeException("Map types not supported by BigQuery.");
+ default:
+ @Nullable Type primitiveType = PRIMITIVE_TYPES.get(field.getType().getTypeName());
+ if (primitiveType == null) {
+ throw new RuntimeException("Unsupported type " + field.getType());
+ }
+ fieldDescriptorBuilder = fieldDescriptorBuilder.setType(primitiveType);
+ }
+ if (field.getType().getNullable()) {
+ fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_OPTIONAL);
+ } else {
+ fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_REQUIRED);
+ }
+ return fieldDescriptorBuilder;
+ }
+
+ @Nullable
+ private static Object messageValueFromRowValue(
+ FieldDescriptor fieldDescriptor, Field beamField, int index, Row row) {
+ @Nullable Object value = row.getValue(index);
+ if (value == null) {
+ if (fieldDescriptor.isOptional()) {
+ return null;
+ } else {
+ throw new IllegalArgumentException(
+ "Received null value for non-nullable field " + fieldDescriptor.getName());
+ }
+ }
+ return toProtoValue(fieldDescriptor, beamField.getType(), value);
+ }
+
+ private static Object toProtoValue(
+ FieldDescriptor fieldDescriptor, FieldType beamFieldType, Object value) {
+ switch (beamFieldType.getTypeName()) {
+ case ROW:
+ return messageFromBeamRow(fieldDescriptor.getMessageType(), (Row) value);
+ case ARRAY:
+ List<Object> list = (List<Object>) value;
+ @Nullable FieldType arrayElementType = beamFieldType.getCollectionElementType();
+ if (arrayElementType == null) {
+ throw new RuntimeException("Unexpected null element type!");
+ }
+ return list.stream()
+ .map(v -> toProtoValue(fieldDescriptor, arrayElementType, v))
+ .collect(Collectors.toList());
+ case ITERABLE:
+ Iterable<Object> iterable = (Iterable<Object>) value;
+ @Nullable FieldType iterableElementType = beamFieldType.getCollectionElementType();
+ if (iterableElementType == null) {
+ throw new RuntimeException("Unexpected null element type!");
+ }
+ return StreamSupport.stream(iterable.spliterator(), false)
+ .map(v -> toProtoValue(fieldDescriptor, iterableElementType, v))
+ .collect(Collectors.toList());
+ case MAP:
+ throw new RuntimeException("Map types not supported by BigQuery.");
+ default:
+ return scalarToProtoValue(beamFieldType, value);
+ }
+ }
+
+ @VisibleForTesting
+ static Object scalarToProtoValue(FieldType beamFieldType, Object value) {
+ if (beamFieldType.getTypeName() == TypeName.LOGICAL_TYPE) {
+ @Nullable LogicalType<?, ?> logicalType = beamFieldType.getLogicalType();
+ if (logicalType == null) {
+ throw new RuntimeException("Unexpectedly null logical type " + beamFieldType);
+ }
+ @Nullable
+ BiFunction<LogicalType<?, ?>, Object, Object> logicalTypeEncoder =
+ LOGICAL_TYPE_ENCODERS.get(logicalType.getIdentifier());
+ if (logicalTypeEncoder == null) {
+ throw new RuntimeException("Unsupported logical type " + logicalType.getIdentifier());
+ }
+ return logicalTypeEncoder.apply(logicalType, value);
+ } else {
+ @Nullable
+ Function<Object, Object> encoder = PRIMITIVE_ENCODERS.get(beamFieldType.getTypeName());
+ if (encoder == null) {
+ throw new RuntimeException("Unexpected beam type " + beamFieldType);
+ }
+ return encoder.apply(value);
+ }
+ }
+
+ static ByteString serializeBigDecimalToNumeric(BigDecimal o) {
+ return serializeBigDecimal(o, NUMERIC_SCALE, MAX_NUMERIC_VALUE, MIN_NUMERIC_VALUE, "Numeric");
+ }
+
+ private static ByteString serializeBigDecimal(
+ BigDecimal v, int scale, BigDecimal maxValue, BigDecimal minValue, String typeName) {
+ if (v.scale() > scale) {
+ throw new IllegalArgumentException(
+ typeName + " scale cannot exceed " + scale + ": " + v.toPlainString());
+ }
+ if (v.compareTo(maxValue) > 0 || v.compareTo(minValue) < 0) {
+ throw new IllegalArgumentException(typeName + " overflow: " + v.toPlainString());
+ }
+
+ byte[] bytes = v.setScale(scale).unscaledValue().toByteArray();
+ // NUMERIC/BIGNUMERIC values are serialized as scaled integers in two's complement form in
+ // little endian
+ // order. BigInteger requires the same encoding but in big endian order, therefore we must
+ // reverse the bytes that come from the proto.
+ Bytes.reverse(bytes);
+ return ByteString.copyFrom(bytes);
+ }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
index e3380ad..4ce40b6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -25,31 +25,17 @@ import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.auto.value.AutoValue;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.DescriptorProtos.DescriptorProto;
-import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
-import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label;
-import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
-import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
-import com.google.protobuf.Descriptors.Descriptor;
-import com.google.protobuf.Descriptors.DescriptorValidationException;
-import com.google.protobuf.Descriptors.FieldDescriptor;
-import com.google.protobuf.Descriptors.FileDescriptor;
-import com.google.protobuf.DynamicMessage;
-import com.google.protobuf.Message;
import java.io.Serializable;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
-import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.UUID;
import java.util.function.Function;
import java.util.stream.IntStream;
import org.apache.avro.Conversions;
@@ -87,22 +73,6 @@ import org.joda.time.format.DateTimeFormatterBuilder;
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
public class BigQueryUtils {
-
- /**
- * Given a BigQuery TableSchema, returns a protocol-buffer Descriptor that can be used to write
- * data using the Vortex streaming API.
- */
- public static Descriptor getDescriptorFromTableSchema(TableSchema jsonSchema)
- throws DescriptorValidationException {
- DescriptorProto descriptorProto = descriptorSchemaFromTableSchema(jsonSchema);
- FileDescriptorProto fileDescriptorProto =
- FileDescriptorProto.newBuilder().addMessageType(descriptorProto).build();
- FileDescriptor fileDescriptor =
- FileDescriptor.buildFrom(fileDescriptorProto, new FileDescriptor[0]);
-
- return Iterables.getOnlyElement(fileDescriptor.getMessageTypes());
- }
-
/** Options for how to convert BigQuery data to Beam data. */
@AutoValue
public abstract static class ConversionOptions implements Serializable {
@@ -254,7 +224,7 @@ public class BigQueryUtils {
// TODO: BigQuery code should not be relying on Calcite metadata fields. If so, this belongs
// in the SQL package.
- private static final Map<String, StandardSQLTypeName> BEAM_TO_BIGQUERY_LOGICAL_MAPPING =
+ static final Map<String, StandardSQLTypeName> BEAM_TO_BIGQUERY_LOGICAL_MAPPING =
ImmutableMap.<String, StandardSQLTypeName>builder()
.put(SqlTypes.DATE.getIdentifier(), StandardSQLTypeName.DATE)
.put(SqlTypes.TIME.getIdentifier(), StandardSQLTypeName.TIME)
@@ -271,7 +241,7 @@ public class BigQueryUtils {
* Get the corresponding BigQuery {@link StandardSQLTypeName} for supported Beam {@link
* FieldType}.
*/
- private static StandardSQLTypeName toStandardSQLTypeName(FieldType fieldType) {
+ static StandardSQLTypeName toStandardSQLTypeName(FieldType fieldType) {
StandardSQLTypeName ret;
if (fieldType.getTypeName().isLogicalType()) {
ret = BEAM_TO_BIGQUERY_LOGICAL_MAPPING.get(fieldType.getLogicalType().getIdentifier());
@@ -434,193 +404,6 @@ public class BigQueryUtils {
return fromTableFieldSchema(tableSchema.getFields(), options);
}
- static DescriptorProto descriptorSchemaFromTableSchema(TableSchema tableSchema) {
- return descriptorSchemaFromTableFieldSchemas(tableSchema.getFields());
- }
-
- static DescriptorProto descriptorSchemaFromTableFieldSchemas(
- Iterable<TableFieldSchema> tableFieldSchemas) {
- DescriptorProto.Builder descriptorBuilder = DescriptorProto.newBuilder();
- // Create a unique name for the descriptor ('-' characters cannot be used).
- descriptorBuilder.setName("D" + UUID.randomUUID().toString().replace("-", "_"));
- int i = 1;
- for (TableFieldSchema fieldSchema : tableFieldSchemas) {
- fieldDescriptorFromTableField(fieldSchema, i++, descriptorBuilder);
- }
- return descriptorBuilder.build();
- }
-
- static void fieldDescriptorFromTableField(
- TableFieldSchema fieldSchema, int fieldNumber, DescriptorProto.Builder descriptorBuilder) {
- FieldDescriptorProto.Builder fieldDescriptorBuilder = FieldDescriptorProto.newBuilder();
- fieldDescriptorBuilder = fieldDescriptorBuilder.setName(fieldSchema.getName());
- fieldDescriptorBuilder = fieldDescriptorBuilder.setNumber(fieldNumber);
- switch (fieldSchema.getType()) {
- case "STRING":
- fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_STRING);
- break;
- case "BYTES":
- fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_BYTES);
- break;
- case "INT64":
- case "INTEGER":
- fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_INT64);
- break;
- case "FLOAT64":
- case "FLOAT":
- fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_FLOAT);
- break;
- case "BOOL":
- case "BOOLEAN":
- fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_BOOL);
- break;
- case "TIMESTAMP":
- case "TIME":
- case "DATETIME":
- fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_INT64);
- break;
- case "DATE":
- fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_INT32);
- break;
- case "STRUCT":
- case "RECORD":
- DescriptorProto nested = descriptorSchemaFromTableFieldSchemas(fieldSchema.getFields());
- descriptorBuilder.addNestedType(nested);
- fieldDescriptorBuilder =
- fieldDescriptorBuilder.setType(Type.TYPE_MESSAGE).setTypeName(nested.getName());
- break;
- default:
- throw new UnsupportedOperationException(
- "Converting BigQuery type " + fieldSchema.getType() + " to Beam type is unsupported");
- }
-
- Optional<Mode> fieldMode = Optional.ofNullable(fieldSchema.getMode()).map(Mode::valueOf);
- if (fieldMode.filter(m -> m == Mode.REPEATED).isPresent()) {
- fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_REPEATED);
- } else if (!fieldMode.isPresent() || fieldMode.filter(m -> m == Mode.NULLABLE).isPresent()) {
- fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_OPTIONAL);
- } else {
- fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_REQUIRED);
- }
- descriptorBuilder.addField(fieldDescriptorBuilder.build());
- }
-
- /**
- * Given a BigQuery TableRow, returns a protocol-buffer message that can be used to write data
- * using the Vortex streaming API.
- */
- public static DynamicMessage messageFromTableRow(Descriptor descriptor, TableRow tableRow) {
- DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
- for (FieldDescriptor fieldDescriptor : descriptor.getFields()) {
- Object value =
- messageValueFromFieldValue(fieldDescriptor, tableRow.get(fieldDescriptor.getName()));
- if (value != null) {
- builder.setField(fieldDescriptor, value);
- }
- }
- return builder.build();
- }
-
- static Object messageValueFromFieldValue(FieldDescriptor fieldDescriptor, Object bqValue) {
- if (bqValue == null) {
- if (fieldDescriptor.isOptional()) {
- return null;
- } else {
- throw new IllegalArgumentException(
- "Received null value for non-nullable field " + fieldDescriptor.getName());
- }
- }
- return toProtoValue(fieldDescriptor, bqValue);
- }
-
- private static final Map<FieldDescriptor.Type, Function<String, Object>> JSON_PROTO_PARSERS =
- ImmutableMap.<FieldDescriptor.Type, Function<String, Object>>builder()
- .put(FieldDescriptor.Type.INT32, Integer::valueOf)
- .put(FieldDescriptor.Type.INT64, Long::valueOf)
- .put(FieldDescriptor.Type.FLOAT, Float::valueOf)
- .put(FieldDescriptor.Type.DOUBLE, Double::valueOf)
- .put(FieldDescriptor.Type.BOOL, Boolean::valueOf)
- .put(FieldDescriptor.Type.STRING, str -> str)
- .put(
- FieldDescriptor.Type.BYTES,
- b64 -> ByteString.copyFrom(BaseEncoding.base64().decode(b64)))
- .build();
-
- private static Object toProtoValue(FieldDescriptor fieldDescriptor, Object jsonBQValue) {
- if (jsonBQValue instanceof String) {
- Function<String, Object> mapper = JSON_PROTO_PARSERS.get(fieldDescriptor.getType());
- if (mapper != null) {
- return mapper.apply((String) jsonBQValue);
- }
- } else if (jsonBQValue instanceof Integer) {
- switch (fieldDescriptor.getJavaType()) {
- case INT:
- return Integer.valueOf((int) jsonBQValue);
- case LONG:
- return Long.valueOf((int) jsonBQValue);
- default:
- throw new RuntimeException(
- "Unexpectecd java type "
- + jsonBQValue.getClass()
- + " for field descriptor "
- + fieldDescriptor);
- }
- } else if (jsonBQValue instanceof List) {
- return ((List<Object>) jsonBQValue)
- .stream()
- .map(v -> ((Map<String, Object>) v).get("v"))
- .map(v -> toProtoValue(fieldDescriptor, v))
- .collect(toList());
- } else if (jsonBQValue instanceof AbstractMap) {
- // This will handle nested rows.
- TableRow tr = new TableRow();
- tr.putAll((AbstractMap<String, Object>) jsonBQValue);
- return messageFromTableRow(fieldDescriptor.getMessageType(), tr);
- } else {
- return toProtoValue(fieldDescriptor, jsonBQValue.toString());
- }
-
- throw new UnsupportedOperationException(
- "Converting BigQuery type '"
- + jsonBQValue.getClass()
- + "' to '"
- + fieldDescriptor
- + "' is not supported");
- }
-
- public static TableRow tableRowFromMessage(Message message) {
- TableRow tableRow = new TableRow();
- for (Map.Entry<FieldDescriptor, Object> field : message.getAllFields().entrySet()) {
- FieldDescriptor fieldDescriptor = field.getKey();
- Object fieldValue = field.getValue();
- tableRow.putIfAbsent(
- fieldDescriptor.getName(), jsonValueFromMessageValue(fieldDescriptor, fieldValue, true));
- }
- return tableRow;
- }
-
- public static Object jsonValueFromMessageValue(
- FieldDescriptor fieldDescriptor, Object fieldValue, boolean expandRepeated) {
- if (expandRepeated && fieldDescriptor.isRepeated()) {
- List<Object> valueList = (List<Object>) fieldValue;
- return valueList.stream()
- .map(v -> jsonValueFromMessageValue(fieldDescriptor, v, false))
- .collect(toList());
- }
-
- switch (fieldDescriptor.getType()) {
- case GROUP:
- case MESSAGE:
- return tableRowFromMessage((Message) fieldValue);
- case BYTES:
- return BaseEncoding.base64().encode(((ByteString) fieldValue).toByteArray());
- case ENUM:
- throw new RuntimeException("Enumerations not supported");
- default:
- return fieldValue;
- }
- }
-
/** Convert a list of BigQuery {@link TableFieldSchema} to Avro {@link org.apache.avro.Schema}. */
@Experimental(Kind.SCHEMAS)
public static org.apache.avro.Schema toGenericAvroSchema(
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CivilTimeEncoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CivilTimeEncoder.java
new file mode 100644
index 0000000..bed767b
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CivilTimeEncoder.java
@@ -0,0 +1,648 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import java.time.temporal.ChronoUnit;
+import org.joda.time.LocalDateTime;
+import org.joda.time.LocalTime;
+
+/**
+ * Encoder for TIME and DATETIME values, according to civil_time encoding. Copied out of the zetasql
+ * package.
+ *
+ * <p>The valid range and number of bits required by each date/time field is as the following:
+ *
+ * <table>
+ * <tr> <th> Field </th> <th> Range </th> <th> #Bits </th> </tr>
+ * <tr> <td> Year </td> <td> [1, 9999] </td> <td> 14 </td> </tr>
+ * <tr> <td> Month </td> <td> [1, 12] </td> <td> 4 </td> </tr>
+ * <tr> <td> Day </td> <td> [1, 31] </td> <td> 5 </td> </tr>
+ * <tr> <td> Hour </td> <td> [0, 23] </td> <td> 5 </td> </tr>
+ * <tr> <td> Minute </td> <td> [0, 59] </td> <td> 6 </td> </tr>
+ * <tr> <td> Second </td> <td> [0, 59]* </td> <td> 6 </td> </tr>
+ * <tr> <td> Micros </td> <td> [0, 999999] </td> <td> 20 </td> </tr>
+ * <tr> <td> Nanos </td> <td> [0, 999999999] </td> <td> 30 </td> </tr>
+ * </table>
+ *
+ * <p>* Leap second is not supported.
+ *
+ * <p>When encoding the TIME or DATETIME into a bit field, larger date/time field is on the more
+ * significant side.
+ */
+public final class CivilTimeEncoder {
+ private static final int NANO_LENGTH = 30;
+ private static final int MICRO_LENGTH = 20;
+
+ private static final int NANO_SHIFT = 0;
+ private static final int MICRO_SHIFT = 0;
+ private static final int SECOND_SHIFT = 0;
+ private static final int MINUTE_SHIFT = 6;
+ private static final int HOUR_SHIFT = 12;
+ private static final int DAY_SHIFT = 17;
+ private static final int MONTH_SHIFT = 22;
+ private static final int YEAR_SHIFT = 26;
+
+ private static final long NANO_MASK = 0x3FFFFFFFL;
+ private static final long MICRO_MASK = 0xFFFFFL;
+ private static final long SECOND_MASK = 0x3FL;
+ private static final long MINUTE_MASK = 0xFC0L;
+ private static final long HOUR_MASK = 0x1F000L;
+ private static final long DAY_MASK = 0x3E0000L;
+ private static final long MONTH_MASK = 0x3C00000L;
+ private static final long YEAR_MASK = 0xFFFC000000L;
+
+ private static final long TIME_SECONDS_MASK = 0x1FFFFL;
+ private static final long TIME_MICROS_MASK = 0x1FFFFFFFFFL;
+ private static final long TIME_NANOS_MASK = 0x7FFFFFFFFFFFL;
+ private static final long DATETIME_SECONDS_MASK = 0xFFFFFFFFFFL;
+ private static final long DATETIME_MICROS_MASK = 0xFFFFFFFFFFFFFFFL;
+
+ /**
+ * Encodes {@code time} as a 4-byte integer with seconds precision.
+ *
+ * <p>Encoding is as the following:
+ *
+ * <pre>
+ * 3 2 1
+ * MSB 10987654321098765432109876543210 LSB
+ * | H || M || S |
+ * </pre>
+ *
+ * @see #decodePacked32TimeSeconds(int)
+ * @see #encodePacked32TimeSeconds(java.time.LocalTime)
+ */
+ @SuppressWarnings("GoodTime") // should accept a java.time.LocalTime
+ public static int encodePacked32TimeSeconds(LocalTime time) {
+ checkValidTimeSeconds(time);
+ int bitFieldTimeSeconds = 0x0;
+ bitFieldTimeSeconds |= time.getHourOfDay() << HOUR_SHIFT;
+ bitFieldTimeSeconds |= time.getMinuteOfHour() << MINUTE_SHIFT;
+ bitFieldTimeSeconds |= time.getSecondOfMinute() << SECOND_SHIFT;
+ return bitFieldTimeSeconds;
+ }
+
+ /**
+ * Encodes {@code time} as a 4-byte integer with seconds precision.
+ *
+ * <p>Encoding is as the following:
+ *
+ * <pre>
+ * 3 2 1
+ * MSB 10987654321098765432109876543210 LSB
+ * | H || M || S |
+ * </pre>
+ *
+ * @see #decodePacked32TimeSecondsAsJavaTime(int)
+ */
+ @SuppressWarnings("GoodTime-ApiWithNumericTimeUnit")
+ public static int encodePacked32TimeSeconds(java.time.LocalTime time) {
+ checkValidTimeSeconds(time);
+ int bitFieldTimeSeconds = 0x0;
+ bitFieldTimeSeconds |= time.getHour() << HOUR_SHIFT;
+ bitFieldTimeSeconds |= time.getMinute() << MINUTE_SHIFT;
+ bitFieldTimeSeconds |= time.getSecond() << SECOND_SHIFT;
+ return bitFieldTimeSeconds;
+ }
+
+ /**
+ * Decodes {@code bitFieldTimeSeconds} as a {@link LocalTime} with seconds precision.
+ *
+ * <p>Encoding is as the following:
+ *
+ * <pre>
+ * 3 2 1
+ * MSB 10987654321098765432109876543210 LSB
+ * | H || M || S |
+ * </pre>
+ *
+ * @see #encodePacked32TimeSeconds(LocalTime)
+ * @see #encodePacked32TimeSecondsAsJavaTime(int)
+ */
+ @SuppressWarnings("GoodTime") // should return a java.time.LocalTime
+ public static LocalTime decodePacked32TimeSeconds(int bitFieldTimeSeconds) {
+ checkValidBitField(bitFieldTimeSeconds, TIME_SECONDS_MASK);
+ int hourOfDay = getFieldFromBitField(bitFieldTimeSeconds, HOUR_MASK, HOUR_SHIFT);
+ int minuteOfHour = getFieldFromBitField(bitFieldTimeSeconds, MINUTE_MASK, MINUTE_SHIFT);
+ int secondOfMinute = getFieldFromBitField(bitFieldTimeSeconds, SECOND_MASK, SECOND_SHIFT);
+ LocalTime time = new LocalTime(hourOfDay, minuteOfHour, secondOfMinute);
+ checkValidTimeSeconds(time);
+ return time;
+ }
+
+ /**
+ * Decodes {@code bitFieldTimeSeconds} as a {@link java.time.LocalTime} with seconds precision.
+ *
+ * <p>Encoding is as the following:
+ *
+ * <pre>
+ * 3 2 1
+ * MSB 10987654321098765432109876543210 LSB
+ * | H || M || S |
+ * </pre>
+ *
+ * @see #encodePacked32TimeSeconds(java.time.LocalTime)
+ */
+ @SuppressWarnings("GoodTime-ApiWithNumericTimeUnit")
+ public static java.time.LocalTime decodePacked32TimeSecondsAsJavaTime(int bitFieldTimeSeconds) {
+ checkValidBitField(bitFieldTimeSeconds, TIME_SECONDS_MASK);
+ int hourOfDay = getFieldFromBitField(bitFieldTimeSeconds, HOUR_MASK, HOUR_SHIFT);
+ int minuteOfHour = getFieldFromBitField(bitFieldTimeSeconds, MINUTE_MASK, MINUTE_SHIFT);
+ int secondOfMinute = getFieldFromBitField(bitFieldTimeSeconds, SECOND_MASK, SECOND_SHIFT);
+ // java.time.LocalTime validates the input parameters.
+ try {
+ return java.time.LocalTime.of(hourOfDay, minuteOfHour, secondOfMinute);
+ } catch (java.time.DateTimeException e) {
+ throw new IllegalArgumentException(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Encodes {@code time} as a 8-byte integer with microseconds precision.
+ *
+ * <p>Encoding is as the following:
+ *
+ * <pre>
+ * 6 5 4 3 2 1
+ * MSB 3210987654321098765432109876543210987654321098765432109876543210 LSB
+ * | H || M || S ||-------micros-----|
+ * </pre>
+ *
+ * @see #decodePacked64TimeMicros(long)
+ * @see #encodePacked64TimeMicros(java.time.LocalTime)
+ */
+ @SuppressWarnings("GoodTime") // should accept a java.time.LocalTime
+ public static long encodePacked64TimeMicros(LocalTime time) {
+ checkValidTimeMillis(time);
+ return (((long) encodePacked32TimeSeconds(time)) << MICRO_LENGTH)
+ | (time.getMillisOfSecond() * 1_000L);
+ }
+
+ /**
+ * Encodes {@code time} as a 8-byte integer with microseconds precision.
+ *
+ * <p>Encoding is as the following:
+ *
+ * <pre>
+ * 6 5 4 3 2 1
+ * MSB 3210987654321098765432109876543210987654321098765432109876543210 LSB
+ * | H || M || S ||-------micros-----|
+ * </pre>
+ *
+ * @see #decodePacked64TimeMicrosAsJavaTime(long)
+ */
+ @SuppressWarnings({"GoodTime-ApiWithNumericTimeUnit", "JavaLocalTimeGetNano"})
+ public static long encodePacked64TimeMicros(java.time.LocalTime time) {
+ checkValidTimeMicros(time);
+ return (((long) encodePacked32TimeSeconds(time)) << MICRO_LENGTH) | (time.getNano() / 1_000L);
+ }
+
+ /**
+ * Decodes {@code bitFieldTimeMicros} as a {@link LocalTime} with microseconds precision.
+ *
+ * <p>Encoding is as the following:
+ *
+ * <pre>
+ * 6 5 4 3 2 1
+ * MSB 3210987654321098765432109876543210987654321098765432109876543210 LSB
+ * | H || M || S ||-------micros-----|
+ * </pre>
+ *
+ * <p><b>Warning: LocalTime only supports milliseconds precision. Result is truncated.</b>
+ *
+ * @see #encodePacked64TimeMicros(LocalTime)
+ * @see #decodePacked64TimeMicrosAsJavaTime(long)
+ */
+ @SuppressWarnings("GoodTime") // should return a java.time.LocalTime
+ public static LocalTime decodePacked64TimeMicros(long bitFieldTimeMicros) {
+ checkValidBitField(bitFieldTimeMicros, TIME_MICROS_MASK);
+ int bitFieldTimeSeconds = (int) (bitFieldTimeMicros >> MICRO_LENGTH);
+ LocalTime timeSeconds = decodePacked32TimeSeconds(bitFieldTimeSeconds);
+ int microOfSecond = getFieldFromBitField(bitFieldTimeMicros, MICRO_MASK, MICRO_SHIFT);
+ checkValidMicroOfSecond(microOfSecond);
+ LocalTime time = timeSeconds.withMillisOfSecond(microOfSecond / 1_000);
+ checkValidTimeMillis(time);
+ return time;
+ }
+
+ /**
+ * Decodes {@code bitFieldTimeMicros} as a {@link java.time.LocalTime} with microseconds
+ * precision.
+ *
+ * <p>Encoding is as the following:
+ *
+ * <pre>
+ * 6 5 4 3 2 1
+ * MSB 3210987654321098765432109876543210987654321098765432109876543210 LSB
+ * | H || M || S ||-------micros-----|
+ * </pre>
+ *
+ * @see #encodePacked64TimeMicros(java.time.LocalTime)
+ */
+ @SuppressWarnings("GoodTime-ApiWithNumericTimeUnit")
+ public static java.time.LocalTime decodePacked64TimeMicrosAsJavaTime(long bitFieldTimeMicros) {
+ checkValidBitField(bitFieldTimeMicros, TIME_MICROS_MASK);
+ int bitFieldTimeSeconds = (int) (bitFieldTimeMicros >> MICRO_LENGTH);
+ java.time.LocalTime timeSeconds = decodePacked32TimeSecondsAsJavaTime(bitFieldTimeSeconds);
+ int microOfSecond = getFieldFromBitField(bitFieldTimeMicros, MICRO_MASK, MICRO_SHIFT);
+ checkValidMicroOfSecond(microOfSecond);
+ return timeSeconds.withNano(microOfSecond * 1_000);
+ }
+
+ /**
+ * Encodes {@code time} as a 8-byte integer with nanoseconds precision.
+ *
+ * <p>Encoding is as the following:
+ *
+ * <pre>
+ * 6 5 4 3 2 1
+ * MSB 3210987654321098765432109876543210987654321098765432109876543210 LSB
+ * | H || M || S ||---------- nanos -----------|
+ * </pre>
+ *
+ * @see #decodePacked64TimeNanos(long)
+ * @see #encodePacked64TimeNanos(java.time.LocalTime)
+ */
+ @SuppressWarnings("GoodTime") // should accept a java.time.LocalTime
+ public static long encodePacked64TimeNanos(LocalTime time) {
+ checkValidTimeMillis(time);
+ return (((long) encodePacked32TimeSeconds(time)) << NANO_LENGTH)
+ | (time.getMillisOfSecond() * 1_000_000L);
+ }
+
+ /**
+ * Encodes {@code time} as a 8-byte integer with nanoseconds precision.
+ *
+ * <p>Encoding is as the following:
+ *
+ * <pre>
+ * 6 5 4 3 2 1
+ * MSB 3210987654321098765432109876543210987654321098765432109876543210 LSB
+ * | H || M || S ||---------- nanos -----------|
+ * </pre>
+ *
+ * @see #decodePacked64TimeNanosAsJavaTime(long)
+ */
+ @SuppressWarnings({"GoodTime-ApiWithNumericTimeUnit", "JavaLocalTimeGetNano"})
+ public static long encodePacked64TimeNanos(java.time.LocalTime time) {
+ checkValidTimeNanos(time);
+ return (((long) encodePacked32TimeSeconds(time)) << NANO_LENGTH) | time.getNano();
+ }
+
+ /**
+ * Decodes {@code bitFieldTimeNanos} as a {@link LocalTime} with nanoseconds precision.
+ *
+ * <p>Encoding is as the following:
+ *
+ * <pre>
+ * 6 5 4 3 2 1
+ * MSB 3210987654321098765432109876543210987654321098765432109876543210 LSB
+ * | H || M || S ||---------- nanos -----------|
+ * </pre>
+ *
+ * <p><b>Warning: LocalTime only supports milliseconds precision. Result is truncated.</b>
+ *
+ * @see #encodePacked64TimeNanos(LocalTime)
+ * @see #decodePacked64TimeNanosAsJavaTime(long)
+ */
+ @SuppressWarnings("GoodTime") // should return a java.time.LocalTime
+ public static LocalTime decodePacked64TimeNanos(long bitFieldTimeNanos) {
+ checkValidBitField(bitFieldTimeNanos, TIME_NANOS_MASK);
+ int bitFieldTimeSeconds = (int) (bitFieldTimeNanos >> NANO_LENGTH);
+ LocalTime timeSeconds = decodePacked32TimeSeconds(bitFieldTimeSeconds);
+ int nanoOfSecond = getFieldFromBitField(bitFieldTimeNanos, NANO_MASK, NANO_SHIFT);
+ checkValidNanoOfSecond(nanoOfSecond);
+ LocalTime time = timeSeconds.withMillisOfSecond(nanoOfSecond / 1_000_000);
+ checkValidTimeMillis(time);
+ return time;
+ }
+
+ /**
+ * Decodes {@code bitFieldTimeNanos} as a {@link java.time.LocalTime} with nanoseconds precision.
+ *
+ * <p>Encoding is as the following:
+ *
+ * <pre>
+ * 6 5 4 3 2 1
+ * MSB 3210987654321098765432109876543210987654321098765432109876543210 LSB
+ * | H || M || S ||---------- nanos -----------|
+ * </pre>
+ *
+ * @see #encodePacked64TimeNanos(java.time.LocalTime)
+ */
+ @SuppressWarnings("GoodTime-ApiWithNumericTimeUnit")
+ public static java.time.LocalTime decodePacked64TimeNanosAsJavaTime(long bitFieldTimeNanos) {
+ checkValidBitField(bitFieldTimeNanos, TIME_NANOS_MASK);
+ int bitFieldTimeSeconds = (int) (bitFieldTimeNanos >> NANO_LENGTH);
+ java.time.LocalTime timeSeconds = decodePacked32TimeSecondsAsJavaTime(bitFieldTimeSeconds);
+ int nanoOfSecond = getFieldFromBitField(bitFieldTimeNanos, NANO_MASK, NANO_SHIFT);
+ checkValidNanoOfSecond(nanoOfSecond);
+ return timeSeconds.withNano(nanoOfSecond);
+ }
+
+ /**
+ * Encodes {@code dateTime} as a 8-byte integer with seconds precision.
+ *
+ * <p>Encoding is as the following:
+ *
+ * <pre>
+ * 6 5 4 3 2 1
+ * MSB 3210987654321098765432109876543210987654321098765432109876543210 LSB
+ * |--- year ---||m || D || H || M || S |
+ * </pre>
+ *
+ * @see #decodePacked64DatetimeSeconds(long)
+ * @see #encodePacked64DatetimeSeconds(java.time.LocalDateTime)
+ */
+ @SuppressWarnings("GoodTime") // should accept a java.time.LocalDateTime
+ public static long encodePacked64DatetimeSeconds(LocalDateTime dateTime) {
+ checkValidDateTimeSeconds(dateTime);
+ long bitFieldDatetimeSeconds = 0x0L;
+ bitFieldDatetimeSeconds |= (long) dateTime.getYear() << YEAR_SHIFT;
+ bitFieldDatetimeSeconds |= (long) dateTime.getMonthOfYear() << MONTH_SHIFT;
+ bitFieldDatetimeSeconds |= (long) dateTime.getDayOfMonth() << DAY_SHIFT;
+ bitFieldDatetimeSeconds |= (long) encodePacked32TimeSeconds(dateTime.toLocalTime());
+ return bitFieldDatetimeSeconds;
+ }
+
+ /**
+ * Encodes {@code dateTime} as a 8-byte integer with seconds precision.
+ *
+ * <p>Encoding is as the following:
+ *
+ * <pre>
+ * 6 5 4 3 2 1
+ * MSB 3210987654321098765432109876543210987654321098765432109876543210 LSB
+ * |--- year ---||m || D || H || M || S |
+ * </pre>
+ *
+ * @see #decodePacked64DatetimeSecondsAsJavaTime(long)
+ */
+ @SuppressWarnings("GoodTime-ApiWithNumericTimeUnit")
+ public static long encodePacked64DatetimeSeconds(java.time.LocalDateTime dateTime) {
+ checkValidDateTimeSeconds(dateTime);
+ long bitFieldDatetimeSeconds = 0x0L;
+ bitFieldDatetimeSeconds |= (long) dateTime.getYear() << YEAR_SHIFT;
+ bitFieldDatetimeSeconds |= (long) dateTime.getMonthValue() << MONTH_SHIFT;
+ bitFieldDatetimeSeconds |= (long) dateTime.getDayOfMonth() << DAY_SHIFT;
+ bitFieldDatetimeSeconds |= (long) encodePacked32TimeSeconds(dateTime.toLocalTime());
+ return bitFieldDatetimeSeconds;
+ }
+
+ /**
+ * Decodes {@code bitFieldDatetimeSeconds} as a {@link LocalDateTime} with seconds precision.
+ *
+ * <p>Encoding is as the following:
+ *
+ * <pre>
+ * 6 5 4 3 2 1
+ * MSB 3210987654321098765432109876543210987654321098765432109876543210 LSB
+ * |--- year ---||m || D || H || M || S |
+ * </pre>
+ *
+ * @see #encodePacked64DatetimeSeconds(LocalDateTime)
+ * @see #decodePacked64DatetimeSecondsAsJavaTime(long)
+ */
+ @SuppressWarnings("GoodTime") // should return a java.time.LocalDateTime
+ public static LocalDateTime decodePacked64DatetimeSeconds(long bitFieldDatetimeSeconds) {
+ checkValidBitField(bitFieldDatetimeSeconds, DATETIME_SECONDS_MASK);
+ int bitFieldTimeSeconds = (int) (bitFieldDatetimeSeconds & TIME_SECONDS_MASK);
+ LocalTime timeSeconds = decodePacked32TimeSeconds(bitFieldTimeSeconds);
+ int year = getFieldFromBitField(bitFieldDatetimeSeconds, YEAR_MASK, YEAR_SHIFT);
+ int monthOfYear = getFieldFromBitField(bitFieldDatetimeSeconds, MONTH_MASK, MONTH_SHIFT);
+ int dayOfMonth = getFieldFromBitField(bitFieldDatetimeSeconds, DAY_MASK, DAY_SHIFT);
+ LocalDateTime dateTime =
+ new LocalDateTime(
+ year,
+ monthOfYear,
+ dayOfMonth,
+ timeSeconds.getHourOfDay(),
+ timeSeconds.getMinuteOfHour(),
+ timeSeconds.getSecondOfMinute());
+ checkValidDateTimeSeconds(dateTime);
+ return dateTime;
+ }
+
+ /**
+ * Decodes {@code bitFieldDatetimeSeconds} as a {@link LocalDateTime} with seconds precision.
+ *
+ * <p>Encoding is as the following:
+ *
+ * <pre>
+ * 6 5 4 3 2 1
+ * MSB 3210987654321098765432109876543210987654321098765432109876543210 LSB
+ * |--- year ---||m || D || H || M || S |
+ * </pre>
+ *
+ * @see #encodePacked64DatetimeSeconds(java.time.LocalDateTime)
+ */
+ @SuppressWarnings("GoodTime-ApiWithNumericTimeUnit")
+ public static java.time.LocalDateTime decodePacked64DatetimeSecondsAsJavaTime(
+ long bitFieldDatetimeSeconds) {
+ checkValidBitField(bitFieldDatetimeSeconds, DATETIME_SECONDS_MASK);
+ int bitFieldTimeSeconds = (int) (bitFieldDatetimeSeconds & TIME_SECONDS_MASK);
+ java.time.LocalTime timeSeconds = decodePacked32TimeSecondsAsJavaTime(bitFieldTimeSeconds);
+ int year = getFieldFromBitField(bitFieldDatetimeSeconds, YEAR_MASK, YEAR_SHIFT);
+ int monthOfYear = getFieldFromBitField(bitFieldDatetimeSeconds, MONTH_MASK, MONTH_SHIFT);
+ int dayOfMonth = getFieldFromBitField(bitFieldDatetimeSeconds, DAY_MASK, DAY_SHIFT);
+ try {
+ java.time.LocalDateTime dateTime =
+ java.time.LocalDateTime.of(
+ year,
+ monthOfYear,
+ dayOfMonth,
+ timeSeconds.getHour(),
+ timeSeconds.getMinute(),
+ timeSeconds.getSecond());
+ checkValidDateTimeSeconds(dateTime);
+ return dateTime;
+ } catch (java.time.DateTimeException e) {
+ throw new IllegalArgumentException(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Encodes {@code dateTime} as a 8-byte integer with microseconds precision.
+ *
+ * <p>Encoding is as the following:
+ *
+ * <pre>
+ * 6 5 4 3 2 1
+ * MSB 3210987654321098765432109876543210987654321098765432109876543210 LSB
+ * |--- year ---||m || D || H || M || S ||-------micros-----|
+ * </pre>
+ *
+ * @see #decodePacked64DatetimeMicros(long)
+ * @see #encodePacked64DatetimeMicros(java.time.LocalDateTime)
+ */
+ @SuppressWarnings("GoodTime") // should accept a java.time.LocalDateTime
+ public static long encodePacked64DatetimeMicros(LocalDateTime dateTime) {
+ checkValidDateTimeMillis(dateTime);
+ return (encodePacked64DatetimeSeconds(dateTime) << MICRO_LENGTH)
+ | (dateTime.getMillisOfSecond() * 1_000L);
+ }
+
+ /**
+ * Encodes {@code dateTime} as a 8-byte integer with microseconds precision.
+ *
+ * <p>Encoding is as the following:
+ *
+ * <pre>
+ * 6 5 4 3 2 1
+ * MSB 3210987654321098765432109876543210987654321098765432109876543210 LSB
+ * |--- year ---||m || D || H || M || S ||-------micros-----|
+ * </pre>
+ *
+ * @see #decodePacked64DatetimeMicrosAsJavaTime(long)
+ */
+ @SuppressWarnings({"GoodTime-ApiWithNumericTimeUnit", "JavaLocalDateTimeGetNano"})
+ public static long encodePacked64DatetimeMicros(java.time.LocalDateTime dateTime) {
+ checkValidDateTimeMicros(dateTime);
+ return (encodePacked64DatetimeSeconds(dateTime) << MICRO_LENGTH)
+ | (dateTime.getNano() / 1_000L);
+ }
+
+ /**
+ * Decodes {@code bitFieldDatetimeMicros} as a {@link LocalDateTime} with microseconds precision.
+ *
+ * <p>Encoding is as the following:
+ *
+ * <pre>
+ * 6 5 4 3 2 1
+ * MSB 3210987654321098765432109876543210987654321098765432109876543210 LSB
+ * |--- year ---||m || D || H || M || S ||-------micros-----|
+ * </pre>
+ *
+ * <p><b>Warning: LocalDateTime only supports milliseconds precision. Result is truncated.</b>
+ *
+ * @see #encodePacked64DatetimeMicros(LocalDateTime)
+ * @see #decodePacked64DatetimeMicrosAsJavaTime(long)
+ */
+ @SuppressWarnings("GoodTime") // should return a java.time.LocalDateTime
+ public static LocalDateTime decodePacked64DatetimeMicros(long bitFieldDatetimeMicros) {
+ checkValidBitField(bitFieldDatetimeMicros, DATETIME_MICROS_MASK);
+ long bitFieldDatetimeSeconds = bitFieldDatetimeMicros >> MICRO_LENGTH;
+ LocalDateTime dateTimeSeconds = decodePacked64DatetimeSeconds(bitFieldDatetimeSeconds);
+ int microOfSecond = getFieldFromBitField(bitFieldDatetimeMicros, MICRO_MASK, MICRO_SHIFT);
+ checkValidMicroOfSecond(microOfSecond);
+ LocalDateTime dateTime = dateTimeSeconds.withMillisOfSecond(microOfSecond / 1_000);
+ checkValidDateTimeMillis(dateTime);
+ return dateTime;
+ }
+
+ /**
+ * Decodes {@code bitFieldDatetimeMicros} as a {@link java.time.LocalDateTime} with microseconds
+ * precision.
+ *
+ * <p>Encoding is as the following:
+ *
+ * <pre>
+ * 6 5 4 3 2 1
+ * MSB 3210987654321098765432109876543210987654321098765432109876543210 LSB
+ * |--- year ---||m || D || H || M || S ||-------micros-----|
+ * </pre>
+ *
+ * @see #encodePacked64DatetimeMicros(java.time.LocalDateTime)
+ */
+ @SuppressWarnings("GoodTime-ApiWithNumericTimeUnit")
+ public static java.time.LocalDateTime decodePacked64DatetimeMicrosAsJavaTime(
+ long bitFieldDatetimeMicros) {
+ checkValidBitField(bitFieldDatetimeMicros, DATETIME_MICROS_MASK);
+ long bitFieldDatetimeSeconds = bitFieldDatetimeMicros >> MICRO_LENGTH;
+ java.time.LocalDateTime dateTimeSeconds =
+ decodePacked64DatetimeSecondsAsJavaTime(bitFieldDatetimeSeconds);
+ int microOfSecond = getFieldFromBitField(bitFieldDatetimeMicros, MICRO_MASK, MICRO_SHIFT);
+ checkValidMicroOfSecond(microOfSecond);
+ java.time.LocalDateTime dateTime = dateTimeSeconds.withNano(microOfSecond * 1_000);
+ checkValidDateTimeMicros(dateTime);
+ return dateTime;
+ }
+
+ private static int getFieldFromBitField(long bitField, long mask, int shift) {
+ return (int) ((bitField & mask) >> shift);
+ }
+
+ private static void checkValidTimeSeconds(LocalTime time) {
+ checkArgument(time.getHourOfDay() >= 0 && time.getHourOfDay() <= 23);
+ checkArgument(time.getMinuteOfHour() >= 0 && time.getMinuteOfHour() <= 59);
+ checkArgument(time.getSecondOfMinute() >= 0 && time.getSecondOfMinute() <= 59);
+ }
+
+ private static void checkValidTimeSeconds(java.time.LocalTime time) {
+ checkArgument(time.getHour() >= 0 && time.getHour() <= 23);
+ checkArgument(time.getMinute() >= 0 && time.getMinute() <= 59);
+ checkArgument(time.getSecond() >= 0 && time.getSecond() <= 59);
+ }
+
+ private static void checkValidTimeMillis(LocalTime time) {
+ checkValidTimeSeconds(time);
+ checkArgument(time.getMillisOfSecond() >= 0 && time.getMillisOfSecond() <= 999);
+ }
+
+ private static void checkValidTimeMicros(java.time.LocalTime time) {
+ checkValidTimeSeconds(time);
+ checkArgument(time.equals(time.truncatedTo(ChronoUnit.MICROS)));
+ }
+
+ private static void checkValidTimeNanos(java.time.LocalTime time) {
+ checkValidTimeSeconds(time);
+ }
+
+ private static void checkValidDateTimeSeconds(LocalDateTime dateTime) {
+ checkArgument(dateTime.getYear() >= 1 && dateTime.getYear() <= 9999);
+ checkArgument(dateTime.getMonthOfYear() >= 1 && dateTime.getMonthOfYear() <= 12);
+ checkArgument(dateTime.getDayOfMonth() >= 1 && dateTime.getDayOfMonth() <= 31);
+ checkValidTimeSeconds(dateTime.toLocalTime());
+ }
+
+ private static void checkValidDateTimeSeconds(java.time.LocalDateTime dateTime) {
+ checkArgument(dateTime.getYear() >= 1 && dateTime.getYear() <= 9999);
+ checkArgument(dateTime.getMonthValue() >= 1 && dateTime.getMonthValue() <= 12);
+ checkArgument(dateTime.getDayOfMonth() >= 1 && dateTime.getDayOfMonth() <= 31);
+ checkValidTimeSeconds(dateTime.toLocalTime());
+ }
+
+ private static void checkValidDateTimeMillis(LocalDateTime dateTime) {
+ checkValidDateTimeSeconds(dateTime);
+ checkArgument(dateTime.getMillisOfSecond() >= 0 && dateTime.getMillisOfSecond() <= 999);
+ }
+
+ private static void checkValidDateTimeMicros(java.time.LocalDateTime dateTime) {
+ checkValidDateTimeSeconds(dateTime);
+ checkArgument(dateTime.equals(dateTime.truncatedTo(ChronoUnit.MICROS)));
+ }
+
+ private static void checkValidDateTimeNanos(java.time.LocalDateTime dateTime) {
+ checkValidDateTimeSeconds(dateTime);
+ }
+
+ private static void checkValidMicroOfSecond(int microOfSecond) {
+ checkArgument(microOfSecond >= 0 && microOfSecond <= 999999);
+ }
+
+ private static void checkValidNanoOfSecond(int nanoOfSecond) {
+ checkArgument(nanoOfSecond >= 0 && nanoOfSecond <= 999999999);
+ }
+
+ private static void checkValidBitField(long bitField, long mask) {
+ checkArgument((bitField & ~mask) == 0x0L);
+ }
+
+ private CivilTimeEncoder() {}
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
new file mode 100644
index 0000000..873832f
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static java.util.stream.Collectors.toList;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.DescriptorProtos.DescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
+import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.DescriptorValidationException;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.FileDescriptor;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
+import java.util.AbstractMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.BaseEncoding;
+
+/**
+ * Utility methods for converting JSON {@link TableRow} objects to dynamic protocol message, for use
+ * with the Storage write API.
+ */
+public class TableRowToStorageApiProto {
+ static final Map<String, Type> PRIMITIVE_TYPES =
+ ImmutableMap.<String, Type>builder()
+ .put("INT64", Type.TYPE_INT64)
+ .put("INTEGER", Type.TYPE_INT64)
+ .put("FLOAT64", Type.TYPE_DOUBLE)
+ .put("FLOAT", Type.TYPE_DOUBLE)
+ .put("STRING", Type.TYPE_STRING)
+ .put("BOOL", Type.TYPE_BOOL)
+ .put("BOOLEAN", Type.TYPE_BOOL)
+ .put("BYTES", Type.TYPE_BYTES)
+ .put("NUMERIC", Type.TYPE_STRING) // Pass through the JSON encoding.
+ .put("BIGNUMERIC", Type.TYPE_STRING) // Pass through the JSON encoding.
+ .put("GEOGRAPHY", Type.TYPE_STRING) // Pass through the JSON encoding.
+ .put("DATE", Type.TYPE_STRING) // Pass through the JSON encoding.
+ .put("TIME", Type.TYPE_STRING) // Pass through the JSON encoding.
+ .put("DATETIME", Type.TYPE_STRING) // Pass through the JSON encoding.
+ .put("TIMESTAMP", Type.TYPE_STRING) // Pass through the JSON encoding.
+ .build();
+
+ /**
+ * Given a BigQuery TableSchema, returns a protocol-buffer Descriptor that can be used to write
+ * data using the BigQuery Storage API.
+ */
+ public static Descriptor getDescriptorFromTableSchema(TableSchema jsonSchema)
+ throws DescriptorValidationException {
+ DescriptorProto descriptorProto = descriptorSchemaFromTableSchema(jsonSchema);
+ FileDescriptorProto fileDescriptorProto =
+ FileDescriptorProto.newBuilder().addMessageType(descriptorProto).build();
+ FileDescriptor fileDescriptor =
+ FileDescriptor.buildFrom(fileDescriptorProto, new FileDescriptor[0]);
+
+ return Iterables.getOnlyElement(fileDescriptor.getMessageTypes());
+ }
+
+ /**
+ * Given a BigQuery TableRow, returns a protocol-buffer message that can be used to write data
+ * using the BigQuery Storage API.
+ */
+ public static DynamicMessage messageFromTableRow(Descriptor descriptor, TableRow tableRow) {
+ DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
+ for (Map.Entry<String, Object> entry : tableRow.entrySet()) {
+ FieldDescriptor fieldDescriptor = descriptor.findFieldByName(entry.getKey().toLowerCase());
+ Object value = messageValueFromFieldValue(fieldDescriptor, entry.getValue());
+ if (value != null) {
+ builder.setField(fieldDescriptor, value);
+ }
+ }
+ return builder.build();
+ }
+
+ @VisibleForTesting
+ static DescriptorProto descriptorSchemaFromTableSchema(TableSchema tableSchema) {
+ return descriptorSchemaFromTableFieldSchemas(tableSchema.getFields());
+ }
+
+ private static DescriptorProto descriptorSchemaFromTableFieldSchemas(
+ Iterable<TableFieldSchema> tableFieldSchemas) {
+ DescriptorProto.Builder descriptorBuilder = DescriptorProto.newBuilder();
+ // Create a unique name for the descriptor ('-' characters cannot be used).
+ descriptorBuilder.setName("D" + UUID.randomUUID().toString().replace("-", "_"));
+ int i = 1;
+ for (TableFieldSchema fieldSchema : tableFieldSchemas) {
+ fieldDescriptorFromTableField(fieldSchema, i++, descriptorBuilder);
+ }
+ return descriptorBuilder.build();
+ }
+
+ private static void fieldDescriptorFromTableField(
+ TableFieldSchema fieldSchema, int fieldNumber, DescriptorProto.Builder descriptorBuilder) {
+ FieldDescriptorProto.Builder fieldDescriptorBuilder = FieldDescriptorProto.newBuilder();
+ fieldDescriptorBuilder = fieldDescriptorBuilder.setName(fieldSchema.getName().toLowerCase());
+ fieldDescriptorBuilder = fieldDescriptorBuilder.setNumber(fieldNumber);
+ switch (fieldSchema.getType()) {
+ case "STRUCT":
+ case "RECORD":
+ DescriptorProto nested = descriptorSchemaFromTableFieldSchemas(fieldSchema.getFields());
+ descriptorBuilder.addNestedType(nested);
+ fieldDescriptorBuilder =
+ fieldDescriptorBuilder.setType(Type.TYPE_MESSAGE).setTypeName(nested.getName());
+ break;
+ default:
+ @Nullable Type type = PRIMITIVE_TYPES.get(fieldSchema.getType());
+ if (type == null) {
+ throw new UnsupportedOperationException(
+ "Converting BigQuery type " + fieldSchema.getType() + " to Beam type is unsupported");
+ }
+ fieldDescriptorBuilder = fieldDescriptorBuilder.setType(type);
+ }
+
+ Optional<Mode> fieldMode = Optional.ofNullable(fieldSchema.getMode()).map(Mode::valueOf);
+ if (fieldMode.filter(m -> m == Mode.REPEATED).isPresent()) {
+ fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_REPEATED);
+ } else if (!fieldMode.isPresent() || fieldMode.filter(m -> m == Mode.NULLABLE).isPresent()) {
+ fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_OPTIONAL);
+ } else {
+ fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_REQUIRED);
+ }
+ descriptorBuilder.addField(fieldDescriptorBuilder.build());
+ }
+
+ @Nullable
+ private static Object messageValueFromFieldValue(
+ FieldDescriptor fieldDescriptor, Object bqValue) {
+ if (bqValue == null) {
+ if (fieldDescriptor.isOptional()) {
+ return null;
+ } else {
+ throw new IllegalArgumentException(
+ "Received null value for non-nullable field " + fieldDescriptor.getName());
+ }
+ }
+ return toProtoValue(fieldDescriptor, bqValue, fieldDescriptor.isRepeated());
+ }
+
+ private static final Map<FieldDescriptor.Type, Function<String, Object>>
+ JSON_PROTO_STRING_PARSERS =
+ ImmutableMap.<FieldDescriptor.Type, Function<String, Object>>builder()
+ .put(FieldDescriptor.Type.INT32, Integer::valueOf)
+ .put(FieldDescriptor.Type.INT64, Long::valueOf)
+ .put(FieldDescriptor.Type.FLOAT, Float::valueOf)
+ .put(FieldDescriptor.Type.DOUBLE, Double::valueOf)
+ .put(FieldDescriptor.Type.BOOL, Boolean::valueOf)
+ .put(FieldDescriptor.Type.STRING, str -> str)
+ .put(
+ FieldDescriptor.Type.BYTES,
+ b64 -> ByteString.copyFrom(BaseEncoding.base64().decode(b64)))
+ .build();
+
+ @Nullable
+ @SuppressWarnings({"nullness"})
+ @VisibleForTesting
+ static Object toProtoValue(
+ FieldDescriptor fieldDescriptor, Object jsonBQValue, boolean isRepeated) {
+ if (isRepeated) {
+ return ((List<Object>) jsonBQValue)
+ .stream()
+ .map(
+ v -> {
+ if (fieldDescriptor.getType() == FieldDescriptor.Type.MESSAGE) {
+ return ((Map<String, Object>) v).get("v");
+ } else {
+ return v;
+ }
+ })
+ .map(v -> toProtoValue(fieldDescriptor, v, false))
+ .collect(toList());
+ }
+
+ if (fieldDescriptor.getType() == FieldDescriptor.Type.MESSAGE) {
+ if (jsonBQValue instanceof AbstractMap) {
+ // This will handle nested rows.
+ TableRow tr = new TableRow();
+ tr.putAll((AbstractMap<String, Object>) jsonBQValue);
+ return messageFromTableRow(fieldDescriptor.getMessageType(), tr);
+ } else {
+ throw new RuntimeException("Unexpected value " + jsonBQValue + " Expected a JSON map.");
+ }
+ }
+ @Nullable Object scalarValue = scalarToProtoValue(fieldDescriptor, jsonBQValue);
+ if (scalarValue == null) {
+ return toProtoValue(fieldDescriptor, jsonBQValue.toString(), isRepeated);
+ } else {
+ return scalarValue;
+ }
+ }
+
+ @VisibleForTesting
+ @Nullable
+ static Object scalarToProtoValue(FieldDescriptor fieldDescriptor, Object jsonBQValue) {
+ if (jsonBQValue instanceof String) {
+ Function<String, Object> mapper = JSON_PROTO_STRING_PARSERS.get(fieldDescriptor.getType());
+ if (mapper == null) {
+ throw new UnsupportedOperationException(
+ "Converting BigQuery type '"
+ + jsonBQValue.getClass()
+ + "' to '"
+ + fieldDescriptor
+ + "' is not supported");
+ }
+ return mapper.apply((String) jsonBQValue);
+ }
+
+ switch (fieldDescriptor.getType()) {
+ case BOOL:
+ if (jsonBQValue instanceof Boolean) {
+ return jsonBQValue;
+ }
+ break;
+ case BYTES:
+ break;
+ case INT64:
+ if (jsonBQValue instanceof Integer) {
+ return Long.valueOf((Integer) jsonBQValue);
+ } else if (jsonBQValue instanceof Long) {
+ return jsonBQValue;
+ }
+ break;
+ case INT32:
+ if (jsonBQValue instanceof Integer) {
+ return jsonBQValue;
+ }
+ break;
+ case STRING:
+ break;
+ case DOUBLE:
+ if (jsonBQValue instanceof Double) {
+ return jsonBQValue;
+ } else if (jsonBQValue instanceof Float) {
+ return Double.valueOf((Float) jsonBQValue);
+ }
+ break;
+ default:
+ throw new RuntimeException("Unsupported proto type " + fieldDescriptor.getType());
+ }
+ return null;
+ }
+
+ @VisibleForTesting
+ public static TableRow tableRowFromMessage(Message message) {
+ TableRow tableRow = new TableRow();
+ for (Map.Entry<FieldDescriptor, Object> field : message.getAllFields().entrySet()) {
+ FieldDescriptor fieldDescriptor = field.getKey();
+ Object fieldValue = field.getValue();
+ tableRow.putIfAbsent(
+ fieldDescriptor.getName(), jsonValueFromMessageValue(fieldDescriptor, fieldValue, true));
+ }
+ return tableRow;
+ }
+
+ public static Object jsonValueFromMessageValue(
+ FieldDescriptor fieldDescriptor, Object fieldValue, boolean expandRepeated) {
+ if (expandRepeated && fieldDescriptor.isRepeated()) {
+ List<Object> valueList = (List<Object>) fieldValue;
+ return valueList.stream()
+ .map(v -> jsonValueFromMessageValue(fieldDescriptor, v, false))
+ .collect(toList());
+ }
+
+ switch (fieldDescriptor.getType()) {
+ case GROUP:
+ case MESSAGE:
+ return tableRowFromMessage((Message) fieldValue);
+ case BYTES:
+ return BaseEncoding.base64().encode(((ByteString) fieldValue).toByteArray());
+ case ENUM:
+ throw new RuntimeException("Enumerations not supported");
+ default:
+ return fieldValue;
+ }
+ }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java
new file mode 100644
index 0000000..32c0b7a
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.DescriptorProtos.DescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.DynamicMessage;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+@SuppressWarnings({
+ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+/** Unit tests form {@link BeamRowToStorageApiProto}. */
+public class BeamRowToStorageApiProtoTest {
+ private static final EnumerationType TEST_ENUM =
+ EnumerationType.create("ONE", "TWO", "RED", "BLUE");
+ private static final Schema BASE_SCHEMA =
+ Schema.builder()
+ .addField("byteValue", FieldType.BYTE.withNullable(true))
+ .addField("int16Value", FieldType.INT16)
+ .addField("int32Value", FieldType.INT32.withNullable(true))
+ .addField("int64Value", FieldType.INT64.withNullable(true))
+ .addField("decimalValue", FieldType.DECIMAL.withNullable(true))
+ .addField("floatValue", FieldType.FLOAT.withNullable(true))
+ .addField("doubleValue", FieldType.DOUBLE.withNullable(true))
+ .addField("stringValue", FieldType.STRING.withNullable(true))
+ .addField("datetimeValue", FieldType.DATETIME.withNullable(true))
+ .addField("booleanValue", FieldType.BOOLEAN.withNullable(true))
+ .addField("bytesValue", FieldType.BYTES.withNullable(true))
+ .addField("arrayValue", FieldType.array(FieldType.STRING))
+ .addField("iterableValue", FieldType.array(FieldType.STRING))
+ .addField("sqlDateValue", FieldType.logicalType(SqlTypes.DATE).withNullable(true))
+ .addField("sqlTimeValue", FieldType.logicalType(SqlTypes.TIME).withNullable(true))
+ .addField("sqlDatetimeValue", FieldType.logicalType(SqlTypes.DATETIME).withNullable(true))
+ .addField(
+ "sqlTimestampValue", FieldType.logicalType(SqlTypes.TIMESTAMP).withNullable(true))
+ .addField("enumValue", FieldType.logicalType(TEST_ENUM).withNullable(true))
+ .build();
+
+ private static final DescriptorProto BASE_SCHEMA_PROTO =
+ DescriptorProto.newBuilder()
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("bytevalue")
+ .setNumber(1)
+ .setType(Type.TYPE_INT32)
+ .setLabel(Label.LABEL_OPTIONAL)
+ .build())
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("int16value")
+ .setNumber(2)
+ .setType(Type.TYPE_INT32)
+ .setLabel(Label.LABEL_REQUIRED)
+ .build())
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("int32value")
+ .setNumber(3)
+ .setType(Type.TYPE_INT32)
+ .setLabel(Label.LABEL_OPTIONAL)
+ .build())
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("int64value")
+ .setNumber(4)
+ .setType(Type.TYPE_INT64)
+ .setLabel(Label.LABEL_OPTIONAL)
+ .build())
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("decimalvalue")
+ .setNumber(5)
+ .setType(Type.TYPE_BYTES)
+ .setLabel(Label.LABEL_OPTIONAL)
+ .build())
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("floatvalue")
+ .setNumber(6)
+ .setType(Type.TYPE_FLOAT)
+ .setLabel(Label.LABEL_OPTIONAL)
+ .build())
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("doublevalue")
+ .setNumber(7)
+ .setType(Type.TYPE_DOUBLE)
+ .setLabel(Label.LABEL_OPTIONAL)
+ .build())
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("stringvalue")
+ .setNumber(8)
+ .setType(Type.TYPE_STRING)
+ .setLabel(Label.LABEL_OPTIONAL)
+ .build())
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("datetimevalue")
+ .setNumber(9)
+ .setType(Type.TYPE_INT64)
+ .setLabel(Label.LABEL_OPTIONAL)
+ .build())
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("booleanvalue")
+ .setNumber(10)
+ .setType(Type.TYPE_BOOL)
+ .setLabel(Label.LABEL_OPTIONAL)
+ .build())
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("bytesvalue")
+ .setNumber(11)
+ .setType(Type.TYPE_BYTES)
+ .setLabel(Label.LABEL_OPTIONAL)
+ .build())
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("arrayvalue")
+ .setNumber(12)
+ .setType(Type.TYPE_STRING)
+ .setLabel(Label.LABEL_REPEATED)
+ .build())
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("iterablevalue")
+ .setNumber(13)
+ .setType(Type.TYPE_STRING)
+ .setLabel(Label.LABEL_REPEATED)
+ .build())
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("sqldatevalue")
+ .setNumber(14)
+ .setType(Type.TYPE_INT32)
+ .setLabel(Label.LABEL_OPTIONAL)
+ .build())
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("sqltimevalue")
+ .setNumber(15)
+ .setType(Type.TYPE_INT64)
+ .setLabel(Label.LABEL_OPTIONAL)
+ .build())
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("sqldatetimevalue")
+ .setNumber(16)
+ .setType(Type.TYPE_INT64)
+ .setLabel(Label.LABEL_OPTIONAL)
+ .build())
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("sqltimestampvalue")
+ .setNumber(17)
+ .setType(Type.TYPE_INT64)
+ .setLabel(Label.LABEL_OPTIONAL)
+ .build())
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("enumvalue")
+ .setNumber(18)
+ .setType(Type.TYPE_STRING)
+ .setLabel(Label.LABEL_OPTIONAL)
+ .build())
+ .build();
+
+ private static final byte[] BYTES = "BYTE BYTE BYTE".getBytes(StandardCharsets.UTF_8);
+ private static final Row BASE_ROW =
+ Row.withSchema(BASE_SCHEMA)
+ .withFieldValue("byteValue", (byte) 1)
+ .withFieldValue("int16Value", (short) 2)
+ .withFieldValue("int32Value", (int) 3)
+ .withFieldValue("int64Value", (long) 4)
+ .withFieldValue("decimalValue", BigDecimal.valueOf(5))
+ .withFieldValue("floatValue", (float) 3.14)
+ .withFieldValue("doubleValue", (double) 2.68)
+ .withFieldValue("stringValue", "I am a string. Hear me roar.")
+ .withFieldValue("datetimeValue", Instant.now())
+ .withFieldValue("booleanValue", true)
+ .withFieldValue("bytesValue", BYTES)
+ .withFieldValue("arrayValue", ImmutableList.of("one", "two", "red", "blue"))
+ .withFieldValue("iterableValue", ImmutableList.of("blue", "red", "two", "one"))
+ .withFieldValue("sqlDateValue", LocalDate.now())
+ .withFieldValue("sqlTimeValue", LocalTime.now())
+ .withFieldValue("sqlDatetimeValue", LocalDateTime.now())
+ .withFieldValue("sqlTimestampValue", java.time.Instant.now())
+ .withFieldValue("enumValue", TEST_ENUM.valueOf("RED"))
+ .build();
+ private static final Map<String, Object> BASE_PROTO_EXPECTED_FIELDS =
+ ImmutableMap.<String, Object>builder()
+ .put("bytevalue", (int) 1)
+ .put("int16value", (int) 2)
+ .put("int32value", (int) 3)
+ .put("int64value", (long) 4)
+ .put(
+ "decimalvalue",
+ BeamRowToStorageApiProto.serializeBigDecimalToNumeric(BigDecimal.valueOf(5)))
+ .put("floatvalue", (float) 3.14)
+ .put("doublevalue", (double) 2.68)
+ .put("stringvalue", "I am a string. Hear me roar.")
+ .put("datetimevalue", BASE_ROW.getDateTime("datetimeValue").getMillis() * 1000)
+ .put("booleanvalue", true)
+ .put("bytesvalue", ByteString.copyFrom(BYTES))
+ .put("arrayvalue", ImmutableList.of("one", "two", "red", "blue"))
+ .put("iterablevalue", ImmutableList.of("blue", "red", "two", "one"))
+ .put(
+ "sqldatevalue",
+ (int) BASE_ROW.getLogicalTypeValue("sqlDateValue", LocalDate.class).toEpochDay())
+ .put(
+ "sqltimevalue",
+ CivilTimeEncoder.encodePacked64TimeMicros(
+ BASE_ROW.getLogicalTypeValue("sqlTimeValue", LocalTime.class)))
+ .put(
+ "sqldatetimevalue",
+ CivilTimeEncoder.encodePacked64DatetimeSeconds(
+ BASE_ROW.getLogicalTypeValue("sqlDatetimeValue", LocalDateTime.class)))
+ .put(
+ "sqltimestampvalue",
+ BASE_ROW
+ .getLogicalTypeValue("sqlTimestampValue", java.time.Instant.class)
+ .toEpochMilli()
+ * 1000)
+ .put("enumvalue", "RED")
+ .build();
+
+ private static final Schema NESTED_SCHEMA =
+ Schema.builder()
+ .addField("nested", FieldType.row(BASE_SCHEMA).withNullable(true))
+ .addField("nestedArray", FieldType.array(FieldType.row(BASE_SCHEMA)))
+ .addField("nestedIterable", FieldType.iterable(FieldType.row(BASE_SCHEMA)))
+ .build();
+ private static final Row NESTED_ROW =
+ Row.withSchema(NESTED_SCHEMA)
+ .withFieldValue("nested", BASE_ROW)
+ .withFieldValue("nestedArray", ImmutableList.of(BASE_ROW, BASE_ROW))
+ .withFieldValue("nestedIterable", ImmutableList.of(BASE_ROW, BASE_ROW))
+ .build();
+
+ @Test
+ public void testDescriptorFromSchema() {
+ DescriptorProto descriptor =
+ BeamRowToStorageApiProto.descriptorSchemaFromBeamSchema(BASE_SCHEMA);
+ Map<String, Type> types =
+ descriptor.getFieldList().stream()
+ .collect(
+ Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
+ Map<String, Type> expectedTypes =
+ BASE_SCHEMA_PROTO.getFieldList().stream()
+ .collect(
+ Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
+ assertEquals(expectedTypes, types);
+
+ Map<String, String> nameMapping =
+ BASE_SCHEMA.getFields().stream()
+ .collect(Collectors.toMap(f -> f.getName().toLowerCase(), Field::getName));
+ descriptor
+ .getFieldList()
+ .forEach(
+ p -> {
+ FieldType schemaFieldType =
+ BASE_SCHEMA.getField(nameMapping.get(p.getName())).getType();
+ Label label =
+ schemaFieldType.getTypeName().isCollectionType()
+ ? Label.LABEL_REPEATED
+ : schemaFieldType.getNullable() ? Label.LABEL_OPTIONAL : Label.LABEL_REQUIRED;
+ assertEquals(label, p.getLabel());
+ });
+ }
+
+ @Test
+ public void testNestedFromSchema() {
+ DescriptorProto descriptor =
+ BeamRowToStorageApiProto.descriptorSchemaFromBeamSchema(NESTED_SCHEMA);
+ Map<String, Type> expectedBaseTypes =
+ BASE_SCHEMA_PROTO.getFieldList().stream()
+ .collect(
+ Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
+
+ Map<String, Type> types =
+ descriptor.getFieldList().stream()
+ .collect(
+ Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
+ Map<String, String> typeNames =
+ descriptor.getFieldList().stream()
+ .collect(
+ Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getTypeName));
+ Map<String, Label> typeLabels =
+ descriptor.getFieldList().stream()
+ .collect(
+ Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getLabel));
+
+ assertEquals(3, types.size());
+
+ Map<String, DescriptorProto> nestedTypes =
+ descriptor.getNestedTypeList().stream()
+ .collect(Collectors.toMap(DescriptorProto::getName, Functions.identity()));
+ assertEquals(3, nestedTypes.size());
+ assertEquals(Type.TYPE_MESSAGE, types.get("nested"));
+ assertEquals(Label.LABEL_OPTIONAL, typeLabels.get("nested"));
+ String nestedTypeName1 = typeNames.get("nested");
+ Map<String, Type> nestedTypes1 =
+ nestedTypes.get(nestedTypeName1).getFieldList().stream()
+ .collect(
+ Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
+ assertEquals(expectedBaseTypes, nestedTypes1);
+
+ assertEquals(Type.TYPE_MESSAGE, types.get("nestedarray"));
+ assertEquals(Label.LABEL_REPEATED, typeLabels.get("nestedarray"));
+ String nestedTypeName2 = typeNames.get("nestedarray");
+ Map<String, Type> nestedTypes2 =
+ nestedTypes.get(nestedTypeName2).getFieldList().stream()
+ .collect(
+ Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
+ assertEquals(expectedBaseTypes, nestedTypes2);
+
+ assertEquals(Type.TYPE_MESSAGE, types.get("nestediterable"));
+ assertEquals(Label.LABEL_REPEATED, typeLabels.get("nestediterable"));
+ String nestedTypeName3 = typeNames.get("nestediterable");
+ Map<String, Type> nestedTypes3 =
+ nestedTypes.get(nestedTypeName3).getFieldList().stream()
+ .collect(
+ Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
+ assertEquals(expectedBaseTypes, nestedTypes3);
+ }
+
+ private void assertBaseRecord(DynamicMessage msg) {
+ Map<String, Object> recordFields =
+ msg.getAllFields().entrySet().stream()
+ .collect(
+ Collectors.toMap(entry -> entry.getKey().getName(), entry -> entry.getValue()));
+ assertEquals(BASE_PROTO_EXPECTED_FIELDS, recordFields);
+ }
+
+ @Test
+ public void testMessageFromTableRow() throws Exception {
+ Descriptor descriptor = BeamRowToStorageApiProto.getDescriptorFromSchema(NESTED_SCHEMA);
+ DynamicMessage msg = BeamRowToStorageApiProto.messageFromBeamRow(descriptor, NESTED_ROW);
+ assertEquals(3, msg.getAllFields().size());
+
+ Map<String, FieldDescriptor> fieldDescriptors =
+ descriptor.getFields().stream()
+ .collect(Collectors.toMap(FieldDescriptor::getName, Functions.identity()));
+ DynamicMessage nestedMsg = (DynamicMessage) msg.getField(fieldDescriptors.get("nested"));
+ assertBaseRecord(nestedMsg);
+ }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
index d0d4c31..1c856cc 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
@@ -31,14 +31,6 @@ import static org.junit.Assert.assertThrows;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.DescriptorProtos.DescriptorProto;
-import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
-import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label;
-import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
-import com.google.protobuf.Descriptors.Descriptor;
-import com.google.protobuf.Descriptors.FieldDescriptor;
-import com.google.protobuf.DynamicMessage;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.ByteBuffer;
@@ -46,13 +38,10 @@ import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
-import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.generic.GenericData;
@@ -64,10 +53,7 @@ import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.values.Row;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.BaseEncoding;
import org.joda.time.DateTime;
import org.joda.time.Instant;
import org.joda.time.chrono.ISOChronology;
@@ -787,250 +773,4 @@ public class BigQueryUtilsTest {
record, AVRO_ARRAY_ARRAY_TYPE, BigQueryUtils.ConversionOptions.builder().build());
assertEquals(expected, beamRow);
}
-
- private static final TableSchema BASE_TABLE_SCHEMA =
- new TableSchema()
- .setFields(
- ImmutableList.<TableFieldSchema>builder()
- .add(new TableFieldSchema().setType("STRING").setName("stringValue"))
- .add(new TableFieldSchema().setType("BYTES").setName("bytesValue"))
- .add(new TableFieldSchema().setType("INT64").setName("int64Value"))
- .add(new TableFieldSchema().setType("INTEGER").setName("intValue"))
- .add(new TableFieldSchema().setType("FLOAT64").setName("float64Value"))
- .add(new TableFieldSchema().setType("FLOAT").setName("floatValue"))
- .add(new TableFieldSchema().setType("BOOL").setName("boolValue"))
- .add(new TableFieldSchema().setType("BOOLEAN").setName("booleanValue"))
- .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampValue"))
- .add(new TableFieldSchema().setType("TIME").setName("timeValue"))
- .add(new TableFieldSchema().setType("DATETIME").setName("datetimeValue"))
- .add(new TableFieldSchema().setType("DATE").setName("dateValue"))
- .build());
-
- private static final DescriptorProto BASE_TABLE_SCHEMA_PROTO =
- DescriptorProto.newBuilder()
- .addField(
- FieldDescriptorProto.newBuilder()
- .setName("stringValue")
- .setNumber(1)
- .setType(Type.TYPE_STRING)
- .setLabel(Label.LABEL_OPTIONAL)
- .build())
- .addField(
- FieldDescriptorProto.newBuilder()
- .setName("bytesValue")
- .setNumber(2)
- .setType(Type.TYPE_BYTES)
- .setLabel(Label.LABEL_OPTIONAL)
- .build())
- .addField(
- FieldDescriptorProto.newBuilder()
- .setName("int64Value")
- .setNumber(3)
- .setType(Type.TYPE_INT64)
- .setLabel(Label.LABEL_OPTIONAL)
- .build())
- .addField(
- FieldDescriptorProto.newBuilder()
- .setName("intValue")
- .setNumber(4)
- .setType(Type.TYPE_INT64)
- .setLabel(Label.LABEL_OPTIONAL)
- .build())
- .addField(
- FieldDescriptorProto.newBuilder()
- .setName("float64Value")
- .setNumber(5)
- .setType(Type.TYPE_FLOAT)
- .setLabel(Label.LABEL_OPTIONAL)
- .build())
- .addField(
- FieldDescriptorProto.newBuilder()
- .setName("floatValue")
- .setNumber(6)
- .setType(Type.TYPE_FLOAT)
- .setLabel(Label.LABEL_OPTIONAL)
- .build())
- .addField(
- FieldDescriptorProto.newBuilder()
- .setName("boolValue")
- .setNumber(7)
- .setType(Type.TYPE_BOOL)
- .setLabel(Label.LABEL_OPTIONAL)
- .build())
- .addField(
- FieldDescriptorProto.newBuilder()
- .setName("booleanValue")
- .setNumber(8)
- .setType(Type.TYPE_BOOL)
- .setLabel(Label.LABEL_OPTIONAL)
- .build())
- .addField(
- FieldDescriptorProto.newBuilder()
- .setName("timestampValue")
- .setNumber(9)
- .setType(Type.TYPE_INT64)
- .setLabel(Label.LABEL_OPTIONAL)
- .build())
- .addField(
- FieldDescriptorProto.newBuilder()
- .setName("timeValue")
- .setNumber(10)
- .setType(Type.TYPE_INT64)
- .setLabel(Label.LABEL_OPTIONAL)
- .build())
- .addField(
- FieldDescriptorProto.newBuilder()
- .setName("datetimeValue")
- .setNumber(11)
- .setType(Type.TYPE_INT64)
- .setLabel(Label.LABEL_OPTIONAL)
- .build())
- .addField(
- FieldDescriptorProto.newBuilder()
- .setName("dateValue")
- .setNumber(12)
- .setType(Type.TYPE_INT32)
- .setLabel(Label.LABEL_OPTIONAL)
- .build())
- .build();
-
- private static final TableSchema NESTED_TABLE_SCHEMA =
- new TableSchema()
- .setFields(
- ImmutableList.<TableFieldSchema>builder()
- .add(
- new TableFieldSchema()
- .setType("STRUCT")
- .setName("nestedValue1")
- .setFields(BASE_TABLE_SCHEMA.getFields()))
- .add(
- new TableFieldSchema()
- .setType("RECORD")
- .setName("nestedValue2")
- .setFields(BASE_TABLE_SCHEMA.getFields()))
- .build());
-
- // For now, test that no exceptions are thrown.
- @Test
- public void testDescriptorFromTableSchema() {
- DescriptorProto descriptor = BigQueryUtils.descriptorSchemaFromTableSchema(BASE_TABLE_SCHEMA);
- Map<String, Type> types =
- descriptor.getFieldList().stream()
- .collect(
- Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
- Map<String, Type> expectedTypes =
- BASE_TABLE_SCHEMA_PROTO.getFieldList().stream()
- .collect(
- Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
- assertEquals(expectedTypes, types);
- }
-
- @Test
- public void testNestedFromTableSchema() {
- DescriptorProto descriptor = BigQueryUtils.descriptorSchemaFromTableSchema(NESTED_TABLE_SCHEMA);
- Map<String, Type> expectedBaseTypes =
- BASE_TABLE_SCHEMA_PROTO.getFieldList().stream()
- .collect(
- Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
-
- Map<String, Type> types =
- descriptor.getFieldList().stream()
- .collect(
- Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
- Map<String, String> typeNames =
- descriptor.getFieldList().stream()
- .collect(
- Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getTypeName));
- assertEquals(2, types.size());
-
- Map<String, DescriptorProto> nestedTypes =
- descriptor.getNestedTypeList().stream()
- .collect(Collectors.toMap(DescriptorProto::getName, Functions.identity()));
- assertEquals(2, nestedTypes.size());
- assertEquals(Type.TYPE_MESSAGE, types.get("nestedValue1"));
- String nestedTypeName1 = typeNames.get("nestedValue1");
- Map<String, Type> nestedTypes1 =
- nestedTypes.get(nestedTypeName1).getFieldList().stream()
- .collect(
- Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
- assertEquals(expectedBaseTypes, nestedTypes1);
-
- assertEquals(Type.TYPE_MESSAGE, types.get("nestedValue2"));
- String nestedTypeName2 = typeNames.get("nestedValue2");
- Map<String, Type> nestedTypes2 =
- nestedTypes.get(nestedTypeName2).getFieldList().stream()
- .collect(
- Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
- assertEquals(expectedBaseTypes, nestedTypes2);
- }
-
- @Test
- public void testRepeatedDescriptorFromTableSchema() {
- BigQueryUtils.descriptorSchemaFromTableSchema(BASE_TABLE_SCHEMA);
- }
-
- private TableRow getBaseRecord() {
- return new TableRow()
- .set("stringValue", "string")
- .set("bytesValue", BaseEncoding.base64().encode("string".getBytes(StandardCharsets.UTF_8)))
- .set("int64Value", 42L)
- .set("intValue", 43L)
- .set("float64Value", (float) 2.8168)
- .set("floatValue", (float) 2.817)
- .set("boolValue", true)
- .set("booleanValue", true)
- .set("timestampValue", 1L)
- .set("timeValue", 2L)
- .set("datetimeValue", 3L)
- .set("dateValue", 4);
- }
-
- @Test
- public void testMessageFromTableRow() throws Exception {
- TableRow baseRecord = getBaseRecord();
- Map<String, Object> baseRecordFields = ImmutableMap.copyOf(baseRecord);
-
- TableRow tableRow =
- new TableRow().set("nestedValue1", baseRecord).set("nestedValue2", baseRecord);
- Descriptor descriptor = BigQueryUtils.getDescriptorFromTableSchema(NESTED_TABLE_SCHEMA);
- DynamicMessage msg = BigQueryUtils.messageFromTableRow(descriptor, tableRow);
- assertEquals(2, msg.getAllFields().size());
-
- Map<String, FieldDescriptor> fieldDescriptors =
- descriptor.getFields().stream()
- .collect(Collectors.toMap(FieldDescriptor::getName, Functions.identity()));
- DynamicMessage nestedMsg1 = (DynamicMessage) msg.getField(fieldDescriptors.get("nestedValue1"));
- Map<String, Object> nestedMsg1Fields =
- nestedMsg1.getAllFields().entrySet().stream()
- .map(
- entry -> {
- if (entry.getKey().getType() == FieldDescriptor.Type.BYTES) {
- ByteString byteString = (ByteString) entry.getValue();
- return new AbstractMap.SimpleEntry<>(
- entry.getKey(), BaseEncoding.base64().encode(byteString.toByteArray()));
- } else {
- return entry;
- }
- })
- .collect(
- Collectors.toMap(entry -> entry.getKey().getName(), entry -> entry.getValue()));
- assertEquals(baseRecordFields, nestedMsg1Fields);
-
- DynamicMessage nestedMsg2 = (DynamicMessage) msg.getField(fieldDescriptors.get("nestedValue2"));
- Map<String, Object> nestedMsg2Fields =
- nestedMsg2.getAllFields().entrySet().stream()
- .map(
- entry -> {
- if (entry.getKey().getType() == FieldDescriptor.Type.BYTES) {
- ByteString byteString = (ByteString) entry.getValue();
- return new AbstractMap.SimpleEntry<>(
- entry.getKey(), BaseEncoding.base64().encode(byteString.toByteArray()));
- } else {
- return entry;
- }
- })
- .collect(
- Collectors.toMap(entry -> entry.getKey().getName(), entry -> entry.getValue()));
- assertEquals(baseRecordFields, nestedMsg2Fields);
- }
}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
new file mode 100644
index 0000000..0acc41f
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.DescriptorProtos.DescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.DynamicMessage;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.BaseEncoding;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+@SuppressWarnings({
+ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+/** Unit tests for {@link org.apache.beam.sdk.io.gcp.bigquery.TableRowToStorageApiProto}. */
+public class TableRowToStorageApiProtoTest {
+ private static final TableSchema BASE_TABLE_SCHEMA =
+ new TableSchema()
+ .setFields(
+ ImmutableList.<TableFieldSchema>builder()
+ .add(new TableFieldSchema().setType("STRING").setName("stringValue"))
+ .add(new TableFieldSchema().setType("BYTES").setName("bytesValue"))
+ .add(new TableFieldSchema().setType("INT64").setName("int64Value"))
+ .add(new TableFieldSchema().setType("INTEGER").setName("intValue"))
+ .add(new TableFieldSchema().setType("FLOAT64").setName("float64Value"))
+ .add(new TableFieldSchema().setType("FLOAT").setName("floatValue"))
+ .add(new TableFieldSchema().setType("BOOL").setName("boolValue"))
+ .add(new TableFieldSchema().setType("BOOLEAN").setName("booleanValue"))
+ .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampValue"))
+ .add(new TableFieldSchema().setType("TIME").setName("timeValue"))
+ .add(new TableFieldSchema().setType("DATETIME").setName("datetimeValue"))
+ .add(new TableFieldSchema().setType("DATE").setName("dateValue"))
+ .add(new TableFieldSchema().setType("NUMERIC").setName("numericValue"))
+ .add(
+ new TableFieldSchema()
+ .setType("STRING")
+ .setMode("REPEATED")
+ .setName("arrayValue"))
+ .build());
+
+ private static final DescriptorProto BASE_TABLE_SCHEMA_PROTO =
+ DescriptorProto.newBuilder()
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("stringvalue")
+ .setNumber(1)
+ .setType(Type.TYPE_STRING)
+ .setLabel(Label.LABEL_OPTIONAL)
+ .build())
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("bytesvalue")
+ .setNumber(2)
+ .setType(Type.TYPE_BYTES)
+ .setLabel(Label.LABEL_OPTIONAL)
+ .build())
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("int64value")
+ .setNumber(3)
+ .setType(Type.TYPE_INT64)
+ .setLabel(Label.LABEL_OPTIONAL)
+ .build())
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("intvalue")
+ .setNumber(4)
+ .setType(Type.TYPE_INT64)
+ .setLabel(Label.LABEL_OPTIONAL)
+ .build())
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("float64value")
+ .setNumber(5)
+ .setType(Type.TYPE_DOUBLE)
+ .setLabel(Label.LABEL_OPTIONAL)
+ .build())
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("floatvalue")
+ .setNumber(6)
+ .setType(Type.TYPE_DOUBLE)
+ .setLabel(Label.LABEL_OPTIONAL)
+ .build())
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("boolvalue")
+ .setNumber(7)
+ .setType(Type.TYPE_BOOL)
+ .setLabel(Label.LABEL_OPTIONAL)
+ .build())
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("booleanvalue")
+ .setNumber(8)
+ .setType(Type.TYPE_BOOL)
+ .setLabel(Label.LABEL_OPTIONAL)
+ .build())
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("timestampvalue")
+ .setNumber(9)
+ .setType(Type.TYPE_STRING)
+ .setLabel(Label.LABEL_OPTIONAL)
+ .build())
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("timevalue")
+ .setNumber(10)
+ .setType(Type.TYPE_STRING)
+ .setLabel(Label.LABEL_OPTIONAL)
+ .build())
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("datetimevalue")
+ .setNumber(11)
+ .setType(Type.TYPE_STRING)
+ .setLabel(Label.LABEL_OPTIONAL)
+ .build())
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("datevalue")
+ .setNumber(12)
+ .setType(Type.TYPE_STRING)
+ .setLabel(Label.LABEL_OPTIONAL)
+ .build())
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("numericvalue")
+ .setNumber(13)
+ .setType(Type.TYPE_STRING)
+ .setLabel(Label.LABEL_OPTIONAL)
+ .build())
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("arrayvalue")
+ .setNumber(14)
+ .setType(Type.TYPE_STRING)
+ .setLabel(Label.LABEL_REPEATED)
+ .build())
+ .build();
+
+ private static final TableSchema NESTED_TABLE_SCHEMA =
+ new TableSchema()
+ .setFields(
+ ImmutableList.<TableFieldSchema>builder()
+ .add(
+ new TableFieldSchema()
+ .setType("STRUCT")
+ .setName("nestedValue1")
+ .setFields(BASE_TABLE_SCHEMA.getFields()))
+ .add(
+ new TableFieldSchema()
+ .setType("RECORD")
+ .setName("nestedValue2")
+ .setFields(BASE_TABLE_SCHEMA.getFields()))
+ .build());
+
+ // For now, test that no exceptions are thrown.
+ @Test
+ public void testDescriptorFromTableSchema() {
+ DescriptorProto descriptor =
+ TableRowToStorageApiProto.descriptorSchemaFromTableSchema(BASE_TABLE_SCHEMA);
+ Map<String, Type> types =
+ descriptor.getFieldList().stream()
+ .collect(
+ Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
+ Map<String, Type> expectedTypes =
+ BASE_TABLE_SCHEMA_PROTO.getFieldList().stream()
+ .collect(
+ Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
+ assertEquals(expectedTypes, types);
+ }
+
+ @Test
+ public void testNestedFromTableSchema() {
+ DescriptorProto descriptor =
+ TableRowToStorageApiProto.descriptorSchemaFromTableSchema(NESTED_TABLE_SCHEMA);
+ Map<String, Type> expectedBaseTypes =
+ BASE_TABLE_SCHEMA_PROTO.getFieldList().stream()
+ .collect(
+ Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
+
+ Map<String, Type> types =
+ descriptor.getFieldList().stream()
+ .collect(
+ Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
+ Map<String, String> typeNames =
+ descriptor.getFieldList().stream()
+ .collect(
+ Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getTypeName));
+ assertEquals(2, types.size());
+
+ Map<String, DescriptorProto> nestedTypes =
+ descriptor.getNestedTypeList().stream()
+ .collect(Collectors.toMap(DescriptorProto::getName, Functions.identity()));
+ assertEquals(2, nestedTypes.size());
+ assertEquals(Type.TYPE_MESSAGE, types.get("nestedvalue1"));
+ String nestedTypeName1 = typeNames.get("nestedvalue1");
+ Map<String, Type> nestedTypes1 =
+ nestedTypes.get(nestedTypeName1).getFieldList().stream()
+ .collect(
+ Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
+ assertEquals(expectedBaseTypes, nestedTypes1);
+
+ assertEquals(Type.TYPE_MESSAGE, types.get("nestedvalue2"));
+ String nestedTypeName2 = typeNames.get("nestedvalue2");
+ Map<String, Type> nestedTypes2 =
+ nestedTypes.get(nestedTypeName2).getFieldList().stream()
+ .collect(
+ Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
+ assertEquals(expectedBaseTypes, nestedTypes2);
+ }
+
+ @Test
+ public void testRepeatedDescriptorFromTableSchema() {
+ TableRowToStorageApiProto.descriptorSchemaFromTableSchema(BASE_TABLE_SCHEMA);
+ }
+
+ private static final TableRow BASE_TABLE_ROW =
+ new TableRow()
+ .set("stringValue", "string")
+ .set(
+ "bytesValue", BaseEncoding.base64().encode("string".getBytes(StandardCharsets.UTF_8)))
+ .set("int64Value", "42")
+ .set("intValue", "43")
+ .set("float64Value", "2.8168")
+ .set("floatValue", "2.817")
+ .set("boolValue", "true")
+ .set("booleanValue", "true")
+ .set("timestampValue", "43")
+ .set("timeValue", "00:52:07[.123]|[.123456] UTC")
+ .set("datetimeValue", "2019-08-16 00:52:07[.123]|[.123456] UTC")
+ .set("dateValue", "2019-08-16")
+ .set("numericValue", "23.4")
+ .set("arrayValue", ImmutableList.of("hello", "goodbye"));
+
+ private static final Map<String, Object> BASE_ROW_EXPECTED_PROTO_VALUES =
+ ImmutableMap.<String, Object>builder()
+ .put("stringvalue", "string")
+ .put("bytesvalue", ByteString.copyFrom("string".getBytes(StandardCharsets.UTF_8)))
+ .put("int64value", (long) 42)
+ .put("intvalue", (long) 43)
+ .put("float64value", (double) 2.8168)
+ .put("floatvalue", (double) 2.817)
+ .put("boolvalue", true)
+ .put("booleanvalue", true)
+ .put("timestampvalue", "43")
+ .put("timevalue", "00:52:07[.123]|[.123456] UTC")
+ .put("datetimevalue", "2019-08-16 00:52:07[.123]|[.123456] UTC")
+ .put("datevalue", "2019-08-16")
+ .put("numericvalue", "23.4")
+ .put("arrayvalue", ImmutableList.of("hello", "goodbye"))
+ .build();
+
+ private void assertBaseRecord(DynamicMessage msg) {
+ Map<String, Object> recordFields =
+ msg.getAllFields().entrySet().stream()
+ .collect(
+ Collectors.toMap(entry -> entry.getKey().getName(), entry -> entry.getValue()));
+ assertEquals(BASE_ROW_EXPECTED_PROTO_VALUES, recordFields);
+ }
+
+ @Test
+ public void testMessageFromTableRow() throws Exception {
+ TableRow tableRow =
+ new TableRow().set("nestedValue1", BASE_TABLE_ROW).set("nestedValue2", BASE_TABLE_ROW);
+ Descriptor descriptor =
+ TableRowToStorageApiProto.getDescriptorFromTableSchema(NESTED_TABLE_SCHEMA);
+ DynamicMessage msg = TableRowToStorageApiProto.messageFromTableRow(descriptor, tableRow);
+ assertEquals(2, msg.getAllFields().size());
+
+ Map<String, FieldDescriptor> fieldDescriptors =
+ descriptor.getFields().stream()
+ .collect(Collectors.toMap(FieldDescriptor::getName, Functions.identity()));
+ assertBaseRecord((DynamicMessage) msg.getField(fieldDescriptors.get("nestedvalue1")));
+ assertBaseRecord((DynamicMessage) msg.getField(fieldDescriptors.get("nestedvalue2")));
+ }
+}