Posted to commits@beam.apache.org by re...@apache.org on 2022/11/29 04:01:04 UTC
[beam] branch master updated: Merge pull request #24147: First step in adding schema update to Storage API sink. Refactor code #21395
This is an automated email from the ASF dual-hosted git repository.
reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5bb13fa35b9 Merge pull request #24147: First step in adding schema update to Storage API sink. Refactor code #21395
5bb13fa35b9 is described below
commit 5bb13fa35b9bc36764895c57f23d3890f0f1b567
Author: Reuven Lax <re...@google.com>
AuthorDate: Mon Nov 28 20:00:52 2022 -0800
Merge pull request #24147: First step in adding schema update to Storage API sink. Refactor code #21395
---
.../beam/sdk/io/gcp/bigquery/AppendClientInfo.java | 68 +++++
.../io/gcp/bigquery/BeamRowToStorageApiProto.java | 144 ++++-----
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 12 +-
.../beam/sdk/io/gcp/bigquery/BigQueryOptions.java | 6 -
.../beam/sdk/io/gcp/bigquery/BigQueryUtils.java | 2 +-
.../sdk/io/gcp/bigquery/SplittingIterable.java | 31 +-
.../bigquery/StorageApiDynamicDestinations.java | 21 +-
.../StorageApiDynamicDestinationsBeamRow.java | 50 ++-
.../StorageApiDynamicDestinationsTableRow.java | 201 +++++-------
.../beam/sdk/io/gcp/bigquery/StorageApiLoads.java | 12 +-
.../io/gcp/bigquery/StorageApiWritePayload.java | 5 +-
.../bigquery/StorageApiWriteUnshardedRecords.java | 155 +++++-----
.../bigquery/StorageApiWritesShardedRecords.java | 214 +++++++------
.../io/gcp/bigquery/TableRowToStorageApiProto.java | 337 ++++++++++++++++-----
.../gcp/bigquery/BeamRowToStorageApiProtoTest.java | 28 +-
.../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java | 273 ++++-------------
.../sdk/io/gcp/bigquery/BigQueryUtilsTest.java | 26 +-
.../bigquery/TableRowToStorageApiProtoTest.java | 82 ++++-
18 files changed, 849 insertions(+), 818 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
new file mode 100644
index 00000000000..1680ef48e4d
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.protobuf.Descriptors;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
+
+/**
+ * Container class used by {@link StorageApiWritesShardedRecords} and {@link
+ * StorageApiWriteUnshardedRecords} to encapsulate a destination {@link TableSchema} along with a
+ * {@link BigQueryServices.StreamAppendClient} and other objects needed to write records.
+ */
+class AppendClientInfo {
+ @Nullable BigQueryServices.StreamAppendClient streamAppendClient;
+ @Nullable TableSchema tableSchema;
+ Consumer<BigQueryServices.StreamAppendClient> closeAppendClient;
+ Descriptors.Descriptor descriptor;
+
+ public AppendClientInfo(
+ TableSchema tableSchema, Consumer<BigQueryServices.StreamAppendClient> closeAppendClient)
+ throws Exception {
+ this.tableSchema = tableSchema;
+ this.descriptor = TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema, true);
+ this.closeAppendClient = closeAppendClient;
+ }
+
+ public AppendClientInfo clearAppendClient() {
+ if (streamAppendClient != null) {
+ closeAppendClient.accept(streamAppendClient);
+ this.streamAppendClient = null;
+ }
+ return this;
+ }
+
+ public AppendClientInfo createAppendClient(
+ BigQueryServices.DatasetService datasetService,
+ Supplier<String> getStreamName,
+ boolean useConnectionPool)
+ throws Exception {
+ if (streamAppendClient == null) {
+ this.streamAppendClient =
+ datasetService.getStreamAppendClient(getStreamName.get(), descriptor, useConnectionPool);
+ }
+ return this;
+ }
+
+ public void close() {
+ clearAppendClient();
+ }
+}
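The new AppendClientInfo container bundles what earlier code passed around separately: the destination table schema, the proto descriptor derived from it, and a lazily created StreamAppendClient, with the close behavior injected as a callback so callers decide how clients are disposed of. Below is a minimal, self-contained sketch of the same lifecycle pattern; FakeClient and ClientInfo are hypothetical stand-ins for BigQueryServices.StreamAppendClient and AppendClientInfo.

import java.util.function.Consumer;
import java.util.function.Supplier;

class FakeClient implements AutoCloseable {
  final String streamName;
  FakeClient(String streamName) { this.streamName = streamName; }
  @Override public void close() { System.out.println("closed " + streamName); }
}

class ClientInfo {
  FakeClient client;                      // created lazily; may be null
  final String schema;                    // stands in for the TableSchema
  final Consumer<FakeClient> closeClient; // injected close behavior

  ClientInfo(String schema, Consumer<FakeClient> closeClient) {
    this.schema = schema;
    this.closeClient = closeClient;
  }

  ClientInfo createClient(Supplier<String> streamName) {
    if (client == null) {
      client = new FakeClient(streamName.get());
    }
    return this;
  }

  ClientInfo clearClient() {
    if (client != null) {
      closeClient.accept(client); // delegate disposal to the injected callback
      client = null;
    }
    return this;
  }

  public static void main(String[] args) {
    new ClientInfo("schema-v1", FakeClient::close)
        .createClient(() -> "streams/_default")
        .clearClient(); // prints "closed streams/_default"
  }
}

Injecting the close callback is what lets StorageApiWriteUnshardedRecords (below) close expired clients on a background executor instead of blocking the processing thread.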
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
index 816cbe9d6ca..4ef88e01c76 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
@@ -17,16 +17,11 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.protobuf.ByteString;
-import com.google.protobuf.DescriptorProtos.DescriptorProto;
-import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
-import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label;
-import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
-import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
import com.google.protobuf.Descriptors.Descriptor;
-import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Descriptors.FieldDescriptor;
-import com.google.protobuf.Descriptors.FileDescriptor;
import com.google.protobuf.DynamicMessage;
import java.math.BigDecimal;
import java.time.LocalDate;
@@ -34,7 +29,6 @@ import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -52,8 +46,6 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.Visi
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Bytes;
import org.joda.time.ReadableInstant;
@@ -71,38 +63,38 @@ public class BeamRowToStorageApiProto {
new BigDecimal("-99999999999999999999999999999.999999999");
// TODO(reuvenlax): Support BIGNUMERIC and GEOGRAPHY types.
- static final Map<TypeName, Type> PRIMITIVE_TYPES =
- ImmutableMap.<TypeName, Type>builder()
- .put(TypeName.INT16, Type.TYPE_INT32)
- .put(TypeName.BYTE, Type.TYPE_INT32)
- .put(TypeName.INT32, Type.TYPE_INT32)
- .put(TypeName.INT64, Type.TYPE_INT64)
- .put(TypeName.FLOAT, Type.TYPE_FLOAT)
- .put(TypeName.DOUBLE, Type.TYPE_DOUBLE)
- .put(TypeName.STRING, Type.TYPE_STRING)
- .put(TypeName.BOOLEAN, Type.TYPE_BOOL)
- .put(TypeName.DATETIME, Type.TYPE_INT64)
- .put(TypeName.BYTES, Type.TYPE_BYTES)
- .put(TypeName.DECIMAL, Type.TYPE_BYTES)
+ static final Map<TypeName, TableFieldSchema.Type> PRIMITIVE_TYPES =
+ ImmutableMap.<TypeName, TableFieldSchema.Type>builder()
+ .put(TypeName.INT16, TableFieldSchema.Type.INT64)
+ .put(TypeName.BYTE, TableFieldSchema.Type.INT64)
+ .put(TypeName.INT32, TableFieldSchema.Type.INT64)
+ .put(TypeName.INT64, TableFieldSchema.Type.INT64)
+ .put(TypeName.FLOAT, TableFieldSchema.Type.DOUBLE)
+ .put(TypeName.DOUBLE, TableFieldSchema.Type.DOUBLE)
+ .put(TypeName.STRING, TableFieldSchema.Type.STRING)
+ .put(TypeName.BOOLEAN, TableFieldSchema.Type.BOOL)
+ .put(TypeName.DATETIME, TableFieldSchema.Type.DATETIME)
+ .put(TypeName.BYTES, TableFieldSchema.Type.BYTES)
+ .put(TypeName.DECIMAL, TableFieldSchema.Type.BIGNUMERIC)
.build();
// A map of supported logical types to the protobuf field type.
- static final Map<String, Type> LOGICAL_TYPES =
- ImmutableMap.<String, Type>builder()
- .put(SqlTypes.DATE.getIdentifier(), Type.TYPE_INT32)
- .put(SqlTypes.TIME.getIdentifier(), Type.TYPE_INT64)
- .put(SqlTypes.DATETIME.getIdentifier(), Type.TYPE_INT64)
- .put(SqlTypes.TIMESTAMP.getIdentifier(), Type.TYPE_INT64)
- .put(EnumerationType.IDENTIFIER, Type.TYPE_STRING)
+ static final Map<String, TableFieldSchema.Type> LOGICAL_TYPES =
+ ImmutableMap.<String, TableFieldSchema.Type>builder()
+ .put(SqlTypes.DATE.getIdentifier(), TableFieldSchema.Type.DATE)
+ .put(SqlTypes.TIME.getIdentifier(), TableFieldSchema.Type.TIME)
+ .put(SqlTypes.DATETIME.getIdentifier(), TableFieldSchema.Type.DATETIME)
+ .put(SqlTypes.TIMESTAMP.getIdentifier(), TableFieldSchema.Type.TIMESTAMP)
+ .put(EnumerationType.IDENTIFIER, TableFieldSchema.Type.STRING)
.build();
static final Map<TypeName, Function<Object, Object>> PRIMITIVE_ENCODERS =
ImmutableMap.<TypeName, Function<Object, Object>>builder()
- .put(TypeName.INT16, o -> Integer.valueOf((Short) o))
- .put(TypeName.BYTE, o -> Integer.valueOf((Byte) o))
- .put(TypeName.INT32, Functions.identity())
+ .put(TypeName.INT16, o -> ((Short) o).longValue())
+ .put(TypeName.BYTE, o -> ((Byte) o).longValue())
+ .put(TypeName.INT32, o -> ((Integer) o).longValue())
.put(TypeName.INT64, Functions.identity())
- .put(TypeName.FLOAT, Function.identity())
+ .put(TypeName.FLOAT, o -> Double.valueOf(o.toString()))
.put(TypeName.DOUBLE, Function.identity())
.put(TypeName.STRING, Function.identity())
.put(TypeName.BOOLEAN, Function.identity())
@@ -134,21 +126,6 @@ public class BeamRowToStorageApiProto {
((EnumerationType) logicalType).toString((EnumerationType.Value) value))
.build();
- /**
- * Given a Beam Schema, returns a protocol-buffer Descriptor that can be used to write data using
- * the BigQuery Storage API.
- */
- public static Descriptor getDescriptorFromSchema(Schema schema)
- throws DescriptorValidationException {
- DescriptorProto descriptorProto = descriptorSchemaFromBeamSchema(schema);
- FileDescriptorProto fileDescriptorProto =
- FileDescriptorProto.newBuilder().addMessageType(descriptorProto).build();
- FileDescriptor fileDescriptor =
- FileDescriptor.buildFrom(fileDescriptorProto, new FileDescriptor[0]);
-
- return Iterables.getOnlyElement(fileDescriptor.getMessageTypes());
- }
-
/**
* Given a Beam {@link Row} object, returns a protocol-buffer message that can be used to write
* data using the BigQuery Storage streaming API.
@@ -159,7 +136,9 @@ public class BeamRowToStorageApiProto {
for (int i = 0; i < row.getFieldCount(); ++i) {
Field beamField = beamSchema.getField(i);
FieldDescriptor fieldDescriptor =
- Preconditions.checkNotNull(descriptor.findFieldByName(beamField.getName().toLowerCase()));
+ Preconditions.checkNotNull(
+ descriptor.findFieldByName(beamField.getName().toLowerCase()),
+ beamField.getName().toLowerCase());
@Nullable Object value = messageValueFromRowValue(fieldDescriptor, beamField, i, row);
if (value != null) {
builder.setField(fieldDescriptor, value);
@@ -169,27 +148,19 @@ public class BeamRowToStorageApiProto {
}
@VisibleForTesting
- static DescriptorProto descriptorSchemaFromBeamSchema(Schema schema) {
+ static TableSchema protoTableSchemaFromBeamSchema(Schema schema) {
Preconditions.checkState(schema.getFieldCount() > 0);
- DescriptorProto.Builder descriptorBuilder = DescriptorProto.newBuilder();
- // Create a unique name for the descriptor ('-' characters cannot be used).
- descriptorBuilder.setName("D" + UUID.randomUUID().toString().replace("-", "_"));
- int i = 1;
- List<DescriptorProto> nestedTypes = Lists.newArrayList();
+
+ TableSchema.Builder builder = TableSchema.newBuilder();
for (Field field : schema.getFields()) {
- FieldDescriptorProto.Builder fieldDescriptorProtoBuilder =
- fieldDescriptorFromBeamField(field, i++, nestedTypes);
- descriptorBuilder.addField(fieldDescriptorProtoBuilder);
+ builder.addFields(fieldDescriptorFromBeamField(field));
}
- nestedTypes.forEach(descriptorBuilder::addNestedType);
- return descriptorBuilder.build();
+ return builder.build();
}
- private static FieldDescriptorProto.Builder fieldDescriptorFromBeamField(
- Field field, int fieldNumber, List<DescriptorProto> nestedTypes) {
- FieldDescriptorProto.Builder fieldDescriptorBuilder = FieldDescriptorProto.newBuilder();
- fieldDescriptorBuilder = fieldDescriptorBuilder.setName(field.getName().toLowerCase());
- fieldDescriptorBuilder = fieldDescriptorBuilder.setNumber(fieldNumber);
+ private static TableFieldSchema fieldDescriptorFromBeamField(Field field) {
+ TableFieldSchema.Builder builder = TableFieldSchema.newBuilder();
+ builder = builder.setName(field.getName().toLowerCase());
switch (field.getType().getTypeName()) {
case ROW:
@@ -197,10 +168,10 @@ public class BeamRowToStorageApiProto {
if (rowSchema == null) {
throw new RuntimeException("Unexpected null schema!");
}
- DescriptorProto nested = descriptorSchemaFromBeamSchema(rowSchema);
- nestedTypes.add(nested);
- fieldDescriptorBuilder =
- fieldDescriptorBuilder.setType(Type.TYPE_MESSAGE).setTypeName(nested.getName());
+ builder = builder.setType(TableFieldSchema.Type.STRUCT);
+ for (Schema.Field nestedField : rowSchema.getFields()) {
+ builder = builder.addFields(fieldDescriptorFromBeamField(nestedField));
+ }
break;
case ARRAY:
case ITERABLE:
@@ -211,35 +182,44 @@ public class BeamRowToStorageApiProto {
Preconditions.checkState(
!Preconditions.checkNotNull(elementType.getTypeName()).isCollectionType(),
"Nested arrays not supported by BigQuery.");
- return fieldDescriptorFromBeamField(
- Field.of(field.getName(), elementType), fieldNumber, nestedTypes)
- .setLabel(Label.LABEL_REPEATED);
+ TableFieldSchema elementFieldSchema =
+ fieldDescriptorFromBeamField(Field.of(field.getName(), elementType));
+ builder = builder.setType(elementFieldSchema.getType());
+ builder.addAllFields(elementFieldSchema.getFieldsList());
+ builder = builder.setMode(TableFieldSchema.Mode.REPEATED);
+ break;
case LOGICAL_TYPE:
@Nullable LogicalType<?, ?> logicalType = field.getType().getLogicalType();
if (logicalType == null) {
throw new RuntimeException("Unexpected null logical type " + field.getType());
}
- @Nullable Type type = LOGICAL_TYPES.get(logicalType.getIdentifier());
+ @Nullable TableFieldSchema.Type type = LOGICAL_TYPES.get(logicalType.getIdentifier());
if (type == null) {
throw new RuntimeException("Unsupported logical type " + field.getType());
}
- fieldDescriptorBuilder = fieldDescriptorBuilder.setType(type);
+ builder = builder.setType(type);
break;
case MAP:
throw new RuntimeException("Map types not supported by BigQuery.");
default:
- @Nullable Type primitiveType = PRIMITIVE_TYPES.get(field.getType().getTypeName());
+ @Nullable
+ TableFieldSchema.Type primitiveType = PRIMITIVE_TYPES.get(field.getType().getTypeName());
if (primitiveType == null) {
throw new RuntimeException("Unsupported type " + field.getType());
}
- fieldDescriptorBuilder = fieldDescriptorBuilder.setType(primitiveType);
+ builder = builder.setType(primitiveType);
}
- if (field.getType().getNullable()) {
- fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_OPTIONAL);
- } else {
- fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_REQUIRED);
+ if (builder.getMode() != TableFieldSchema.Mode.REPEATED) {
+ if (field.getType().getNullable()) {
+ builder = builder.setMode(TableFieldSchema.Mode.NULLABLE);
+ } else {
+ builder = builder.setMode(TableFieldSchema.Mode.REQUIRED);
+ }
+ }
+ if (field.getDescription() != null) {
+ builder = builder.setDescription(field.getDescription());
}
- return fieldDescriptorBuilder;
+ return builder.build();
}
@Nullable
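This conversion no longer hand-assembles protobuf DescriptorProto objects (with UUID-derived names and explicit nested types); it instead builds the Storage API's own TableSchema proto and delegates descriptor creation to TableRowToStorageApiProto.getDescriptorFromTableSchema. The type mapping widens in the process: INT16, BYTE, and INT32 all become INT64, FLOAT becomes DOUBLE, and DECIMAL maps to BIGNUMERIC, which is why the primitive encoders above now widen their values (FLOAT values are re-parsed from their decimal string form rather than cast). A self-contained sketch of the new mapping, with a local enum standing in for TableFieldSchema.Type:

import java.util.Map;

public class TypeMappingSketch {
  // Local stand-in for com.google.cloud.bigquery.storage.v1.TableFieldSchema.Type.
  enum BqType { INT64, DOUBLE, STRING, BOOL, DATETIME, BYTES, BIGNUMERIC }

  // Mirrors PRIMITIVE_TYPES after this commit: narrow integral Beam types widen to
  // INT64 and FLOAT widens to DOUBLE, matching the encoder changes above.
  static final Map<String, BqType> PRIMITIVE_TYPES = Map.ofEntries(
      Map.entry("INT16", BqType.INT64),
      Map.entry("BYTE", BqType.INT64),
      Map.entry("INT32", BqType.INT64),
      Map.entry("INT64", BqType.INT64),
      Map.entry("FLOAT", BqType.DOUBLE),
      Map.entry("DOUBLE", BqType.DOUBLE),
      Map.entry("STRING", BqType.STRING),
      Map.entry("BOOLEAN", BqType.BOOL),
      Map.entry("DATETIME", BqType.DATETIME),
      Map.entry("BYTES", BqType.BYTES),
      Map.entry("DECIMAL", BqType.BIGNUMERIC));

  public static void main(String[] args) {
    System.out.println(PRIMITIVE_TYPES.get("INT16")); // INT64
  }
}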
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index e72bc20f780..fedc898fe97 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -2668,11 +2668,9 @@ public class BigQueryIO {
}
/**
- * If true, enables automatically detecting BigQuery table schema updates. If a message with
- * unknown fields is processed, the BigQuery table is tabled to see if the schema has been
- * updated. This is intended for scenarios in which unknown fields are rare, otherwise calls to
- * BigQuery will throttle the pipeline. only supported when using one of the STORAGE_API insert
- * methods.
+ * If true, enables automatically detecting BigQuery table schema updates. Table schema updates
+ * are usually noticed within several minutes. Only supported when using one of the STORAGE_API
+ * insert methods.
*/
public Write<T> withAutoSchemaUpdate(boolean autoSchemaUpdate) {
return toBuilder().setAutoSchemaUpdate(autoSchemaUpdate).build();
@@ -3174,9 +3172,7 @@ public class BigQueryIO {
dynamicDestinations,
tableRowWriterFactory.getToRowFn(),
getCreateDisposition(),
- getIgnoreUnknownValues(),
- bqOptions.getSchemaUpdateRetries(),
- getAutoSchemaUpdate());
+ getIgnoreUnknownValues());
}
StorageApiLoads<DestinationT, T> storageApiLoads =
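The reworded withAutoSchemaUpdate javadoc reflects the mechanism change: schema updates are now noticed by refreshing the table schema (typically within minutes) rather than by reacting to records with unknown fields, so the old warning about throttling no longer applies. A hedged usage sketch follows; the table spec is a placeholder and rows is assumed to be an existing PCollection<TableRow>.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.values.PCollection;

class AutoSchemaUpdateSketch {
  static void write(PCollection<TableRow> rows) {
    rows.apply(
        BigQueryIO.writeTableRows()
            .to("my-project:my_dataset.my_table")        // placeholder table spec
            .withMethod(Write.Method.STORAGE_WRITE_API)  // one of the STORAGE_API methods
            .withAutoSchemaUpdate(true));
  }
}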
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
index 53cb2713641..f0b3e061597 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
@@ -133,12 +133,6 @@ public interface BigQueryOptions
void setBigQueryProject(String value);
- @Description("Specify the number of schema update retries. For internal testing only.")
- @Default.Integer(2)
- Integer getSchemaUpdateRetries();
-
- void setSchemaUpdateRetries(Integer value);
-
@Description("Maximum (best effort) size of a single append to the storage API.")
@Default.Integer(2 * 1024 * 1024)
Integer getStorageApiAppendThresholdBytes();
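With the internal schemaUpdateRetries option removed, the append threshold shown above is the knob that StorageApiLoads (below) now reads in place of its hard-coded constant. A small sketch of inspecting and overriding it programmatically:

import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

class OptionsSketch {
  public static void main(String[] args) {
    BigQueryOptions options = PipelineOptionsFactory.create().as(BigQueryOptions.class);
    System.out.println(options.getStorageApiAppendThresholdBytes()); // 2097152 (2 MB default)
    options.setStorageApiAppendThresholdBytes(4 * 1024 * 1024);      // raise the threshold
  }
}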
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
index 6a340496122..fbea947d056 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -576,7 +576,7 @@ public class BigQueryUtils {
case DOUBLE:
// The above types have native representations in JSON for all their
// possible values.
- return fieldValue;
+ return fieldValue.toString();
case STRING:
case INT64:
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java
index 1166647f623..03b009797b5 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java
@@ -19,12 +19,8 @@ package org.apache.beam.sdk.io.gcp.bigquery;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.protobuf.ByteString;
-import com.google.protobuf.DynamicMessage;
-import com.google.protobuf.InvalidProtocolBufferException;
import java.util.Iterator;
import java.util.NoSuchElementException;
-import java.util.function.Function;
-import org.apache.beam.sdk.io.gcp.bigquery.StorageApiDynamicDestinations.DescriptorWrapper;
/**
* Takes in an iterable and batches the results into multiple ProtoRows objects. The splitSize
@@ -34,18 +30,10 @@ import org.apache.beam.sdk.io.gcp.bigquery.StorageApiDynamicDestinations.Descrip
class SplittingIterable implements Iterable<ProtoRows> {
private final Iterable<StorageApiWritePayload> underlying;
private final long splitSize;
- private final Function<Long, DescriptorWrapper> updateSchema;
- private DescriptorWrapper currentDescriptor;
- public SplittingIterable(
- Iterable<StorageApiWritePayload> underlying,
- long splitSize,
- DescriptorWrapper currentDescriptor,
- Function<Long, DescriptorWrapper> updateSchema) {
+ public SplittingIterable(Iterable<StorageApiWritePayload> underlying, long splitSize) {
this.underlying = underlying;
this.splitSize = splitSize;
- this.updateSchema = updateSchema;
- this.currentDescriptor = currentDescriptor;
}
@Override
@@ -68,23 +56,8 @@ class SplittingIterable implements Iterable<ProtoRows> {
long bytesSize = 0;
while (underlyingIterator.hasNext()) {
StorageApiWritePayload payload = underlyingIterator.next();
- if (payload.getSchemaHash() != currentDescriptor.hash) {
- // Schema doesn't match. Try and get an updated schema hash (from the base table).
- currentDescriptor = updateSchema.apply(payload.getSchemaHash());
- // Validate that the record can now be parsed.
- try {
- DynamicMessage msg =
- DynamicMessage.parseFrom(currentDescriptor.descriptor, payload.getPayload());
- if (msg.getUnknownFields() != null && !msg.getUnknownFields().asMap().isEmpty()) {
- throw new RuntimeException(
- "Record schema does not match table. Unknown fields: "
- + msg.getUnknownFields());
- }
- } catch (InvalidProtocolBufferException e) {
- throw new RuntimeException(e);
- }
- }
ByteString byteString = ByteString.copyFrom(payload.getPayload());
+
inserts.addSerializedRows(byteString);
bytesSize += byteString.size();
if (bytesSize > splitSize) {
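With the schema-hash check gone, SplittingIterable reduces to pure byte-size batching: serialized rows accumulate until the running size exceeds splitSize, at which point a ProtoRows batch is emitted. A self-contained sketch of that loop, with byte[] standing in for the serialized payloads:

import java.util.ArrayList;
import java.util.List;

class BatchingSketch {
  // Close a batch once its accumulated size exceeds splitSize, as the loop above does.
  static List<List<byte[]>> batchBySize(Iterable<byte[]> payloads, long splitSize) {
    List<List<byte[]>> batches = new ArrayList<>();
    List<byte[]> current = new ArrayList<>();
    long bytes = 0;
    for (byte[] p : payloads) {
      current.add(p);
      bytes += p.length;
      if (bytes > splitSize) {
        batches.add(current);
        current = new ArrayList<>();
        bytes = 0;
      }
    }
    if (!current.isEmpty()) {
      batches.add(current); // emit the final partial batch
    }
    return batches;
  }
}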
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java
index 53d640a00f2..c3076e8af86 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.io.gcp.bigquery;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
-import com.google.protobuf.Descriptors.Descriptor;
import java.util.List;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
@@ -31,26 +30,8 @@ import org.checkerframework.checker.nullness.qual.Nullable;
/** Base dynamicDestinations class used by the Storage API sink. */
abstract class StorageApiDynamicDestinations<T, DestinationT>
extends DynamicDestinations<T, DestinationT> {
- /** Container object that contains a proto descriptor along with its deterministic hash. */
- public static class DescriptorWrapper {
- public final Descriptor descriptor;
- public final long hash;
-
- public DescriptorWrapper(Descriptor descriptor, long hash) {
- this.descriptor = descriptor;
- this.hash = hash;
- }
-
- @Override
- public String toString() {
- return "Descriptor: " + descriptor.getFullName() + " hash: " + hash;
- }
- }
-
public interface MessageConverter<T> {
- DescriptorWrapper getSchemaDescriptor();
-
- void refreshSchema(long expectedHash) throws Exception;
+ com.google.cloud.bigquery.storage.v1.TableSchema getTableSchema();
StorageApiWritePayload toMessage(T element) throws Exception;
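The converter contract shrinks to match: instead of exposing a DescriptorWrapper plus a refreshSchema(expectedHash) hook, a converter now simply reports the Storage API TableSchema it encodes against. A simplified sketch of the resulting shape, with String and byte[] standing in for the TableSchema and StorageApiWritePayload types:

interface MessageConverterSketch<T> {
  String getTableSchema();                      // replaces getSchemaDescriptor()/refreshSchema()
  byte[] toMessage(T element) throws Exception; // serialized proto payload
  Object toTableRow(T element);                 // failed-row conversion, unchanged
}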
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java
index 5f85cc1eb1b..4280d356bd2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.io.gcp.bigquery;
import com.google.api.services.bigquery.model.TableRow;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Message;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
@@ -29,7 +30,7 @@ import org.checkerframework.checker.nullness.qual.NonNull;
/** Storage API DynamicDestinations used when the input is a Beam Row. */
class StorageApiDynamicDestinationsBeamRow<T, DestinationT extends @NonNull Object>
extends StorageApiDynamicDestinations<T, DestinationT> {
- private final Schema schema;
+ private final TableSchema tableSchema;
private final SerializableFunction<T, Row> toRow;
StorageApiDynamicDestinationsBeamRow(
@@ -37,40 +38,37 @@ class StorageApiDynamicDestinationsBeamRow<T, DestinationT extends @NonNull Obje
Schema schema,
SerializableFunction<T, Row> toRow) {
super(inner);
- this.schema = schema;
+ this.tableSchema = BeamRowToStorageApiProto.protoTableSchemaFromBeamSchema(schema);
this.toRow = toRow;
}
@Override
public MessageConverter<T> getMessageConverter(
DestinationT destination, DatasetService datasetService) throws Exception {
- return new MessageConverter<T>() {
- final Descriptor descriptor;
- final long descriptorHash;
+ return new BeamRowConverter();
+ }
- {
- descriptor = BeamRowToStorageApiProto.getDescriptorFromSchema(schema);
- descriptorHash = BigQueryUtils.hashSchemaDescriptorDeterministic(descriptor);
- }
+ class BeamRowConverter implements MessageConverter<T> {
+ final Descriptor descriptor;
- @Override
- public DescriptorWrapper getSchemaDescriptor() {
- return new DescriptorWrapper(descriptor, descriptorHash);
- }
+ BeamRowConverter() throws Exception {
+ this.descriptor = TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema, true);
+ }
- @Override
- public void refreshSchema(long expectedHash) {}
+ @Override
+ public TableSchema getTableSchema() {
+ return tableSchema;
+ }
- @Override
- public StorageApiWritePayload toMessage(T element) {
- Message msg = BeamRowToStorageApiProto.messageFromBeamRow(descriptor, toRow.apply(element));
- return new AutoValue_StorageApiWritePayload(msg.toByteArray(), descriptorHash);
- }
+ @Override
+ public StorageApiWritePayload toMessage(T element) {
+ Message msg = BeamRowToStorageApiProto.messageFromBeamRow(descriptor, toRow.apply(element));
+ return new AutoValue_StorageApiWritePayload(msg.toByteArray());
+ }
- @Override
- public TableRow toTableRow(T element) {
- return BigQueryUtils.toTableRow(toRow.apply(element));
- }
- };
- }
+ @Override
+ public TableRow toTableRow(T element) {
+ return BigQueryUtils.toTableRow(toRow.apply(element));
+ }
+ };
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
index b025d01f02b..6797bd20e68 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
@@ -25,26 +25,20 @@ import com.google.protobuf.Message;
import java.util.concurrent.ExecutionException;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
-import org.apache.beam.sdk.io.gcp.bigquery.TableRowToStorageApiProto.SchemaTooNarrowException;
import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class StorageApiDynamicDestinationsTableRow<T, DestinationT extends @NonNull Object>
extends StorageApiDynamicDestinations<T, DestinationT> {
private final SerializableFunction<T, TableRow> formatFunction;
private final CreateDisposition createDisposition;
private final boolean ignoreUnknownValues;
- private final int schemaUpdateRetries;
- private final boolean autoSchemaUpdates;
private static final TableSchemaCache SCHEMA_CACHE =
new TableSchemaCache(Duration.standardSeconds(1));
- private static final Logger LOG =
- LoggerFactory.getLogger(StorageApiDynamicDestinationsTableRow.class);
static {
SCHEMA_CACHE.start();
@@ -54,15 +48,11 @@ public class StorageApiDynamicDestinationsTableRow<T, DestinationT extends @NonN
DynamicDestinations<T, DestinationT> inner,
SerializableFunction<T, TableRow> formatFunction,
CreateDisposition createDisposition,
- boolean ignoreUnknownValues,
- int schemaUpdateRetries,
- boolean autoSchemaUpdates) {
+ boolean ignoreUnknownValues) {
super(inner);
this.formatFunction = formatFunction;
this.createDisposition = createDisposition;
this.ignoreUnknownValues = ignoreUnknownValues;
- this.schemaUpdateRetries = schemaUpdateRetries;
- this.autoSchemaUpdates = autoSchemaUpdates;
}
static void clearSchemaCache() throws ExecutionException, InterruptedException {
@@ -72,128 +62,85 @@ public class StorageApiDynamicDestinationsTableRow<T, DestinationT extends @NonN
@Override
public MessageConverter<T> getMessageConverter(
DestinationT destination, DatasetService datasetService) throws Exception {
- return new MessageConverter<T>() {
- @Nullable TableSchema tableSchema;
- TableRowToStorageApiProto.SchemaInformation schemaInformation;
- Descriptor descriptor;
- long descriptorHash;
+ return new TableRowConverter(destination, datasetService);
+ }
- {
- tableSchema = getSchema(destination);
- TableReference tableReference = getTable(destination).getTableReference();
- if (tableSchema == null) {
- // If the table already exists, then try and fetch the schema from the existing
- // table.
- tableSchema = SCHEMA_CACHE.getSchema(tableReference, datasetService);
- if (tableSchema == null) {
- if (createDisposition == CreateDisposition.CREATE_NEVER) {
- throw new RuntimeException(
- "BigQuery table "
- + tableReference
- + " not found. If you wanted to "
- + "automatically create the table, set the create disposition to CREATE_IF_NEEDED and specify a "
- + "schema.");
- } else {
- throw new RuntimeException(
- "Schema must be set for table "
- + tableReference
- + " when writing TableRows using Storage API and "
- + "using a create disposition of CREATE_IF_NEEDED.");
- }
- }
- } else {
- // Make sure we register this schema with the cache, unless there's already a more
- // up-to-date schema.
- tableSchema =
- MoreObjects.firstNonNull(
- SCHEMA_CACHE.putSchemaIfAbsent(tableReference, tableSchema), tableSchema);
- }
- schemaInformation =
- TableRowToStorageApiProto.SchemaInformation.fromTableSchema(tableSchema);
- descriptor = TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema);
- descriptorHash = BigQueryUtils.hashSchemaDescriptorDeterministic(descriptor);
- }
+ class TableRowConverter implements MessageConverter<T> {
+ final @Nullable TableSchema tableSchema;
+ final com.google.cloud.bigquery.storage.v1.TableSchema protoTableSchema;
+ final TableRowToStorageApiProto.SchemaInformation schemaInformation;
+ final Descriptor descriptor;
- @Override
- public DescriptorWrapper getSchemaDescriptor() {
- synchronized (this) {
- return new DescriptorWrapper(descriptor, descriptorHash);
- }
- }
+ TableRowConverter(
+ TableSchema tableSchema,
+ TableRowToStorageApiProto.SchemaInformation schemaInformation,
+ Descriptor descriptor) {
+ this.tableSchema = tableSchema;
+ this.protoTableSchema = TableRowToStorageApiProto.schemaToProtoTableSchema(tableSchema);
+ this.schemaInformation = schemaInformation;
+ this.descriptor = descriptor;
+ }
- @Override
- public void refreshSchema(long expectedHash) throws Exception {
- // When a table is updated, all streams writing to that table will try to refresh the
- // schema. Since we don't want them all querying the table for the schema, keep track of
- // the expected hash and return if it already matches.
- synchronized (this) {
- if (expectedHash == descriptorHash) {
- return;
+ TableRowConverter(DestinationT destination, DatasetService datasetService) throws Exception {
+ TableSchema localTableSchema = getSchema(destination);
+ TableReference tableReference = getTable(destination).getTableReference();
+ if (localTableSchema == null) {
+ // If the table already exists, then try and fetch the schema from the existing
+ // table.
+ localTableSchema = SCHEMA_CACHE.getSchema(tableReference, datasetService);
+ if (localTableSchema == null) {
+ if (createDisposition == CreateDisposition.CREATE_NEVER) {
+ throw new RuntimeException(
+ "BigQuery table "
+ + tableReference
+ + " not found. If you wanted to "
+ + "automatically create the table, set the create disposition to CREATE_IF_NEEDED and specify a "
+ + "schema.");
+ } else {
+ throw new RuntimeException(
+ "Schema must be set for table "
+ + tableReference
+ + " when writing TableRows using Storage API and "
+ + "using a create disposition of CREATE_IF_NEEDED.");
}
}
- refreshSchemaInternal();
+ } else {
+ // Make sure we register this schema with the cache, unless there's already a more
+ // up-to-date schema.
+ localTableSchema =
+ MoreObjects.firstNonNull(
+ SCHEMA_CACHE.putSchemaIfAbsent(tableReference, localTableSchema), localTableSchema);
}
+ this.tableSchema = localTableSchema;
+ this.protoTableSchema = TableRowToStorageApiProto.schemaToProtoTableSchema(tableSchema);
+ schemaInformation =
+ TableRowToStorageApiProto.SchemaInformation.fromTableSchema(protoTableSchema);
+ descriptor =
+ TableRowToStorageApiProto.getDescriptorFromTableSchema(
+ Preconditions.checkStateNotNull(tableSchema), true);
+ }
- public void refreshSchemaInternal() throws Exception {
- TableReference tableReference = getTable(destination).getTableReference();
- SCHEMA_CACHE.refreshSchema(tableReference, datasetService);
- TableSchema newSchema = SCHEMA_CACHE.getSchema(tableReference, datasetService);
- if (newSchema == null) {
- throw new RuntimeException("BigQuery table " + tableReference + " not found");
- }
- synchronized (this) {
- tableSchema = newSchema;
- schemaInformation =
- TableRowToStorageApiProto.SchemaInformation.fromTableSchema(tableSchema);
- descriptor = TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema);
- long newHash = BigQueryUtils.hashSchemaDescriptorDeterministic(descriptor);
- if (descriptorHash != newHash) {
- LOG.info(
- "Refreshed table "
- + BigQueryHelpers.toTableSpec(tableReference)
- + " has a new schema.");
- }
- descriptorHash = newHash;
- }
- }
+ @Override
+ public com.google.cloud.bigquery.storage.v1.TableSchema getTableSchema() {
+ return protoTableSchema;
+ }
- @Override
- public TableRow toTableRow(T element) {
- return formatFunction.apply(element);
- }
+ @Override
+ public TableRow toTableRow(T element) {
+ return formatFunction.apply(element);
+ }
- @Override
- public StorageApiWritePayload toMessage(T element) throws Exception {
- int attempt = 0;
- do {
- TableRowToStorageApiProto.SchemaInformation localSchemaInformation;
- Descriptor localDescriptor;
- long localDescriptorHash;
- synchronized (this) {
- localSchemaInformation = schemaInformation;
- localDescriptor = descriptor;
- localDescriptorHash = descriptorHash;
- }
- try {
- Message msg =
- TableRowToStorageApiProto.messageFromTableRow(
- localSchemaInformation,
- localDescriptor,
- formatFunction.apply(element),
- ignoreUnknownValues);
- return new AutoValue_StorageApiWritePayload(msg.toByteArray(), localDescriptorHash);
- } catch (SchemaTooNarrowException e) {
- if (!autoSchemaUpdates || attempt > schemaUpdateRetries) {
- throw e;
- }
- // The input record has fields not found in the schema, and ignoreUnknownValues=false.
- // It's possible that the user has updated the target table with a wider schema. Try
- // to read the target's table schema to see if that is the case.
- refreshSchemaInternal();
- ++attempt;
- }
- } while (true);
- }
- };
- }
+ @Override
+ public StorageApiWritePayload toMessage(T element) throws Exception {
+ return toMessage(formatFunction.apply(element), true);
+ }
+
+ public StorageApiWritePayload toMessage(TableRow tableRow, boolean respectRequired)
+ throws Exception {
+ Message msg =
+ TableRowToStorageApiProto.messageFromTableRow(
+ schemaInformation, descriptor, tableRow, ignoreUnknownValues);
+ return StorageApiWritePayload.of(msg.toByteArray());
+ }
+ };
}
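The converter logic is unchanged in substance but now lives in a named inner class with final fields, and the SchemaTooNarrowException retry loop is gone from toMessage. The constructor's schema-resolution order can be summarized in a few lines; resolveSchema and its String arguments are hypothetical simplifications of the code above:

class SchemaResolutionSketch {
  static String resolveSchema(String providedSchema, String cachedSchema, boolean createNever) {
    if (providedSchema != null) {
      return providedSchema; // a user-supplied schema wins
    }
    if (cachedSchema != null) {
      return cachedSchema;   // otherwise fall back to the existing table's schema
    }
    if (createNever) {
      throw new RuntimeException("Table not found; CREATE_NEVER will not create it.");
    }
    throw new RuntimeException("A schema is required with CREATE_IF_NEEDED.");
  }
}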
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java
index 20ab251c9c0..da2f695f708 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java
@@ -41,7 +41,6 @@ import org.joda.time.Duration;
/** This {@link PTransform} manages loads into BigQuery using the Storage API. */
public class StorageApiLoads<DestinationT, ElementT>
extends PTransform<PCollection<KV<DestinationT, ElementT>>, WriteResult> {
- static final int MAX_BATCH_SIZE_BYTES = 2 * 1024 * 1024;
final TupleTag<KV<DestinationT, StorageApiWritePayload>> successfulRowsTag =
new TupleTag<>("successfulRows");
final TupleTag<BigQueryStorageApiInsertError> failedRowsTag = new TupleTag<>("failedRows");
@@ -162,6 +161,12 @@ public class StorageApiLoads<DestinationT, ElementT>
PCollection<KV<ShardedKey<DestinationT>, Iterable<StorageApiWritePayload>>> groupedRecords;
+ int maxAppendBytes =
+ input
+ .getPipeline()
+ .getOptions()
+ .as(BigQueryOptions.class)
+ .getStorageApiAppendThresholdBytes();
if (this.allowAutosharding) {
groupedRecords =
convertMessagesResult
@@ -169,7 +174,7 @@ public class StorageApiLoads<DestinationT, ElementT>
.apply(
"GroupIntoBatches",
GroupIntoBatches.<DestinationT, StorageApiWritePayload>ofByteSize(
- MAX_BATCH_SIZE_BYTES,
+ maxAppendBytes,
(StorageApiWritePayload e) -> (long) e.getPayload().length)
.withMaxBufferingDuration(triggeringFrequency)
.withShardedKey());
@@ -182,8 +187,7 @@ public class StorageApiLoads<DestinationT, ElementT>
shardedRecords.apply(
"GroupIntoBatches",
GroupIntoBatches.<ShardedKey<DestinationT>, StorageApiWritePayload>ofByteSize(
- MAX_BATCH_SIZE_BYTES,
- (StorageApiWritePayload e) -> (long) e.getPayload().length)
+ maxAppendBytes, (StorageApiWritePayload e) -> (long) e.getPayload().length)
.withMaxBufferingDuration(triggeringFrequency));
}
PCollectionTuple writeRecordsResult =
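Replacing MAX_BATCH_SIZE_BYTES with the storageApiAppendThresholdBytes option keeps batch sizes and append sizes governed by the same setting. A hedged sketch of the batching call in isolation, assuming keyedPayloads is an existing PCollection<KV<String, byte[]>>:

import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

class GroupIntoBatchesSketch {
  static void batch(PCollection<KV<String, byte[]>> keyedPayloads, int maxAppendBytes) {
    keyedPayloads.apply(
        "GroupIntoBatches",
        GroupIntoBatches.<String, byte[]>ofByteSize(
            maxAppendBytes, (byte[] e) -> (long) e.length)); // batch by payload bytes
  }
}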
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritePayload.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritePayload.java
index 4b620a5c6d4..00a34b9c14f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritePayload.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritePayload.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.io.gcp.bigquery;
import com.google.auto.value.AutoValue;
+import java.io.IOException;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
@@ -28,5 +29,7 @@ public abstract class StorageApiWritePayload {
@SuppressWarnings("mutable")
public abstract byte[] getPayload();
- public abstract long getSchemaHash();
+ static StorageApiWritePayload of(byte[] payload) throws IOException {
+ return new AutoValue_StorageApiWritePayload(payload);
+ }
}
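Dropping getSchemaHash() means a payload no longer pins a schema version; writers resolve the current table schema when they build descriptors. A simplified, non-AutoValue stand-in for the resulting class:

final class WritePayloadSketch {
  private final byte[] payload;

  private WritePayloadSketch(byte[] payload) {
    this.payload = payload;
  }

  // Mirrors the new static factory: a payload is now just serialized proto bytes.
  static WritePayloadSketch of(byte[] payload) {
    return new WritePayloadSketch(payload);
  }

  byte[] getPayload() {
    return payload;
  }
}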
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index 0f86b8871f0..99317a3fb23 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -25,6 +25,7 @@ import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.Exceptions;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.cloud.bigquery.storage.v1.WriteStream.Type;
import com.google.protobuf.ByteString;
import com.google.protobuf.DynamicMessage;
@@ -46,7 +47,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StreamAppendClient;
import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.RetryType;
-import org.apache.beam.sdk.io.gcp.bigquery.StorageApiDynamicDestinations.DescriptorWrapper;
import org.apache.beam.sdk.io.gcp.bigquery.StorageApiDynamicDestinations.MessageConverter;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
@@ -99,18 +99,17 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
* StreamAppendClient after looking up the cache, and we must ensure that the cache is not
* accessed in between the lookup and the pin (any access of the cache could trigger element
* expiration). Therefore most uses of APPEND_CLIENTS should synchronize.
- *
- * <p>TODO(reuvenlax); Once all uses of StreamWriter are using
*/
- private static final Cache<String, StreamAppendClient> APPEND_CLIENTS =
+ private static final Cache<String, AppendClientInfo> APPEND_CLIENTS =
CacheBuilder.newBuilder()
.expireAfterAccess(15, TimeUnit.MINUTES)
.removalListener(
- (RemovalNotification<String, StreamAppendClient> removal) -> {
+ (RemovalNotification<String, AppendClientInfo> removal) -> {
LOG.info("Expiring append client for " + removal.getKey());
- final @Nullable StreamAppendClient streamAppendClient = removal.getValue();
- // Close the writer in a different thread so as not to block the main one.
- runAsyncIgnoreFailure(closeWriterExecutor, streamAppendClient::close);
+ final @Nullable AppendClientInfo appendClientInfo = removal.getValue();
+ if (appendClientInfo != null) {
+ appendClientInfo.close();
+ }
})
.build();
@@ -199,9 +198,8 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
class DestinationState {
private final String tableUrn;
- private final MessageConverter<ElementT> messageConverter;
private String streamName = "";
- private @Nullable StreamAppendClient streamAppendClient = null;
+ private @Nullable AppendClientInfo appendClientInfo = null;
private long currentOffset = 0;
private List<ByteString> pendingMessages;
private transient @Nullable DatasetService maybeDatasetService;
@@ -209,8 +207,6 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
Metrics.counter(WriteRecordsDoFn.class, "recordsAppended");
private final Counter appendFailures =
Metrics.counter(WriteRecordsDoFn.class, "appendFailures");
- private final Counter schemaMismatches =
- Metrics.counter(WriteRecordsDoFn.class, "schemaMismatches");
private final Distribution inflightWaitSecondsDistribution =
Metrics.distribution(WriteRecordsDoFn.class, "streamWriterWaitSeconds");
private final Counter rowsSentToFailedRowsCollection =
@@ -219,7 +215,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
"rowsSentToFailedRowsCollection");
private final boolean useDefaultStream;
- private DescriptorWrapper descriptorWrapper;
+ private TableSchema tableSchema;
private Instant nextCacheTickle = Instant.MAX;
private final int clientNumber;
private final boolean usingMultiplexing;
@@ -232,13 +228,13 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
boolean useDefaultStream,
int streamAppendClientCount,
boolean usingMultiplexing,
- long maxRequestSize) {
+ long maxRequestSize)
+ throws Exception {
this.tableUrn = tableUrn;
- this.messageConverter = messageConverter;
this.pendingMessages = Lists.newArrayList();
this.maybeDatasetService = datasetService;
this.useDefaultStream = useDefaultStream;
- this.descriptorWrapper = messageConverter.getSchemaDescriptor();
+ this.tableSchema = messageConverter.getTableSchema();
this.clientNumber = new Random().nextInt(streamAppendClientCount);
this.usingMultiplexing = usingMultiplexing;
this.maxRequestSize = maxRequestSize;
@@ -246,9 +242,11 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
void teardown() {
maybeTickleCache();
- if (streamAppendClient != null) {
- runAsyncIgnoreFailure(closeWriterExecutor, streamAppendClient::unpin);
- streamAppendClient = null;
+ if (appendClientInfo != null) {
+ if (appendClientInfo.streamAppendClient != null) {
+ runAsyncIgnoreFailure(closeWriterExecutor, appendClientInfo.streamAppendClient::unpin);
+ }
+ appendClientInfo = null;
}
}
@@ -279,41 +277,52 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
return this.streamName;
}
- StreamAppendClient generateClient() throws Exception {
+ AppendClientInfo generateClient(boolean createAppendClient) throws Exception {
Preconditions.checkStateNotNull(maybeDatasetService);
- return maybeDatasetService.getStreamAppendClient(
- streamName, descriptorWrapper.descriptor, usingMultiplexing);
+ AppendClientInfo appendClientInfo =
+ new AppendClientInfo(
+ tableSchema,
+ // Make sure that the client is always closed in a different thread to avoid
+ // blocking.
+ client -> runAsyncIgnoreFailure(closeWriterExecutor, client::close));
+ if (createAppendClient) {
+ appendClientInfo =
+ appendClientInfo.createAppendClient(
+ maybeDatasetService, () -> streamName, usingMultiplexing);
+ Preconditions.checkStateNotNull(appendClientInfo.streamAppendClient).pin();
+ }
+ return appendClientInfo;
}
- StreamAppendClient getStreamAppendClient(boolean lookupCache) {
+ AppendClientInfo getAppendClientInfo(boolean lookupCache, boolean createAppendClient) {
try {
- if (this.streamAppendClient == null) {
+ if (this.appendClientInfo == null) {
getOrCreateStreamName();
- final StreamAppendClient newStreamAppendClient;
+ final AppendClientInfo newAppendClientInfo;
synchronized (APPEND_CLIENTS) {
if (lookupCache) {
- newStreamAppendClient =
+ newAppendClientInfo =
APPEND_CLIENTS.get(
- getStreamAppendClientCacheEntryKey(), () -> generateClient());
+ getStreamAppendClientCacheEntryKey(),
+ () -> generateClient(createAppendClient));
} else {
- newStreamAppendClient = generateClient();
- // override the clients in the cache
- APPEND_CLIENTS.put(getStreamAppendClientCacheEntryKey(), newStreamAppendClient);
+ newAppendClientInfo = generateClient(createAppendClient);
+ // override the clients in the cache.
+ APPEND_CLIENTS.put(getStreamAppendClientCacheEntryKey(), newAppendClientInfo);
}
- newStreamAppendClient.pin();
}
this.currentOffset = 0;
nextCacheTickle = Instant.now().plus(java.time.Duration.ofMinutes(1));
- this.streamAppendClient = newStreamAppendClient;
+ this.appendClientInfo = newAppendClientInfo;
}
- return streamAppendClient;
+ return appendClientInfo;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
void maybeTickleCache() {
- if (streamAppendClient != null && Instant.now().isAfter(nextCacheTickle)) {
+ if (appendClientInfo != null && Instant.now().isAfter(nextCacheTickle)) {
synchronized (APPEND_CLIENTS) {
APPEND_CLIENTS.getIfPresent(getStreamAppendClientCacheEntryKey());
}
@@ -322,57 +331,36 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
}
void invalidateWriteStream() {
- if (streamAppendClient != null) {
+ if (appendClientInfo != null) {
synchronized (APPEND_CLIENTS) {
// Unpin in a different thread, as it may execute a blocking close.
- runAsyncIgnoreFailure(closeWriterExecutor, streamAppendClient::unpin);
+ if (appendClientInfo.streamAppendClient != null) {
+ runAsyncIgnoreFailure(
+ closeWriterExecutor, appendClientInfo.streamAppendClient::unpin);
+ }
// The default stream is cached across multiple different DoFns. If they all try and
- // invalidate, then we can
- // get races between threads invalidating and recreating streams. For this reason, we
- // check to see that the
- // cache still contains the object we created before invalidating (in case another
- // thread has already invalidated
- // and recreated the stream).
+ // invalidate, then we can get races between threads invalidating and recreating
+ // streams. For this reason,
+ // we check to see that the cache still contains the object we created before
+ // invalidating (in case another
+ // thread has already invalidated and recreated the stream).
String cacheEntryKey = getStreamAppendClientCacheEntryKey();
@Nullable
- StreamAppendClient cachedAppendClient = APPEND_CLIENTS.getIfPresent(cacheEntryKey);
+ AppendClientInfo cachedAppendClient = APPEND_CLIENTS.getIfPresent(cacheEntryKey);
if (cachedAppendClient != null
&& System.identityHashCode(cachedAppendClient)
- == System.identityHashCode(streamAppendClient)) {
+ == System.identityHashCode(appendClientInfo)) {
APPEND_CLIENTS.invalidate(cacheEntryKey);
}
}
- streamAppendClient = null;
+ appendClientInfo = null;
}
}
void addMessage(StorageApiWritePayload payload) throws Exception {
maybeTickleCache();
- if (payload.getSchemaHash() != descriptorWrapper.hash) {
- schemaMismatches.inc();
- // The descriptor on the payload doesn't match the descriptor we know about. This
- // means that the table has been updated, but that this transform hasn't found out
- // about that yet. Refresh the schema and force a new StreamAppendClient to be
- // created.
- messageConverter.refreshSchema(payload.getSchemaHash());
- descriptorWrapper = messageConverter.getSchemaDescriptor();
- invalidateWriteStream();
- if (useDefaultStream) {
- // Since the default stream client is shared across many bundles and threads, we can't
- // simply look it up from the cache, as another thread may have recreated it with the
- // old
- // schema.
- getStreamAppendClient(false);
- }
- // Validate that the record can now be parsed.
- DynamicMessage msg =
- DynamicMessage.parseFrom(descriptorWrapper.descriptor, payload.getPayload());
- if (msg.getUnknownFields() != null && !msg.getUnknownFields().asMap().isEmpty()) {
- throw new RuntimeException(
- "Record schema does not match table. Unknown fields: " + msg.getUnknownFields());
- }
- }
- pendingMessages.add(ByteString.copyFrom(payload.getPayload()));
+ ByteString payloadBytes = ByteString.copyFrom(payload.getPayload());
+ pendingMessages.add(payloadBytes);
}
long flush(
@@ -403,7 +391,8 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
for (ByteString rowBytes : inserts.getSerializedRowsList()) {
TableRow failedRow =
TableRowToStorageApiProto.tableRowFromMessage(
- DynamicMessage.parseFrom(descriptorWrapper.descriptor, rowBytes));
+ DynamicMessage.parseFrom(
+ getAppendClientInfo(true, false).descriptor, rowBytes));
failedRowsReceiver.output(
new BigQueryStorageApiInsertError(
failedRow, "Row payload too large. Maximum size " + maxRequestSize));
@@ -426,7 +415,9 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
return ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().build());
}
try {
- StreamAppendClient writeStream = getStreamAppendClient(true);
+ StreamAppendClient writeStream =
+ Preconditions.checkStateNotNull(
+ getAppendClientInfo(true, true).streamAppendClient);
ApiFuture<AppendRowsResponse> response =
writeStream.appendRows(c.offset, c.protoRows);
inflightWaitSecondsDistribution.update(writeStream.getInflightWaitSeconds());
@@ -457,7 +448,9 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
try {
TableRow failedRow =
TableRowToStorageApiProto.tableRowFromMessage(
- DynamicMessage.parseFrom(descriptorWrapper.descriptor, protoBytes));
+ DynamicMessage.parseFrom(
+ Preconditions.checkStateNotNull(appendClientInfo).descriptor,
+ protoBytes));
new BigQueryStorageApiInsertError(
failedRow, error.getRowIndexToErrorMessage().get(failedIndex));
failedRowsReceiver.output(
@@ -622,17 +615,17 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
MessageConverter<ElementT> messageConverter;
try {
messageConverter = messageConverters.get(destination, dynamicDestinations, datasetService);
+ return new DestinationState(
+ tableDestination1.getTableUrn(),
+ messageConverter,
+ datasetService,
+ useDefaultStream,
+ streamAppendClientCount,
+ bigQueryOptions.getUseStorageApiConnectionPool(),
+ bigQueryOptions.getStorageWriteApiMaxRequestSize());
} catch (Exception e) {
throw new RuntimeException(e);
}
- return new DestinationState(
- tableDestination1.getTableUrn(),
- messageConverter,
- datasetService,
- useDefaultStream,
- streamAppendClientCount,
- bigQueryOptions.getUseStorageApiConnectionPool(),
- bigQueryOptions.getStorageWriteApiMaxRequestSize());
}
@ProcessElement
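The cache now stores whole AppendClientInfo objects rather than bare clients, and the removal listener delegates to AppendClientInfo.close(), which in turn invokes the injected close callback on a background executor. A hedged sketch of the same cache wiring using plain Guava (Beam uses its vendored copy), with AutoCloseable standing in for AppendClientInfo:

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalNotification;
import java.util.concurrent.TimeUnit;

class ClientCacheSketch {
  static final Cache<String, AutoCloseable> CLIENTS =
      CacheBuilder.newBuilder()
          .expireAfterAccess(15, TimeUnit.MINUTES)
          .removalListener(
              (RemovalNotification<String, AutoCloseable> removal) -> {
                AutoCloseable client = removal.getValue();
                if (client != null) {
                  try {
                    client.close(); // dispose of expired clients
                  } catch (Exception e) {
                    // best-effort close, mirroring runAsyncIgnoreFailure
                  }
                }
              })
          .build();
}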
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index af0ae5169bc..ee3bf69f503 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -26,6 +26,7 @@ import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.Exceptions;
import com.google.cloud.bigquery.storage.v1.Exceptions.StreamFinalizedException;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.cloud.bigquery.storage.v1.WriteStream.Type;
import com.google.protobuf.ByteString;
import com.google.protobuf.DynamicMessage;
@@ -40,10 +41,10 @@ import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
+import java.util.function.Supplier;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -51,8 +52,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StreamAppendClient;
import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.RetryType;
-import org.apache.beam.sdk.io.gcp.bigquery.StorageApiDynamicDestinations.DescriptorWrapper;
-import org.apache.beam.sdk.io.gcp.bigquery.StorageApiDynamicDestinations.MessageConverter;
import org.apache.beam.sdk.io.gcp.bigquery.StorageApiFlushAndFinalizeDoFn.Operation;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
@@ -124,14 +123,45 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object
private final TupleTag<KV<String, Operation>> flushTag = new TupleTag<>("flushTag");
private static final ExecutorService closeWriterExecutor = Executors.newCachedThreadPool();
- private static final Cache<String, StreamAppendClient> APPEND_CLIENTS =
+ // Context passed into RetryManager for each call.
+ class AppendRowsContext extends RetryManager.Operation.Context<AppendRowsResponse> {
+ final ShardedKey<DestinationT> key;
+ String streamName = "";
+ @Nullable StreamAppendClient client = null;
+ long offset = -1;
+ long numRows = 0;
+ long tryIteration = 0;
+ ProtoRows protoRows;
+
+ AppendRowsContext(ShardedKey<DestinationT> key, ProtoRows protoRows) {
+ this.key = key;
+ this.protoRows = protoRows;
+ }
+
+ @Override
+ public String toString() {
+ return "Context: key="
+ + key
+ + " streamName="
+ + streamName
+ + " offset="
+ + offset
+ + " numRows="
+ + numRows
+ + " tryIteration: "
+ + tryIteration;
+ }
+ };
+
+ private static final Cache<ShardedKey<?>, AppendClientInfo> APPEND_CLIENTS =
CacheBuilder.newBuilder()
.expireAfterAccess(5, TimeUnit.MINUTES)
.removalListener(
- (RemovalNotification<String, StreamAppendClient> removal) -> {
- final @Nullable StreamAppendClient streamAppendClient = removal.getValue();
- // Close the writer in a different thread so as not to block the main one.
- runAsyncIgnoreFailure(closeWriterExecutor, streamAppendClient::close);
+ (RemovalNotification<ShardedKey<?>, AppendClientInfo> removal) -> {
+ final @Nullable AppendClientInfo appendClientInfo = removal.getValue();
+ if (appendClientInfo != null) {
+ appendClientInfo.close();
+ }
})
.build();
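The cache above is now keyed by ShardedKey and owns the lifetime of each AppendClientInfo, closing evicted entries through the removal listener. The same pattern in isolation, sketched with plain Guava and a hypothetical Client resource rather than the Beam types:

    import java.util.concurrent.TimeUnit;
    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.RemovalNotification;

    class ClientCacheSketch {
      interface Client {
        void close();
      }

      final Cache<String, Client> clients =
          CacheBuilder.newBuilder()
              .expireAfterAccess(5, TimeUnit.MINUTES)
              .removalListener(
                  (RemovalNotification<String, Client> removal) -> {
                    Client client = removal.getValue(); // may be null if collected
                    if (client != null) {
                      client.close(); // release the evicted resource
                    }
                  })
              .build();
    }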
@@ -175,12 +205,16 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object
@Override
public PCollectionTuple expand(
PCollection<KV<ShardedKey<DestinationT>, Iterable<StorageApiWritePayload>>> input) {
+ BigQueryOptions bigQueryOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
+ final long splitSize = bigQueryOptions.getStorageApiAppendThresholdBytes();
+ final long maxRequestSize = bigQueryOptions.getStorageWriteApiMaxRequestSize();
+
String operationName = input.getName() + "/" + getName();
// Append records to the Storage API streams.
PCollectionTuple writeRecordsResult =
input.apply(
"Write Records",
- ParDo.of(new WriteRecordsDoFn(operationName, streamIdleTime))
+ ParDo.of(new WriteRecordsDoFn(operationName, streamIdleTime, splitSize, maxRequestSize))
.withSideInputs(dynamicDestinations.getSideInputs())
.withOutputTags(flushTag, TupleTagList.of(failedRowsTag)));
@@ -257,10 +291,15 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object
private final TimerSpec idleTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
private final Duration streamIdleTime;
+ private final long splitSize;
+ private final long maxRequestSize;
- public WriteRecordsDoFn(String operationName, Duration streamIdleTime) {
+ public WriteRecordsDoFn(
+ String operationName, Duration streamIdleTime, long splitSize, long maxRequestSize) {
this.messageConverters = new TwoLevelMessageConverterCache<>(operationName);
this.streamIdleTime = streamIdleTime;
+ this.splitSize = splitSize;
+ this.maxRequestSize = maxRequestSize;
}
@StartBundle
@@ -269,27 +308,29 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object
}
// Get the current stream for this key. If there is no current stream, create one and store the
- // stream name in
- // persistent state.
+ // stream name in persistent state.
String getOrCreateStream(
String tableId,
ValueState<String> streamName,
ValueState<Long> streamOffset,
Timer streamIdleTimer,
- DatasetService datasetService)
- throws IOException, InterruptedException {
- String stream = streamName.read();
- if (Strings.isNullOrEmpty(stream)) {
- // In a buffered stream, data is only visible up to the offset to which it was flushed.
- stream = datasetService.createWriteStream(tableId, Type.BUFFERED).getName();
- streamName.write(stream);
- streamOffset.write(0L);
- streamsCreated.inc();
- }
- // Reset the idle timer.
- streamIdleTimer.offset(streamIdleTime).withNoOutputTimestamp().setRelative();
+ DatasetService datasetService) {
+ try {
+ String stream = streamName.read();
+ if (Strings.isNullOrEmpty(stream)) {
+ // In a buffered stream, data is only visible up to the offset to which it was flushed.
+ stream = datasetService.createWriteStream(tableId, Type.BUFFERED).getName();
+ streamName.write(stream);
+ streamOffset.write(0L);
+ streamsCreated.inc();
+ }
+ // Reset the idle timer.
+ streamIdleTimer.offset(streamIdleTime).withNoOutputTimestamp().setRelative();
- return stream;
+ return stream;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
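getOrCreateStream now handles its checked exceptions internally so that it can be passed around as a java.util.function.Supplier (see its use below). The general shape of that adaptation, sketched with a hypothetical stream factory:

    import java.io.IOException;
    import java.util.function.Supplier;

    class SupplierSketch {
      // Hypothetical factory that throws a checked exception.
      static String createStream(String tableId) throws IOException {
        return "streams/" + tableId;
      }

      // Supplier.get() cannot throw checked exceptions, so rewrap as unchecked.
      static Supplier<String> streamSupplier(String tableId) {
        return () -> {
          try {
            return createStream(tableId);
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        };
      }
    }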
private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException {
@@ -340,68 +381,30 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object
});
final String tableId = tableDestination.getTableUrn();
final DatasetService datasetService = getDatasetService(pipelineOptions);
- MessageConverter<ElementT> messageConverter =
- messageConverters.get(element.getKey().getKey(), dynamicDestinations, datasetService);
- AtomicReference<DescriptorWrapper> descriptor =
- new AtomicReference<>(messageConverter.getSchemaDescriptor());
-
- // Each ProtoRows object contains at most 1MB of rows.
-    // TODO: Push messageFromTableRow up to top level. That way we can skip TableRow entirely if
-    // already proto or already schema.
- final long splitSize = bigQueryOptions.getStorageApiAppendThresholdBytes();
- // Called if the schema does not match.
- Function<Long, DescriptorWrapper> updateSchemaHash =
- (Long expectedHash) -> {
- try {
- LOG.info("Schema does not match. Querying BigQuery for the current table schema.");
- // Update the schema from the table.
- messageConverter.refreshSchema(expectedHash);
- descriptor.set(messageConverter.getSchemaDescriptor());
- // Force a new connection.
- String stream = streamName.read();
- if (stream != null) {
- APPEND_CLIENTS.invalidate(stream);
- }
- return descriptor.get();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- };
- Iterable<ProtoRows> messages =
- new SplittingIterable(element.getValue(), splitSize, descriptor.get(), updateSchemaHash);
-
- class AppendRowsContext extends RetryManager.Operation.Context<AppendRowsResponse> {
- final ShardedKey<DestinationT> key;
- String streamName = "";
- @Nullable StreamAppendClient client = null;
- long offset = -1;
- long numRows = 0;
- long tryIteration = 0;
- ProtoRows protoRows;
-
- AppendRowsContext(ShardedKey<DestinationT> key, ProtoRows protoRows) {
- this.key = key;
- this.protoRows = protoRows;
- }
- @Override
- public String toString() {
- return "Context: key="
- + key
- + " streamName="
- + streamName
- + " offset="
- + offset
- + " numRows="
- + numRows
- + " tryIteration: "
- + tryIteration;
- }
- };
+ Supplier<String> getOrCreateStream =
+ () -> getOrCreateStream(tableId, streamName, streamOffset, idleTimer, datasetService);
+ final AppendClientInfo appendClientInfo =
+ APPEND_CLIENTS.get(
+ element.getKey(),
+ () -> {
+ @Nullable
+ TableSchema tableSchema =
+ messageConverters
+ .get(element.getKey().getKey(), dynamicDestinations, datasetService)
+ .getTableSchema();
+ return new AppendClientInfo(
+ tableSchema,
+ // Make sure that the client is always closed in a different thread to avoid
+ // blocking.
+ client -> runAsyncIgnoreFailure(closeWriterExecutor, client::close))
+ .createAppendClient(datasetService, getOrCreateStream, false);
+ });
+
+ Iterable<ProtoRows> messages = new SplittingIterable(element.getValue(), splitSize);
// Initialize stream names and offsets for all contexts. This will be called initially, but
- // will also be called
- // if we roll over to a new stream on a retry.
+ // will also be called if we roll over to a new stream on a retry.
BiConsumer<Iterable<AppendRowsContext>, Boolean> initializeContexts =
(contexts, isFailure) -> {
try {
@@ -409,18 +412,13 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object
// Clear the stream name, forcing a new one to be created.
streamName.write("");
}
- String stream =
- getOrCreateStream(tableId, streamName, streamOffset, idleTimer, datasetService);
- StreamAppendClient appendClient =
- APPEND_CLIENTS.get(
- stream,
- () ->
- datasetService.getStreamAppendClient(
- stream, descriptor.get().descriptor, false));
+ appendClientInfo.createAppendClient(datasetService, getOrCreateStream, false);
+ StreamAppendClient streamAppendClient =
+ Preconditions.checkArgumentNotNull(appendClientInfo.streamAppendClient);
for (AppendRowsContext context : contexts) {
- context.streamName = stream;
- appendClient.pin();
- context.client = appendClient;
+ context.streamName = streamName.read();
+ streamAppendClient.pin();
+ context.client = appendClientInfo.streamAppendClient;
context.offset = streamOffset.read();
++context.tryIteration;
streamOffset.write(context.offset + context.protoRows.getSerializedRowsCount());
@@ -432,7 +430,7 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object
Consumer<Iterable<AppendRowsContext>> clearClients =
contexts -> {
- APPEND_CLIENTS.invalidate(streamName.read());
+ appendClientInfo.clearAppendClient();
for (AppendRowsContext context : contexts) {
if (context.client != null) {
// Unpin in a different thread, as it may execute a blocking close.
@@ -450,13 +448,9 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object
return ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().build());
}
try {
- StreamAppendClient appendClient =
- APPEND_CLIENTS.get(
- context.streamName,
- () ->
- datasetService.getStreamAppendClient(
- context.streamName, descriptor.get().descriptor, false));
- return appendClient.appendRows(context.offset, context.protoRows);
+ appendClientInfo.createAppendClient(datasetService, getOrCreateStream, false);
+ return Preconditions.checkStateNotNull(appendClientInfo.streamAppendClient)
+ .appendRows(context.offset, context.protoRows);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -485,7 +479,7 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object
try {
TableRow failedRow =
TableRowToStorageApiProto.tableRowFromMessage(
- DynamicMessage.parseFrom(descriptor.get().descriptor, protoBytes));
+ DynamicMessage.parseFrom(appendClientInfo.descriptor, protoBytes));
new BigQueryStorageApiInsertError(
failedRow, error.getRowIndexToErrorMessage().get(failedIndex));
o.get(failedRowsTag)
@@ -580,7 +574,6 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object
false)));
flushesScheduled.inc(context.protoRows.getSerializedRowsCount());
};
- long maxRequestSize = bigQueryOptions.getStorageWriteApiMaxRequestSize();
Instant now = Instant.now();
List<AppendRowsContext> contexts = Lists.newArrayList();
RetryManager<AppendRowsResponse, AppendRowsContext> retryManager =
@@ -602,7 +595,7 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object
for (ByteString rowBytes : protoRows.getSerializedRowsList()) {
TableRow failedRow =
TableRowToStorageApiProto.tableRowFromMessage(
- DynamicMessage.parseFrom(descriptor.get().descriptor, rowBytes));
+ DynamicMessage.parseFrom(appendClientInfo.descriptor, rowBytes));
o.get(failedRowsTag)
.output(
new BigQueryStorageApiInsertError(
@@ -643,6 +636,7 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object
private void finalizeStream(
@AlwaysFetched @StateId("streamName") ValueState<String> streamName,
@AlwaysFetched @StateId("streamOffset") ValueState<Long> streamOffset,
+ ShardedKey<DestinationT> key,
MultiOutputReceiver o,
org.joda.time.Instant finalizeElementTs) {
String stream = MoreObjects.firstNonNull(streamName.read(), "");
@@ -656,12 +650,13 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object
streamName.clear();
streamOffset.clear();
// Make sure that the stream object is closed.
- APPEND_CLIENTS.invalidate(stream);
+ APPEND_CLIENTS.invalidate(key);
}
}
@OnTimer("idleTimer")
public void onTimer(
+ @Key ShardedKey<DestinationT> key,
@AlwaysFetched @StateId("streamName") ValueState<String> streamName,
@AlwaysFetched @StateId("streamOffset") ValueState<Long> streamOffset,
MultiOutputReceiver o,
@@ -672,19 +667,20 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object
// a pipeline) this finalize element will be dropped as late. This is usually ok as
// BigQuery will eventually garbage collect the stream. We attempt to finalize idle streams
// merely to remove the pressure of large numbers of orphaned streams from BigQuery.
- finalizeStream(streamName, streamOffset, o, window.maxTimestamp());
+ finalizeStream(streamName, streamOffset, key, o, window.maxTimestamp());
streamsIdle.inc();
}
@OnWindowExpiration
public void onWindowExpiration(
+ @Key ShardedKey<DestinationT> key,
@AlwaysFetched @StateId("streamName") ValueState<String> streamName,
@AlwaysFetched @StateId("streamOffset") ValueState<Long> streamOffset,
MultiOutputReceiver o,
BoundedWindow window) {
// Window is done - usually because the pipeline has been drained. Make sure to clean up
// streams so that they are not leaked.
- finalizeStream(streamName, streamOffset, o, window.maxTimestamp());
+ finalizeStream(streamName, streamOffset, key, o, window.maxTimestamp());
}
@Override
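Each AppendRowsContext pins the shared append client before an append and unpins it when contexts are cleared, so the client is only closed once no in-flight work references it. A reference-counting sketch of that lifecycle, using a hypothetical Resource type rather than the Beam StreamAppendClient:

    import java.util.concurrent.atomic.AtomicInteger;

    class PinSketch {
      static class Resource {
        // Starts at 1 for the owning cache entry; each in-flight context adds one.
        private final AtomicInteger pins = new AtomicInteger(1);

        void pin() {
          pins.incrementAndGet();
        }

        void unpin() {
          // Close only when the last reference is released.
          if (pins.decrementAndGet() == 0) {
            close();
          }
        }

        void close() {
          // Release the underlying connection here.
        }
      }
    }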
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
index eaf90fea829..cb0b0eaa5e3 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
@@ -19,10 +19,10 @@ package org.apache.beam.sdk.io.gcp.bigquery;
import static java.util.stream.Collectors.toList;
-import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.storage.v1.BigDecimalByteStringEncoder;
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.protobuf.ByteString;
import com.google.protobuf.DescriptorProtos.DescriptorProto;
import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
@@ -56,6 +56,7 @@ import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
@@ -107,30 +108,192 @@ public class TableRowToStorageApiProto {
}
}
+ ///////////////////////////////////
+  // Conversion between the JSON TableSchema class and the proto TableSchema class.
+
+ private static final Map<Mode, TableFieldSchema.Mode> MODE_MAP_JSON_PROTO =
+ ImmutableMap.of(
+ Mode.NULLABLE, TableFieldSchema.Mode.NULLABLE,
+ Mode.REQUIRED, TableFieldSchema.Mode.REQUIRED,
+ Mode.REPEATED, TableFieldSchema.Mode.REPEATED);
+ private static final Map<TableFieldSchema.Mode, String> MODE_MAP_PROTO_JSON =
+ ImmutableMap.of(
+ TableFieldSchema.Mode.NULLABLE, "NULLABLE",
+ TableFieldSchema.Mode.REQUIRED, "REQUIRED",
+ TableFieldSchema.Mode.REPEATED, "REPEATED");
+
+ private static final Map<String, TableFieldSchema.Type> TYPE_MAP_JSON_PROTO =
+ ImmutableMap.<String, TableFieldSchema.Type>builder()
+ .put("STRUCT", TableFieldSchema.Type.STRUCT)
+ .put("RECORD", TableFieldSchema.Type.STRUCT)
+ .put("INT64", TableFieldSchema.Type.INT64)
+ .put("INTEGER", TableFieldSchema.Type.INT64)
+ .put("FLOAT64", TableFieldSchema.Type.DOUBLE)
+ .put("FLOAT", TableFieldSchema.Type.DOUBLE)
+ .put("STRING", TableFieldSchema.Type.STRING)
+ .put("BOOL", TableFieldSchema.Type.BOOL)
+ .put("BOOLEAN", TableFieldSchema.Type.BOOL)
+ .put("BYTES", TableFieldSchema.Type.BYTES)
+ .put("NUMERIC", TableFieldSchema.Type.NUMERIC)
+ .put("BIGNUMERIC", TableFieldSchema.Type.BIGNUMERIC)
+ .put("GEOGRAPHY", TableFieldSchema.Type.GEOGRAPHY)
+ .put("DATE", TableFieldSchema.Type.DATE)
+ .put("TIME", TableFieldSchema.Type.TIME)
+ .put("DATETIME", TableFieldSchema.Type.DATETIME)
+ .put("TIMESTAMP", TableFieldSchema.Type.TIMESTAMP)
+ .put("JSON", TableFieldSchema.Type.JSON)
+ .build();
+ private static final Map<TableFieldSchema.Type, String> TYPE_MAP_PROTO_JSON =
+ ImmutableMap.<TableFieldSchema.Type, String>builder()
+ .put(TableFieldSchema.Type.STRUCT, "STRUCT")
+ .put(TableFieldSchema.Type.INT64, "INT64")
+ .put(TableFieldSchema.Type.DOUBLE, "FLOAT64")
+ .put(TableFieldSchema.Type.STRING, "STRING")
+ .put(TableFieldSchema.Type.BOOL, "BOOL")
+ .put(TableFieldSchema.Type.BYTES, "BYTES")
+ .put(TableFieldSchema.Type.NUMERIC, "NUMERIC")
+ .put(TableFieldSchema.Type.BIGNUMERIC, "BIGNUMERIC")
+ .put(TableFieldSchema.Type.GEOGRAPHY, "GEOGRAPHY")
+ .put(TableFieldSchema.Type.DATE, "DATE")
+ .put(TableFieldSchema.Type.TIME, "TIME")
+ .put(TableFieldSchema.Type.DATETIME, "DATETIME")
+ .put(TableFieldSchema.Type.TIMESTAMP, "TIMESTAMP")
+ .put(TableFieldSchema.Type.JSON, "JSON")
+ .build();
+
+ public static TableFieldSchema.Mode modeToProtoMode(String mode) {
+ return Optional.ofNullable(mode)
+ .map(Mode::valueOf)
+ .map(m -> MODE_MAP_JSON_PROTO.get(m))
+ .orElse(TableFieldSchema.Mode.REQUIRED);
+ }
+
+ public static String protoModeToJsonMode(TableFieldSchema.Mode protoMode) {
+ String jsonMode = MODE_MAP_PROTO_JSON.get(protoMode);
+ if (jsonMode == null) {
+ throw new RuntimeException("Unknown mode " + protoMode);
+ }
+ return jsonMode;
+ }
+
+ public static String protoTypeToJsonType(TableFieldSchema.Type protoType) {
+ String type = TYPE_MAP_PROTO_JSON.get(protoType);
+ if (type == null) {
+ throw new RuntimeException("Unknown type " + protoType);
+ }
+ return type;
+ }
+
+ public static TableFieldSchema.Type typeToProtoType(String type) {
+ TableFieldSchema.Type protoType = TYPE_MAP_JSON_PROTO.get(type);
+ if (protoType == null) {
+ throw new RuntimeException("Unknown type " + type);
+ }
+ return protoType;
+ }
+
+ public static com.google.api.services.bigquery.model.TableSchema protoSchemaToTableSchema(
+ TableSchema protoTableSchema) {
+ com.google.api.services.bigquery.model.TableSchema tableSchema =
+ new com.google.api.services.bigquery.model.TableSchema();
+ List<com.google.api.services.bigquery.model.TableFieldSchema> tableFields =
+ Lists.newArrayListWithExpectedSize(protoTableSchema.getFieldsCount());
+ for (TableFieldSchema protoTableField : protoTableSchema.getFieldsList()) {
+ tableFields.add(protoTableFieldToTableField(protoTableField));
+ }
+ return tableSchema.setFields(tableFields);
+ }
+
+ public static com.google.api.services.bigquery.model.TableFieldSchema protoTableFieldToTableField(
+ TableFieldSchema protoTableField) {
+ com.google.api.services.bigquery.model.TableFieldSchema tableField =
+ new com.google.api.services.bigquery.model.TableFieldSchema();
+ tableField = tableField.setName(protoTableField.getName());
+    if (!Strings.isNullOrEmpty(protoTableField.getDescription())) {
+ tableField = tableField.setDescription(protoTableField.getDescription());
+ }
+ if (protoTableField.getMaxLength() != 0) {
+ tableField = tableField.setMaxLength(protoTableField.getMaxLength());
+ }
+ if (protoTableField.getMode() != TableFieldSchema.Mode.MODE_UNSPECIFIED) {
+ tableField = tableField.setMode(protoModeToJsonMode(protoTableField.getMode()));
+ }
+ if (protoTableField.getPrecision() != 0) {
+ tableField = tableField.setPrecision(protoTableField.getPrecision());
+ }
+ if (protoTableField.getScale() != 0) {
+ tableField = tableField.setScale(protoTableField.getScale());
+ }
+ tableField = tableField.setType(protoTypeToJsonType(protoTableField.getType()));
+ if (protoTableField.getType().equals(TableFieldSchema.Type.STRUCT)) {
+ List<com.google.api.services.bigquery.model.TableFieldSchema> subFields =
+ Lists.newArrayListWithExpectedSize(protoTableField.getFieldsCount());
+ for (TableFieldSchema subField : protoTableField.getFieldsList()) {
+ subFields.add(protoTableFieldToTableField(subField));
+ }
+ tableField = tableField.setFields(subFields);
+ }
+ return tableField;
+ }
+
+ public static TableSchema schemaToProtoTableSchema(
+ com.google.api.services.bigquery.model.TableSchema tableSchema) {
+ TableSchema.Builder builder = TableSchema.newBuilder();
+ if (tableSchema.getFields() != null) {
+ for (com.google.api.services.bigquery.model.TableFieldSchema field :
+ tableSchema.getFields()) {
+ builder.addFields(tableFieldToProtoTableField(field));
+ }
+ }
+ return builder.build();
+ }
+
+ public static TableFieldSchema tableFieldToProtoTableField(
+ com.google.api.services.bigquery.model.TableFieldSchema field) {
+ TableFieldSchema.Builder builder = TableFieldSchema.newBuilder();
+ builder.setName(field.getName());
+ if (field.getDescription() != null) {
+ builder.setDescription(field.getDescription());
+ }
+ if (field.getMaxLength() != null) {
+ builder.setMaxLength(field.getMaxLength());
+ }
+ if (field.getMode() != null) {
+ builder.setMode(modeToProtoMode(field.getMode()));
+ }
+ if (field.getPrecision() != null) {
+ builder.setPrecision(field.getPrecision());
+ }
+ if (field.getScale() != null) {
+ builder.setScale(field.getScale());
+ }
+ builder.setType(typeToProtoType(field.getType()));
+ if (builder.getType().equals(TableFieldSchema.Type.STRUCT)) {
+ for (com.google.api.services.bigquery.model.TableFieldSchema subField : field.getFields()) {
+ builder.addFields(tableFieldToProtoTableField(subField));
+ }
+ }
+ return builder.build();
+ }
+
static class SchemaInformation {
private final TableFieldSchema tableFieldSchema;
private final List<SchemaInformation> subFields;
private final Map<String, SchemaInformation> subFieldsByName;
private final Iterable<SchemaInformation> parentSchemas;
- private SchemaInformation(TableFieldSchema tableFieldSchema) {
- this(tableFieldSchema, Collections.emptyList());
- }
-
private SchemaInformation(
TableFieldSchema tableFieldSchema, Iterable<SchemaInformation> parentSchemas) {
this.tableFieldSchema = tableFieldSchema;
this.subFields = Lists.newArrayList();
this.subFieldsByName = Maps.newHashMap();
this.parentSchemas = parentSchemas;
- if (tableFieldSchema.getFields() != null) {
- for (TableFieldSchema field : tableFieldSchema.getFields()) {
- SchemaInformation schemaInformation =
- new SchemaInformation(
- field, Iterables.concat(this.parentSchemas, ImmutableList.of(this)));
- subFields.add(schemaInformation);
- subFieldsByName.put(field.getName(), schemaInformation);
- }
+ for (TableFieldSchema field : tableFieldSchema.getFieldsList()) {
+ SchemaInformation schemaInformation =
+ new SchemaInformation(
+ field, Iterables.concat(this.parentSchemas, ImmutableList.of(this)));
+ subFields.add(schemaInformation);
+ subFieldsByName.put(field.getName(), schemaInformation);
}
}
@@ -146,7 +309,7 @@ public class TableRowToStorageApiProto {
return tableFieldSchema.getName();
}
- public String getType() {
+ public TableFieldSchema.Type getType() {
return tableFieldSchema.getType();
}
@@ -167,42 +330,50 @@ public class TableRowToStorageApiProto {
}
static SchemaInformation fromTableSchema(TableSchema tableSchema) {
- TableFieldSchema rootSchema =
- new TableFieldSchema()
- .setName("__root__")
- .setType("RECORD")
- .setFields(tableSchema.getFields());
- return new SchemaInformation(rootSchema);
+ TableFieldSchema root =
+ TableFieldSchema.newBuilder()
+ .addAllFields(tableSchema.getFieldsList())
+ .setName("root")
+ .build();
+ return new SchemaInformation(root, Collections.emptyList());
+ }
+
+ static SchemaInformation fromTableSchema(
+ com.google.api.services.bigquery.model.TableSchema jsonTableSchema) {
+ return SchemaInformation.fromTableSchema(schemaToProtoTableSchema(jsonTableSchema));
}
}
- static final Map<String, Type> PRIMITIVE_TYPES =
- ImmutableMap.<String, Type>builder()
- .put("INT64", Type.TYPE_INT64)
- .put("INTEGER", Type.TYPE_INT64)
- .put("FLOAT64", Type.TYPE_DOUBLE)
- .put("FLOAT", Type.TYPE_DOUBLE)
- .put("STRING", Type.TYPE_STRING)
- .put("BOOL", Type.TYPE_BOOL)
- .put("BOOLEAN", Type.TYPE_BOOL)
- .put("BYTES", Type.TYPE_BYTES)
- .put("NUMERIC", Type.TYPE_BYTES)
- .put("BIGNUMERIC", Type.TYPE_BYTES)
- .put("GEOGRAPHY", Type.TYPE_STRING) // Pass through the JSON encoding.
- .put("DATE", Type.TYPE_INT32)
- .put("TIME", Type.TYPE_INT64)
- .put("DATETIME", Type.TYPE_INT64)
- .put("TIMESTAMP", Type.TYPE_INT64)
- .put("JSON", Type.TYPE_STRING)
+ static final Map<TableFieldSchema.Type, Type> PRIMITIVE_TYPES =
+ ImmutableMap.<TableFieldSchema.Type, Type>builder()
+ .put(TableFieldSchema.Type.INT64, Type.TYPE_INT64)
+ .put(TableFieldSchema.Type.DOUBLE, Type.TYPE_DOUBLE)
+ .put(TableFieldSchema.Type.STRING, Type.TYPE_STRING)
+ .put(TableFieldSchema.Type.BOOL, Type.TYPE_BOOL)
+ .put(TableFieldSchema.Type.BYTES, Type.TYPE_BYTES)
+ .put(TableFieldSchema.Type.NUMERIC, Type.TYPE_BYTES)
+ .put(TableFieldSchema.Type.BIGNUMERIC, Type.TYPE_BYTES)
+ .put(TableFieldSchema.Type.GEOGRAPHY, Type.TYPE_STRING) // Pass through the JSON encoding.
+ .put(TableFieldSchema.Type.DATE, Type.TYPE_INT32)
+ .put(TableFieldSchema.Type.TIME, Type.TYPE_INT64)
+ .put(TableFieldSchema.Type.DATETIME, Type.TYPE_INT64)
+ .put(TableFieldSchema.Type.TIMESTAMP, Type.TYPE_INT64)
+ .put(TableFieldSchema.Type.JSON, Type.TYPE_STRING)
.build();
+ public static Descriptor getDescriptorFromTableSchema(
+ com.google.api.services.bigquery.model.TableSchema jsonSchema, boolean respectRequired)
+ throws DescriptorValidationException {
+ return getDescriptorFromTableSchema(schemaToProtoTableSchema(jsonSchema), respectRequired);
+ }
+
/**
* Given a BigQuery TableSchema, returns a protocol-buffer Descriptor that can be used to write
* data using the BigQuery Storage API.
*/
- public static Descriptor getDescriptorFromTableSchema(TableSchema jsonSchema)
- throws DescriptorValidationException {
- DescriptorProto descriptorProto = descriptorSchemaFromTableSchema(jsonSchema);
+ public static Descriptor getDescriptorFromTableSchema(
+ TableSchema tableSchema, boolean respectRequired) throws DescriptorValidationException {
+ DescriptorProto descriptorProto = descriptorSchemaFromTableSchema(tableSchema, respectRequired);
FileDescriptorProto fileDescriptorProto =
FileDescriptorProto.newBuilder().addMessageType(descriptorProto).build();
FileDescriptor fileDescriptor =
@@ -218,7 +389,7 @@ public class TableRowToStorageApiProto {
boolean ignoreUnknownValues)
throws SchemaConversionException {
DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
- for (Map.Entry<String, Object> entry : map.entrySet()) {
+ for (final Map.Entry<String, Object> entry : map.entrySet()) {
@Nullable
FieldDescriptor fieldDescriptor = descriptor.findFieldByName(entry.getKey().toLowerCase());
if (fieldDescriptor == null) {
@@ -267,7 +438,7 @@ public class TableRowToStorageApiProto {
SchemaInformation schemaInformation,
Descriptor descriptor,
TableRow tableRow,
- boolean ignoreUnkownValues)
+ boolean ignoreUnknownValues)
throws SchemaConversionException {
@Nullable Object fValue = tableRow.get("f");
if (fValue instanceof List) {
@@ -275,13 +446,15 @@ public class TableRowToStorageApiProto {
DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
int cellsToProcess = cells.size();
if (cells.size() > descriptor.getFields().size()) {
- if (ignoreUnkownValues) {
+ if (ignoreUnknownValues) {
cellsToProcess = descriptor.getFields().size();
} else {
throw new SchemaTooNarrowException(
- "TableRow contained too many fields and ignoreUnknownValues not set.");
+ "TableRow contained too many fields and ignoreUnknownValues not set in "
+ + schemaInformation.getName());
}
}
+
for (int i = 0; i < cellsToProcess; ++i) {
AbstractMap<String, Object> cell = cells.get(i);
FieldDescriptor fieldDescriptor = descriptor.getFields().get(i);
@@ -290,7 +463,7 @@ public class TableRowToStorageApiProto {
@Nullable
Object value =
messageValueFromFieldValue(
- fieldSchemaInformation, fieldDescriptor, cell.get("v"), ignoreUnkownValues);
+ fieldSchemaInformation, fieldDescriptor, cell.get("v"), ignoreUnknownValues);
if (value != null) {
builder.setField(fieldDescriptor, value);
}
@@ -311,36 +484,46 @@ public class TableRowToStorageApiProto {
"Could convert schema for " + schemaInformation.getFullName(), e);
}
} else {
- return messageFromMap(schemaInformation, descriptor, tableRow, ignoreUnkownValues);
+ return messageFromMap(schemaInformation, descriptor, tableRow, ignoreUnknownValues);
}
}
@VisibleForTesting
- static DescriptorProto descriptorSchemaFromTableSchema(TableSchema tableSchema) {
- return descriptorSchemaFromTableFieldSchemas(tableSchema.getFields());
+ static DescriptorProto descriptorSchemaFromTableSchema(
+ com.google.api.services.bigquery.model.TableSchema tableSchema, boolean respectRequired) {
+ return descriptorSchemaFromTableSchema(schemaToProtoTableSchema(tableSchema), respectRequired);
+ }
+
+ @VisibleForTesting
+ static DescriptorProto descriptorSchemaFromTableSchema(
+ TableSchema tableSchema, boolean respectRequired) {
+ return descriptorSchemaFromTableFieldSchemas(tableSchema.getFieldsList(), respectRequired);
}
private static DescriptorProto descriptorSchemaFromTableFieldSchemas(
- Iterable<TableFieldSchema> tableFieldSchemas) {
+ Iterable<TableFieldSchema> tableFieldSchemas, boolean respectRequired) {
DescriptorProto.Builder descriptorBuilder = DescriptorProto.newBuilder();
// Create a unique name for the descriptor ('-' characters cannot be used).
descriptorBuilder.setName("D" + UUID.randomUUID().toString().replace("-", "_"));
int i = 1;
for (TableFieldSchema fieldSchema : tableFieldSchemas) {
- fieldDescriptorFromTableField(fieldSchema, i++, descriptorBuilder);
+ fieldDescriptorFromTableField(fieldSchema, i++, descriptorBuilder, respectRequired);
}
return descriptorBuilder.build();
}
private static void fieldDescriptorFromTableField(
- TableFieldSchema fieldSchema, int fieldNumber, DescriptorProto.Builder descriptorBuilder) {
+ TableFieldSchema fieldSchema,
+ int fieldNumber,
+ DescriptorProto.Builder descriptorBuilder,
+ boolean respectRequired) {
FieldDescriptorProto.Builder fieldDescriptorBuilder = FieldDescriptorProto.newBuilder();
fieldDescriptorBuilder = fieldDescriptorBuilder.setName(fieldSchema.getName().toLowerCase());
fieldDescriptorBuilder = fieldDescriptorBuilder.setNumber(fieldNumber);
switch (fieldSchema.getType()) {
- case "STRUCT":
- case "RECORD":
- DescriptorProto nested = descriptorSchemaFromTableFieldSchemas(fieldSchema.getFields());
+ case STRUCT:
+ DescriptorProto nested =
+ descriptorSchemaFromTableFieldSchemas(fieldSchema.getFieldsList(), respectRequired);
descriptorBuilder.addNestedType(nested);
fieldDescriptorBuilder =
fieldDescriptorBuilder.setType(Type.TYPE_MESSAGE).setTypeName(nested.getName());
@@ -354,12 +537,11 @@ public class TableRowToStorageApiProto {
fieldDescriptorBuilder = fieldDescriptorBuilder.setType(type);
}
- Optional<Mode> fieldMode = Optional.ofNullable(fieldSchema.getMode()).map(Mode::valueOf);
- if (fieldMode.filter(m -> m == Mode.REPEATED).isPresent()) {
+ if (fieldSchema.getMode() == TableFieldSchema.Mode.REPEATED) {
fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_REPEATED);
- } else if (!fieldMode.isPresent() || fieldMode.filter(m -> m == Mode.NULLABLE).isPresent()) {
+ } else if (!respectRequired || fieldSchema.getMode() == TableFieldSchema.Mode.NULLABLE) {
fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_OPTIONAL);
- } else {
+ } else if (fieldSchema.getMode() == TableFieldSchema.Mode.REQUIRED) {
fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_REQUIRED);
}
descriptorBuilder.addField(fieldDescriptorBuilder.build());
@@ -377,6 +559,7 @@ public class TableRowToStorageApiProto {
} else if (fieldDescriptor.isRepeated()) {
return Collections.emptyList();
} else {
+ // TODO: Allow expanding this!
throw new SchemaDoesntMatchException(
"Received null value for non-nullable field " + schemaInformation.getFullName());
}
@@ -405,31 +588,28 @@ public class TableRowToStorageApiProto {
boolean ignoreUnknownValues)
throws SchemaConversionException {
switch (schemaInformation.getType()) {
- case "INT64":
- case "INTEGER":
+ case INT64:
if (value instanceof String) {
return Long.valueOf((String) value);
} else if (value instanceof Integer || value instanceof Long) {
return ((Number) value).longValue();
}
break;
- case "FLOAT64":
- case "FLOAT":
+ case DOUBLE:
if (value instanceof String) {
return Double.valueOf((String) value);
} else if (value instanceof Number) {
return ((Number) value).doubleValue();
}
break;
- case "BOOLEAN":
- case "BOOL":
+ case BOOL:
if (value instanceof String) {
return Boolean.valueOf((String) value);
} else if (value instanceof Boolean) {
return value;
}
break;
- case "BYTES":
+ case BYTES:
if (value instanceof String) {
return ByteString.copyFrom(BaseEncoding.base64().decode((String) value));
} else if (value instanceof byte[]) {
@@ -438,7 +618,7 @@ public class TableRowToStorageApiProto {
return value;
}
break;
- case "TIMESTAMP":
+ case TIMESTAMP:
if (value instanceof String) {
try {
// '2011-12-03T10:15:30+01:00' '2011-12-03T10:15:30'
@@ -470,7 +650,7 @@ public class TableRowToStorageApiProto {
.longValue();
}
break;
- case "DATE":
+ case DATE:
if (value instanceof String) {
return ((Long) LocalDate.parse((String) value).toEpochDay()).intValue();
} else if (value instanceof LocalDate) {
@@ -484,7 +664,7 @@ public class TableRowToStorageApiProto {
return ((Number) value).intValue();
}
break;
- case "NUMERIC":
+ case NUMERIC:
if (value instanceof String) {
return BigDecimalByteStringEncoder.encodeToNumericByteString(
new BigDecimal((String) value));
@@ -495,7 +675,7 @@ public class TableRowToStorageApiProto {
BigDecimal.valueOf(((Number) value).doubleValue()));
}
break;
- case "BIGNUMERIC":
+ case BIGNUMERIC:
if (value instanceof String) {
return BigDecimalByteStringEncoder.encodeToBigNumericByteString(
new BigDecimal((String) value));
@@ -506,7 +686,7 @@ public class TableRowToStorageApiProto {
BigDecimal.valueOf(((Number) value).doubleValue()));
}
break;
- case "DATETIME":
+ case DATETIME:
if (value instanceof String) {
try {
// '2011-12-03T10:15:30'
@@ -525,7 +705,7 @@ public class TableRowToStorageApiProto {
return CivilTimeEncoder.encodePacked64DatetimeMicros((org.joda.time.LocalDateTime) value);
}
break;
- case "TIME":
+ case TIME:
if (value instanceof String) {
return CivilTimeEncoder.encodePacked64TimeMicros(LocalTime.parse((String) value));
} else if (value instanceof Number) {
@@ -536,12 +716,11 @@ public class TableRowToStorageApiProto {
return CivilTimeEncoder.encodePacked64TimeMicros((org.joda.time.LocalTime) value);
}
break;
- case "STRING":
- case "JSON":
- case "GEOGRAPHY":
+ case STRING:
+ case JSON:
+ case GEOGRAPHY:
return Preconditions.checkArgumentNotNull(value).toString();
- case "STRUCT":
- case "RECORD":
+ case STRUCT:
if (value instanceof TableRow) {
TableRow tableRow = (TableRow) value;
return messageFromTableRow(
@@ -553,6 +732,8 @@ public class TableRowToStorageApiProto {
schemaInformation, fieldDescriptor.getMessageType(), map, ignoreUnknownValues);
}
break;
+ default:
+ throw new RuntimeException("Unknown type " + schemaInformation.getType());
}
throw new SchemaDoesntMatchException(
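The conversion helpers introduced above compose into a short path from the JSON-model schema to a write descriptor. A sketch of a caller, with illustrative field names and plain Guava imports for brevity:

    import com.google.api.services.bigquery.model.TableFieldSchema;
    import com.google.api.services.bigquery.model.TableSchema;
    import com.google.common.collect.ImmutableList;
    import com.google.protobuf.Descriptors.Descriptor;

    class SchemaConversionSketch {
      static Descriptor descriptorFor() throws Exception {
        TableSchema jsonSchema =
            new TableSchema()
                .setFields(
                    ImmutableList.of(
                        new TableFieldSchema().setName("name").setType("STRING"),
                        new TableFieldSchema()
                            .setName("number")
                            .setType("INT64")
                            .setMode("REQUIRED")));
        com.google.cloud.bigquery.storage.v1.TableSchema protoSchema =
            TableRowToStorageApiProto.schemaToProtoTableSchema(jsonSchema);
        // respectRequired = true keeps REQUIRED fields as LABEL_REQUIRED.
        return TableRowToStorageApiProto.getDescriptorFromTableSchema(protoSchema, true);
      }
    }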
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java
index 854058a81f3..fefe4d29a2e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java
@@ -85,21 +85,21 @@ public class BeamRowToStorageApiProtoTest {
FieldDescriptorProto.newBuilder()
.setName("bytevalue")
.setNumber(1)
- .setType(Type.TYPE_INT32)
+ .setType(Type.TYPE_INT64)
.setLabel(Label.LABEL_OPTIONAL)
.build())
.addField(
FieldDescriptorProto.newBuilder()
.setName("int16value")
.setNumber(2)
- .setType(Type.TYPE_INT32)
+ .setType(Type.TYPE_INT64)
.setLabel(Label.LABEL_REQUIRED)
.build())
.addField(
FieldDescriptorProto.newBuilder()
.setName("int32value")
.setNumber(3)
- .setType(Type.TYPE_INT32)
+ .setType(Type.TYPE_INT64)
.setLabel(Label.LABEL_OPTIONAL)
.build())
.addField(
@@ -120,7 +120,7 @@ public class BeamRowToStorageApiProtoTest {
FieldDescriptorProto.newBuilder()
.setName("floatvalue")
.setNumber(6)
- .setType(Type.TYPE_FLOAT)
+ .setType(Type.TYPE_DOUBLE)
.setLabel(Label.LABEL_OPTIONAL)
.build())
.addField(
@@ -233,14 +233,14 @@ public class BeamRowToStorageApiProtoTest {
.build();
private static final Map<String, Object> BASE_PROTO_EXPECTED_FIELDS =
ImmutableMap.<String, Object>builder()
- .put("bytevalue", (int) 1)
- .put("int16value", (int) 2)
- .put("int32value", (int) 3)
- .put("int64value", (long) 4)
+ .put("bytevalue", 1L)
+ .put("int16value", 2L)
+ .put("int32value", 3L)
+ .put("int64value", 4L)
.put(
"decimalvalue",
BeamRowToStorageApiProto.serializeBigDecimalToNumeric(BigDecimal.valueOf(5)))
- .put("floatvalue", (float) 3.14)
+ .put("floatvalue", (double) 3.14)
.put("doublevalue", (double) 2.68)
.put("stringvalue", "I am a string. Hear me roar.")
.put("datetimevalue", BASE_ROW.getDateTime("datetimeValue").getMillis() * 1000)
@@ -284,7 +284,8 @@ public class BeamRowToStorageApiProtoTest {
@Test
public void testDescriptorFromSchema() {
DescriptorProto descriptor =
- BeamRowToStorageApiProto.descriptorSchemaFromBeamSchema(BASE_SCHEMA);
+ TableRowToStorageApiProto.descriptorSchemaFromTableSchema(
+ BeamRowToStorageApiProto.protoTableSchemaFromBeamSchema(BASE_SCHEMA), true);
Map<String, Type> types =
descriptor.getFieldList().stream()
.collect(
@@ -315,7 +316,8 @@ public class BeamRowToStorageApiProtoTest {
@Test
public void testNestedFromSchema() {
DescriptorProto descriptor =
- BeamRowToStorageApiProto.descriptorSchemaFromBeamSchema(NESTED_SCHEMA);
+ TableRowToStorageApiProto.descriptorSchemaFromTableSchema(
+            BeamRowToStorageApiProto.protoTableSchemaFromBeamSchema(NESTED_SCHEMA), true);
Map<String, Type> expectedBaseTypes =
BASE_SCHEMA_PROTO.getFieldList().stream()
.collect(
@@ -378,7 +380,9 @@ public class BeamRowToStorageApiProtoTest {
@Test
public void testMessageFromTableRow() throws Exception {
- Descriptor descriptor = BeamRowToStorageApiProto.getDescriptorFromSchema(NESTED_SCHEMA);
+ Descriptor descriptor =
+ TableRowToStorageApiProto.getDescriptorFromTableSchema(
+ BeamRowToStorageApiProto.protoTableSchemaFromBeamSchema(NESTED_SCHEMA), true);
DynamicMessage msg = BeamRowToStorageApiProto.messageFromBeamRow(descriptor, NESTED_ROW);
assertEquals(3, msg.getAllFields().size());
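The Beam Row tests now go through the shared TableRow descriptor path, which explains the widened integer and float expectations (the proto schema only has INT64 and DOUBLE). Condensed, the flow under test looks like this sketch, assuming a Beam Schema and a matching Row:

    import com.google.protobuf.Descriptors.Descriptor;
    import com.google.protobuf.DynamicMessage;
    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.values.Row;

    class BeamRowPathSketch {
      static DynamicMessage toStorageMessage(Schema schema, Row row) throws Exception {
        // Beam Schema -> storage proto schema -> descriptor, respecting REQUIRED fields.
        Descriptor descriptor =
            TableRowToStorageApiProto.getDescriptorFromTableSchema(
                BeamRowToStorageApiProto.protoTableSchemaFromBeamSchema(schema), true);
        return BeamRowToStorageApiProto.messageFromBeamRow(descriptor, row);
      }
    }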
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index 1e1749e8569..9cacfde54ad 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -33,6 +33,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
import com.google.api.services.bigquery.model.Clustering;
import com.google.api.services.bigquery.model.ErrorProto;
@@ -65,11 +66,9 @@ import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
-import java.util.function.LongFunction;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
-import java.util.stream.LongStream;
import java.util.stream.StreamSupport;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
@@ -199,7 +198,6 @@ public class BigQueryIOWriteTest implements Serializable {
public void evaluate() throws Throwable {
options = TestPipeline.testingPipelineOptions();
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
- bqOptions.setSchemaUpdateRetries(Integer.MAX_VALUE);
bqOptions.setProject("project-id");
if (description.getAnnotations().stream()
.anyMatch(a -> a.annotationType().equals(ProjectOverride.class))) {
@@ -254,9 +252,8 @@ public class BigQueryIOWriteTest implements Serializable {
@Test
public void testWriteEmptyPCollection() throws Exception {
- if (useStreaming || useStorageApi) {
- return;
- }
+ assumeTrue(!useStreaming);
+ assumeTrue(!useStorageApi);
TableSchema schema =
new TableSchema()
.setFields(
@@ -290,12 +287,8 @@ public class BigQueryIOWriteTest implements Serializable {
@Test
public void testWriteDynamicDestinationsStreamingWithAutoSharding() throws Exception {
- if (useStorageApi) {
- return;
- }
- if (!useStreaming) {
- return;
- }
+ assumeTrue(!useStorageApi);
+ assumeTrue(useStreaming);
writeDynamicDestinations(true, true);
}
@@ -586,9 +579,8 @@ public class BigQueryIOWriteTest implements Serializable {
@Test
public void testTriggeredFileLoads() throws Exception {
- if (useStorageApi || !useStreaming) {
- return;
- }
+ assumeTrue(!useStorageApi);
+ assumeTrue(useStreaming);
List<TableRow> elements = Lists.newArrayList();
for (int i = 0; i < 30; ++i) {
elements.add(new TableRow().set("number", i));
@@ -677,9 +669,8 @@ public class BigQueryIOWriteTest implements Serializable {
}
public void testTriggeredFileLoadsWithTempTables(String tableRef) throws Exception {
- if (useStorageApi || !useStreaming) {
- return;
- }
+ assumeTrue(!useStorageApi);
+ assumeTrue(useStreaming);
List<TableRow> elements = Lists.newArrayList();
for (int i = 0; i < 30; ++i) {
elements.add(new TableRow().set("number", i));
@@ -739,9 +730,8 @@ public class BigQueryIOWriteTest implements Serializable {
@Test
public void testUntriggeredFileLoadsWithTempTables() throws Exception {
// Test only non-streaming inserts.
- if (useStorageApi || useStreaming) {
- return;
- }
+ assumeTrue(!useStorageApi);
+ assumeTrue(!useStreaming);
List<TableRow> elements = Lists.newArrayList();
for (int i = 0; i < 30; ++i) {
elements.add(new TableRow().set("number", i));
@@ -773,10 +763,8 @@ public class BigQueryIOWriteTest implements Serializable {
@Test
public void testTriggeredFileLoadsWithAutoSharding() throws Exception {
- if (useStorageApi || !useStreaming) {
- // This test does not make sense for the storage API.
- return;
- }
+ assumeTrue(!useStorageApi);
+ assumeTrue(useStreaming);
List<TableRow> elements = Lists.newArrayList();
for (int i = 0; i < 30; ++i) {
elements.add(new TableRow().set("number", i));
@@ -845,9 +833,8 @@ public class BigQueryIOWriteTest implements Serializable {
@Test
public void testFailuresNoRetryPolicy() throws Exception {
- if (useStorageApi || !useStreaming) {
- return;
- }
+ assumeTrue(!useStorageApi);
+ assumeTrue(useStreaming);
TableRow row1 = new TableRow().set("name", "a").set("number", "1");
TableRow row2 = new TableRow().set("name", "b").set("number", "2");
TableRow row3 = new TableRow().set("name", "c").set("number", "3");
@@ -884,9 +871,8 @@ public class BigQueryIOWriteTest implements Serializable {
@Test
public void testRetryPolicy() throws Exception {
- if (useStorageApi || !useStreaming) {
- return;
- }
+ assumeTrue(!useStorageApi);
+ assumeTrue(useStreaming);
TableRow row1 = new TableRow().set("name", "a").set("number", "1");
TableRow row2 = new TableRow().set("name", "b").set("number", "2");
TableRow row3 = new TableRow().set("name", "c").set("number", "3");
@@ -960,9 +946,8 @@ public class BigQueryIOWriteTest implements Serializable {
@Test
public void testWriteWithSuccessfulBatchInserts() throws Exception {
- if (useStreaming || useStorageApi) {
- return;
- }
+ assumeTrue(!useStreaming);
+ assumeTrue(!useStorageApi);
WriteResult result =
p.apply(
@@ -992,9 +977,8 @@ public class BigQueryIOWriteTest implements Serializable {
@Test
public void testWriteWithSuccessfulBatchInsertsAndWriteRename() throws Exception {
- if (useStreaming || useStorageApi) {
- return;
- }
+ assumeTrue(!useStreaming);
+ assumeTrue(!useStorageApi);
WriteResult result =
p.apply(
@@ -1026,9 +1010,8 @@ public class BigQueryIOWriteTest implements Serializable {
@Test
public void testWriteWithoutInsertId() throws Exception {
- if (useStorageApi || !useStreaming) {
- return;
- }
+ assumeTrue(!useStorageApi);
+ assumeTrue(useStreaming);
TableRow row1 = new TableRow().set("name", "a").set("number", 1);
TableRow row2 = new TableRow().set("name", "b").set("number", 2);
TableRow row3 = new TableRow().set("name", "c").set("number", 3);
@@ -1079,9 +1062,9 @@ public class BigQueryIOWriteTest implements Serializable {
@Test
public void testWriteAvro() throws Exception {
- if (useStorageApi || useStreaming) {
- return;
- }
+ assumeTrue(!useStorageApi);
+ assumeTrue(!useStreaming);
+
p.apply(
Create.of(
InputRecord.create("test", 1, 1.0, Instant.parse("2019-01-01T00:00:00Z")),
@@ -1130,9 +1113,8 @@ public class BigQueryIOWriteTest implements Serializable {
@Test
public void testWriteAvroWithCustomWriter() throws Exception {
- if (useStorageApi || useStreaming) {
- return;
- }
+ assumeTrue(!useStorageApi);
+ assumeTrue(!useStreaming);
SerializableFunction<AvroWriteRequest<InputRecord>, GenericRecord> formatFunction =
r -> {
GenericRecord rec = new GenericData.Record(r.getSchema());
@@ -1246,9 +1228,7 @@ public class BigQueryIOWriteTest implements Serializable {
}
private void storageWrite(boolean autoSharding) throws Exception {
- if (!useStorageApi) {
- return;
- }
+ assumeTrue(useStorageApi);
BigQueryIO.Write<TableRow> write =
BigQueryIO.writeTableRows()
.to("project-id:dataset-id.table-id")
@@ -1302,6 +1282,7 @@ public class BigQueryIOWriteTest implements Serializable {
@Test
public void testSchemaWriteLoads() throws Exception {
+ assumeTrue(!useStreaming);
// withMethod overrides the pipeline option, so we need to explicitly request
// STORAGE_API_WRITES.
BigQueryIO.Write.Method method =
@@ -1329,17 +1310,17 @@ public class BigQueryIOWriteTest implements Serializable {
assertThat(
fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"),
containsInAnyOrder(
- new TableRow().set("name", "a").set("number", 1),
- new TableRow().set("name", "b").set("number", 2),
- new TableRow().set("name", "c").set("number", 3),
- new TableRow().set("name", "d").set("number", 4)));
+ new TableRow().set("name", "a").set("number", "1"),
+ new TableRow().set("name", "b").set("number", "2"),
+ new TableRow().set("name", "c").set("number", "3"),
+ new TableRow().set("name", "d").set("number", "4")));
}
@Test
public void testSchemaWriteStreams() throws Exception {
- if (useStorageApi || !useStreaming) {
- return;
- }
+ assumeTrue(!useStorageApi);
+ assumeTrue(useStreaming);
+
WriteResult result =
p.apply(
Create.of(
@@ -1358,22 +1339,20 @@ public class BigQueryIOWriteTest implements Serializable {
PAssert.that(result.getSuccessfulInserts())
.satisfies(
- new SerializableFunction<Iterable<TableRow>, Void>() {
- @Override
- public Void apply(Iterable<TableRow> input) {
- assertThat(Lists.newArrayList(input).size(), is(4));
- return null;
- }
- });
+ (SerializableFunction<Iterable<TableRow>, Void>)
+ input -> {
+ assertThat(Lists.newArrayList(input).size(), is(4));
+ return null;
+ });
p.run();
assertThat(
fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"),
containsInAnyOrder(
- new TableRow().set("name", "a").set("number", 1),
- new TableRow().set("name", "b").set("number", 2),
- new TableRow().set("name", "c").set("number", 3),
- new TableRow().set("name", "d").set("number", 4)));
+ new TableRow().set("name", "a").set("number", "1"),
+ new TableRow().set("name", "b").set("number", "2"),
+ new TableRow().set("name", "c").set("number", "3"),
+ new TableRow().set("name", "d").set("number", "4")));
}
/**
@@ -1572,9 +1551,7 @@ public class BigQueryIOWriteTest implements Serializable {
@Test
public void testWriteUnknown() throws Exception {
- if (useStorageApi) {
- return;
- }
+ assumeTrue(!useStorageApi);
p.apply(
Create.of(
new TableRow().set("name", "a").set("number", 1),
@@ -1595,9 +1572,7 @@ public class BigQueryIOWriteTest implements Serializable {
@Test
public void testWriteFailedJobs() throws Exception {
- if (useStorageApi) {
- return;
- }
+ assumeTrue(!useStorageApi);
p.apply(
Create.of(
new TableRow().set("name", "a").set("number", 1),
@@ -1819,120 +1794,6 @@ public class BigQueryIOWriteTest implements Serializable {
p.run();
}
- @Test
- public void testUpdateTableSchemaUseSet() throws Exception {
- updateTableSchemaTest(true);
- }
-
- @Test
- public void testUpdateTableSchemaUseSetF() throws Exception {
- updateTableSchemaTest(false);
- }
-
- public void updateTableSchemaTest(boolean useSet) throws Exception {
- if (!useStreaming || !useStorageApi) {
- return;
- }
- BigQueryIO.Write.Method method =
- useStorageApiApproximate ? Method.STORAGE_API_AT_LEAST_ONCE : Method.STORAGE_WRITE_API;
- p.enableAbandonedNodeEnforcement(false);
-
- TableReference tableRef = BigQueryHelpers.parseTableSpec("project-id:dataset-id.table");
- TableSchema tableSchema =
- new TableSchema()
- .setFields(
- ImmutableList.of(
- new TableFieldSchema().setName("name").setType("STRING"),
- new TableFieldSchema().setName("number").setType("INTEGER")));
- fakeDatasetService.createTable(new Table().setTableReference(tableRef).setSchema(tableSchema));
-
- LongFunction<TableRow> getRowSet =
- (LongFunction<TableRow> & Serializable)
- (long i) -> {
- if (i < 5) {
- return new TableRow().set("name", "name" + i).set("number", Long.toString(i));
-
- } else {
- return new TableRow()
- .set("name", "name" + i)
- .set("number", Long.toString(i))
- .set("double_number", Long.toString(i * 2));
- }
- };
-
- LongFunction<TableRow> getRowSetF =
- (LongFunction<TableRow> & Serializable)
- (long i) -> {
- if (i < 5) {
- return new TableRow()
- .setF(
- ImmutableList.of(
- new TableCell().setV("name" + i),
- new TableCell().setV(Long.toString(i))));
- } else {
- return new TableRow()
- .setF(
- ImmutableList.of(
- new TableCell().setV("name" + i),
- new TableCell().setV(Long.toString(i)),
- new TableCell().setV(Long.toString(i * 2))));
- }
- };
-
- LongFunction<TableRow> getRow = useSet ? getRowSet : getRowSetF;
- final int numRows = 10;
- PCollection<TableRow> tableRows =
- p.apply(GenerateSequence.from(0).to(numRows))
- .apply(
- MapElements.via(
- new SimpleFunction<Long, TableRow>() {
- @Override
- public TableRow apply(Long input) {
- return getRow.apply(input);
- }
- }))
- .setCoder(TableRowJsonCoder.of());
- tableRows.apply(
- BigQueryIO.writeTableRows()
- .to(tableRef)
- .withMethod(method)
- .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
- .withAutoSchemaUpdate(true)
- .withTestServices(fakeBqServices)
- .withoutValidation());
-
- Thread thread =
- new Thread(
- () -> {
- try {
- Thread.sleep(5000);
- TableSchema tableSchemaUpdated =
- new TableSchema()
- .setFields(
- ImmutableList.of(
- new TableFieldSchema().setName("name").setType("STRING"),
- new TableFieldSchema().setName("number").setType("INTEGER"),
- new TableFieldSchema()
- .setName("double_number")
- .setType("INTEGER")));
- fakeDatasetService.updateTableSchema(tableRef, tableSchemaUpdated);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- });
- thread.start();
- p.run();
- thread.join();
-
- assertThat(
- fakeDatasetService.getAllRows(
- tableRef.getProjectId(), tableRef.getDatasetId(), tableRef.getTableId()),
- containsInAnyOrder(
- Iterables.toArray(
- LongStream.range(0, numRows).mapToObj(getRowSet).collect(Collectors.toList()),
- TableRow.class)));
- }
-
@Test
public void testBigQueryIOGetName() {
assertEquals(
@@ -1969,9 +1830,7 @@ public class BigQueryIOWriteTest implements Serializable {
@Test
public void testWriteValidateFailsBothFormatFunctions() {
- if (useStorageApi) {
- return;
- }
+ assumeTrue(!useStorageApi);
p.enableAbandonedNodeEnforcement(false);
thrown.expect(IllegalArgumentException.class);
@@ -1990,9 +1849,7 @@ public class BigQueryIOWriteTest implements Serializable {
@Test
public void testWriteValidateFailsWithBeamSchemaAndAvroFormatFunction() {
- if (useStorageApi) {
- return;
- }
+ assumeTrue(!useStorageApi);
p.enableAbandonedNodeEnforcement(false);
thrown.expect(IllegalArgumentException.class);
@@ -2008,9 +1865,8 @@ public class BigQueryIOWriteTest implements Serializable {
@Test
public void testWriteValidateFailsWithAvroFormatAndStreamingInserts() {
- if (!useStreaming && !useStorageApi) {
- return;
- }
+ assumeTrue(useStreaming);
+ assumeTrue(!useStorageApi);
p.enableAbandonedNodeEnforcement(false);
thrown.expect(IllegalArgumentException.class);
@@ -2027,9 +1883,7 @@ public class BigQueryIOWriteTest implements Serializable {
@Test
public void testWriteValidateFailsWithBatchAutoSharding() {
- if (useStorageApi) {
- return;
- }
+ assumeTrue(!useStorageApi);
p.enableAbandonedNodeEnforcement(false);
thrown.expect(IllegalArgumentException.class);
@@ -2497,9 +2351,7 @@ public class BigQueryIOWriteTest implements Serializable {
@Test
public void testExtendedErrorRetrieval() throws Exception {
- if (useStorageApi) {
- return;
- }
+ assumeTrue(!useStorageApi);
TableRow row1 = new TableRow().set("name", "a").set("number", "1");
TableRow row2 = new TableRow().set("name", "b").set("number", "2");
TableRow row3 = new TableRow().set("name", "c").set("number", "3");
@@ -2552,13 +2404,7 @@ public class BigQueryIOWriteTest implements Serializable {
@Test
public void testStorageApiErrors() throws Exception {
- BigQueryOptions bqOptions = p.getOptions().as(BigQueryOptions.class);
- bqOptions.setSchemaUpdateRetries(1);
-
- if (!useStorageApi) {
- return;
- }
-
+ assumeTrue(useStorageApi);
final Method method =
useStorageApiApproximate ? Method.STORAGE_API_AT_LEAST_ONCE : Method.STORAGE_WRITE_API;
@@ -2657,9 +2503,7 @@ public class BigQueryIOWriteTest implements Serializable {
@Test
public void testWrongErrorConfigs() {
- if (useStorageApi) {
- return;
- }
+ assumeTrue(!useStorageApi);
p.enableAutoRunIfMissing(true);
TableRow row1 = new TableRow().set("name", "a").set("number", "1");
@@ -2777,9 +2621,8 @@ public class BigQueryIOWriteTest implements Serializable {
@Test
public void testSchemaUpdateOptionsFailsStreamingInserts() throws Exception {
- if (!useStreaming && !useStorageApi) {
- return;
- }
+ assumeTrue(useStreaming);
+ assumeTrue(!useStorageApi);
Set<SchemaUpdateOption> options = EnumSet.of(SchemaUpdateOption.ALLOW_FIELD_ADDITION);
p.enableAbandonedNodeEnforcement(false);
thrown.expect(IllegalArgumentException.class);
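Replacing the early returns with JUnit assumptions means inapplicable parameterizations are reported as skipped instead of silently passing. The pattern in isolation:

    import static org.junit.Assume.assumeTrue;

    import org.junit.Test;

    public class AssumeSketch {
      private final boolean useStreaming = false; // set by the test parameterization

      @Test
      public void runsOnlyInBatchMode() {
        // Marks the test as skipped, not passed, when the assumption fails.
        assumeTrue(!useStreaming);
        // ... batch-only assertions ...
      }
    }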
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
index b832a9b3612..eacb95a9a68 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
@@ -599,7 +599,7 @@ public class BigQueryUtilsTest {
assertThat(row.size(), equalTo(22));
assertThat(row, hasEntry("id", "123"));
- assertThat(row, hasEntry("value", 123.456));
+ assertThat(row, hasEntry("value", "123.456"));
assertThat(row, hasEntry("datetime", "2020-11-02T12:34:56.789876"));
assertThat(row, hasEntry("datetime0ms", "2020-11-02T12:34:56"));
assertThat(row, hasEntry("datetime0s_ns", "2020-11-02T12:34:00.789876"));
@@ -610,12 +610,12 @@ public class BigQueryUtilsTest {
assertThat(row, hasEntry("time0s_ns", "12:34:00.789876"));
assertThat(row, hasEntry("time0s_0ns", "12:34:00"));
assertThat(row, hasEntry("name", "test"));
- assertThat(row, hasEntry("valid", false));
+ assertThat(row, hasEntry("valid", "false"));
assertThat(row, hasEntry("binary", "ABCD1234"));
assertThat(row, hasEntry("numeric", "123.456"));
- assertThat(row, hasEntry("boolean", true));
+ assertThat(row, hasEntry("boolean", "true"));
assertThat(row, hasEntry("long", "123"));
- assertThat(row, hasEntry("double", 123.456));
+ assertThat(row, hasEntry("double", "123.456"));
}
@Test
@@ -642,7 +642,7 @@ public class BigQueryUtilsTest {
row = ((List<TableRow>) row.get("map")).get(0);
assertThat(row.size(), equalTo(2));
assertThat(row, hasEntry("key", "test"));
- assertThat(row, hasEntry("value", 123.456));
+ assertThat(row, hasEntry("value", "123.456"));
}
@Test
@@ -653,7 +653,7 @@ public class BigQueryUtilsTest {
row = (TableRow) row.get("row");
assertThat(row.size(), equalTo(22));
assertThat(row, hasEntry("id", "123"));
- assertThat(row, hasEntry("value", 123.456));
+ assertThat(row, hasEntry("value", "123.456"));
assertThat(row, hasEntry("datetime", "2020-11-02T12:34:56.789876"));
assertThat(row, hasEntry("datetime0ms", "2020-11-02T12:34:56"));
assertThat(row, hasEntry("datetime0s_ns", "2020-11-02T12:34:00.789876"));
@@ -664,12 +664,12 @@ public class BigQueryUtilsTest {
assertThat(row, hasEntry("time0s_ns", "12:34:00.789876"));
assertThat(row, hasEntry("time0s_0ns", "12:34:00"));
assertThat(row, hasEntry("name", "test"));
- assertThat(row, hasEntry("valid", false));
+ assertThat(row, hasEntry("valid", "false"));
assertThat(row, hasEntry("binary", "ABCD1234"));
assertThat(row, hasEntry("numeric", "123.456"));
- assertThat(row, hasEntry("boolean", true));
+ assertThat(row, hasEntry("boolean", "true"));
assertThat(row, hasEntry("long", "123"));
- assertThat(row, hasEntry("double", 123.456));
+ assertThat(row, hasEntry("double", "123.456"));
}
@Test
@@ -680,7 +680,7 @@ public class BigQueryUtilsTest {
row = ((List<TableRow>) row.get("rows")).get(0);
assertThat(row.size(), equalTo(22));
assertThat(row, hasEntry("id", "123"));
- assertThat(row, hasEntry("value", 123.456));
+ assertThat(row, hasEntry("value", "123.456"));
assertThat(row, hasEntry("datetime", "2020-11-02T12:34:56.789876"));
assertThat(row, hasEntry("datetime0ms", "2020-11-02T12:34:56"));
assertThat(row, hasEntry("datetime0s_ns", "2020-11-02T12:34:00.789876"));
@@ -691,12 +691,12 @@ public class BigQueryUtilsTest {
assertThat(row, hasEntry("time0s_ns", "12:34:00.789876"));
assertThat(row, hasEntry("time0s_0ns", "12:34:00"));
assertThat(row, hasEntry("name", "test"));
- assertThat(row, hasEntry("valid", false));
+ assertThat(row, hasEntry("valid", "false"));
assertThat(row, hasEntry("binary", "ABCD1234"));
assertThat(row, hasEntry("numeric", "123.456"));
- assertThat(row, hasEntry("boolean", true));
+ assertThat(row, hasEntry("boolean", "true"));
assertThat(row, hasEntry("long", "123"));
- assertThat(row, hasEntry("double", 123.456));
+ assertThat(row, hasEntry("double", "123.456"));
}
@Test
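The assertion changes above all move FLOAT64 and BOOL entries from Java literals (123.456, true) to their canonical string forms ("123.456", "true"), i.e. converted TableRows now carry scalars as strings. A minimal self-contained sketch of that convention; the class name and values are illustrative:

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasEntry;

import com.google.api.services.bigquery.model.TableRow;
import org.junit.Test;

public class StringTypedRowSketchTest {
  @Test
  public void scalarsCompareAsStrings() {
    // Matching the updated expectations: double and boolean values are
    // asserted as strings, not as Double/Boolean.
    TableRow row = new TableRow().set("double", "123.456").set("valid", "false");
    assertThat(row, hasEntry("double", "123.456"));
    assertThat(row, hasEntry("valid", "false"));
  }
}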
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
index 0a376c8082d..27a772a60ed 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
@@ -44,7 +44,9 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Immutabl
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.BaseEncoding;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -467,10 +469,12 @@ public class TableRowToStorageApiProtoTest {
.setFields(BASE_TABLE_SCHEMA_NO_F.getFields()))
.build());
+ @Rule public transient ExpectedException thrown = ExpectedException.none();
+
@Test
public void testDescriptorFromTableSchema() {
DescriptorProto descriptor =
- TableRowToStorageApiProto.descriptorSchemaFromTableSchema(BASE_TABLE_SCHEMA);
+ TableRowToStorageApiProto.descriptorSchemaFromTableSchema(BASE_TABLE_SCHEMA, true);
Map<String, Type> types =
descriptor.getFieldList().stream()
.collect(
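This and the following hunks thread a new trailing boolean through the descriptor-conversion entry points; its definition is not part of this excerpt, and the tests always pass true. A minimal sketch of the two call shapes, assuming the json-model TableSchema used elsewhere in this test and treating the flag's meaning as an assumption:

import com.google.api.services.bigquery.model.TableSchema;
import com.google.protobuf.DescriptorProtos.DescriptorProto;
import com.google.protobuf.Descriptors.Descriptor;

public class DescriptorConversionSketch {
  // The boolean's semantics are assumed here (e.g. strict vs. lenient schema
  // mapping); consult TableRowToStorageApiProto for the actual contract.
  static Descriptor toDescriptor(TableSchema schema) throws Exception {
    // Proto-level form, as exercised by testDescriptorFromTableSchema.
    DescriptorProto proto =
        TableRowToStorageApiProto.descriptorSchemaFromTableSchema(schema, true);
    // Resolved Descriptor, as used when building DynamicMessage rows.
    return TableRowToStorageApiProto.getDescriptorFromTableSchema(schema, true);
  }
}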
@@ -485,7 +489,7 @@ public class TableRowToStorageApiProtoTest {
@Test
public void testNestedFromTableSchema() {
DescriptorProto descriptor =
- TableRowToStorageApiProto.descriptorSchemaFromTableSchema(NESTED_TABLE_SCHEMA);
+ TableRowToStorageApiProto.descriptorSchemaFromTableSchema(NESTED_TABLE_SCHEMA, true);
Map<String, Type> expectedBaseTypes =
BASE_TABLE_SCHEMA_PROTO.getFieldList().stream()
.collect(
@@ -687,7 +691,7 @@ public class TableRowToStorageApiProtoTest {
.set("nestedValueNoF2", BASE_TABLE_ROW_NO_F);
Descriptor descriptor =
- TableRowToStorageApiProto.getDescriptorFromTableSchema(NESTED_TABLE_SCHEMA);
+ TableRowToStorageApiProto.getDescriptorFromTableSchema(NESTED_TABLE_SCHEMA, true);
TableRowToStorageApiProto.SchemaInformation schemaInformation =
TableRowToStorageApiProto.SchemaInformation.fromTableSchema(NESTED_TABLE_SCHEMA);
DynamicMessage msg =
@@ -707,7 +711,7 @@ public class TableRowToStorageApiProtoTest {
@Test
public void testMessageWithFFromTableRow() throws Exception {
Descriptor descriptor =
- TableRowToStorageApiProto.getDescriptorFromTableSchema(BASE_TABLE_SCHEMA);
+ TableRowToStorageApiProto.getDescriptorFromTableSchema(BASE_TABLE_SCHEMA, true);
TableRowToStorageApiProto.SchemaInformation schemaInformation =
TableRowToStorageApiProto.SchemaInformation.fromTableSchema(BASE_TABLE_SCHEMA);
DynamicMessage msg =
@@ -750,7 +754,7 @@ public class TableRowToStorageApiProtoTest {
.set("repeatednof1", ImmutableList.of(BASE_TABLE_ROW_NO_F, BASE_TABLE_ROW_NO_F))
.set("repeatednof2", ImmutableList.of(BASE_TABLE_ROW_NO_F, BASE_TABLE_ROW_NO_F));
Descriptor descriptor =
- TableRowToStorageApiProto.getDescriptorFromTableSchema(REPEATED_MESSAGE_SCHEMA);
+ TableRowToStorageApiProto.getDescriptorFromTableSchema(REPEATED_MESSAGE_SCHEMA, true);
TableRowToStorageApiProto.SchemaInformation schemaInformation =
TableRowToStorageApiProto.SchemaInformation.fromTableSchema(REPEATED_MESSAGE_SCHEMA);
DynamicMessage msg =
@@ -795,7 +799,7 @@ public class TableRowToStorageApiProtoTest {
.set("repeatednof1", null)
.set("repeatednof2", null);
Descriptor descriptor =
- TableRowToStorageApiProto.getDescriptorFromTableSchema(REPEATED_MESSAGE_SCHEMA);
+ TableRowToStorageApiProto.getDescriptorFromTableSchema(REPEATED_MESSAGE_SCHEMA, true);
TableRowToStorageApiProto.SchemaInformation schemaInformation =
TableRowToStorageApiProto.SchemaInformation.fromTableSchema(REPEATED_MESSAGE_SCHEMA);
DynamicMessage msg =
@@ -818,4 +822,70 @@ public class TableRowToStorageApiProtoTest {
(List<DynamicMessage>) msg.getField(fieldDescriptors.get("repeatednof2"));
assertTrue(repeatednof2.isEmpty());
}
+
+ @Test
+ public void testRejectUnknownField() throws Exception {
+ TableRow row = new TableRow();
+ row.putAll(BASE_TABLE_ROW_NO_F);
+ row.set("unknown", "foobar");
+
+ Descriptor descriptor =
+ TableRowToStorageApiProto.getDescriptorFromTableSchema(BASE_TABLE_SCHEMA_NO_F, true);
+ TableRowToStorageApiProto.SchemaInformation schemaInformation =
+ TableRowToStorageApiProto.SchemaInformation.fromTableSchema(BASE_TABLE_SCHEMA_NO_F);
+
+ thrown.expect(TableRowToStorageApiProto.SchemaConversionException.class);
+ TableRowToStorageApiProto.messageFromTableRow(schemaInformation, descriptor, row, false);
+ }
+
+ @Test
+ public void testRejectUnknownFieldF() throws Exception {
+ TableRow row = new TableRow();
+ List<TableCell> cells = Lists.newArrayList(BASE_TABLE_ROW.getF());
+ cells.add(new TableCell().setV("foobar"));
+ row.setF(cells);
+
+ Descriptor descriptor =
+ TableRowToStorageApiProto.getDescriptorFromTableSchema(BASE_TABLE_SCHEMA, true);
+ TableRowToStorageApiProto.SchemaInformation schemaInformation =
+ TableRowToStorageApiProto.SchemaInformation.fromTableSchema(BASE_TABLE_SCHEMA);
+
+ thrown.expect(TableRowToStorageApiProto.SchemaConversionException.class);
+ TableRowToStorageApiProto.messageFromTableRow(schemaInformation, descriptor, row, false);
+ }
+
+ @Test
+ public void testRejectUnknownNestedField() throws Exception {
+ TableRow rowNoF = new TableRow();
+ rowNoF.putAll(BASE_TABLE_ROW_NO_F);
+ rowNoF.set("unknown", "foobar");
+
+ TableRow topRow = new TableRow().set("nestedValueNoF1", rowNoF);
+
+ Descriptor descriptor =
+ TableRowToStorageApiProto.getDescriptorFromTableSchema(NESTED_TABLE_SCHEMA, true);
+ TableRowToStorageApiProto.SchemaInformation schemaInformation =
+ TableRowToStorageApiProto.SchemaInformation.fromTableSchema(NESTED_TABLE_SCHEMA);
+
+ thrown.expect(TableRowToStorageApiProto.SchemaConversionException.class);
+ TableRowToStorageApiProto.messageFromTableRow(schemaInformation, descriptor, topRow, false);
+ }
+
+ @Test
+ public void testRejectUnknownNestedFieldF() throws Exception {
+ TableRow rowWithF = new TableRow();
+ List<TableCell> cells = Lists.newArrayList(BASE_TABLE_ROW.getF());
+ cells.add(new TableCell().setV("foobar"));
+ rowWithF.setF(cells);
+
+ TableRow topRow = new TableRow().set("nestedValue1", rowWithF);
+
+ Descriptor descriptor =
+ TableRowToStorageApiProto.getDescriptorFromTableSchema(NESTED_TABLE_SCHEMA, true);
+ TableRowToStorageApiProto.SchemaInformation schemaInformation =
+ TableRowToStorageApiProto.SchemaInformation.fromTableSchema(NESTED_TABLE_SCHEMA);
+
+ thrown.expect(TableRowToStorageApiProto.SchemaConversionException.class);
+ TableRowToStorageApiProto.messageFromTableRow(schemaInformation, descriptor, topRow, false);
+ }
}
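The four new tests pin down the stricter conversion path: a TableRow carrying a field the schema does not know, whether top-level or nested and whether set by name or through f/TableCell, now fails with SchemaConversionException rather than being silently dropped (the trailing false to messageFromTableRow is presumably an ignore-unknown-values switch, though its parameter name is not shown here). A minimal sketch of how a caller might trap that, with handleFailedRow as a hypothetical dead-letter hook, not part of the Beam API:

import com.google.api.services.bigquery.model.TableRow;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.DynamicMessage;

public class UnknownFieldHandlingSketch {
  static void convertOrDivert(
      TableRowToStorageApiProto.SchemaInformation schemaInformation,
      Descriptor descriptor,
      TableRow row) throws Exception {
    try {
      DynamicMessage msg =
          TableRowToStorageApiProto.messageFromTableRow(
              schemaInformation, descriptor, row, false);
      // ... hand msg to the Storage API append path ...
    } catch (TableRowToStorageApiProto.SchemaConversionException e) {
      // Rows with fields unknown to the current schema are rejected; route
      // them somewhere recoverable instead of failing the whole bundle.
      handleFailedRow(row, e);
    }
  }

  static void handleFailedRow(TableRow row, Exception cause) {
    // Placeholder: e.g. emit to a failed-rows output or dead-letter queue.
  }
}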