You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by re...@apache.org on 2022/11/29 04:01:04 UTC

[beam] branch master updated: Merge pull request #24147: First step in adding schema update to Storage API sink. Refactor code #21395

This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5bb13fa35b9 Merge pull request #24147: First step in adding schema update to Storage API sink. Refactor code #21395
5bb13fa35b9 is described below

commit 5bb13fa35b9bc36764895c57f23d3890f0f1b567
Author: Reuven Lax <re...@google.com>
AuthorDate: Mon Nov 28 20:00:52 2022 -0800

    Merge pull request #24147: First step in adding schema update to Storage API sink. Refactor code #21395
---
 .../beam/sdk/io/gcp/bigquery/AppendClientInfo.java |  68 +++++
 .../io/gcp/bigquery/BeamRowToStorageApiProto.java  | 144 ++++-----
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java       |  12 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryOptions.java  |   6 -
 .../beam/sdk/io/gcp/bigquery/BigQueryUtils.java    |   2 +-
 .../sdk/io/gcp/bigquery/SplittingIterable.java     |  31 +-
 .../bigquery/StorageApiDynamicDestinations.java    |  21 +-
 .../StorageApiDynamicDestinationsBeamRow.java      |  50 ++-
 .../StorageApiDynamicDestinationsTableRow.java     | 201 +++++-------
 .../beam/sdk/io/gcp/bigquery/StorageApiLoads.java  |  12 +-
 .../io/gcp/bigquery/StorageApiWritePayload.java    |   5 +-
 .../bigquery/StorageApiWriteUnshardedRecords.java  | 155 +++++-----
 .../bigquery/StorageApiWritesShardedRecords.java   | 214 +++++++------
 .../io/gcp/bigquery/TableRowToStorageApiProto.java | 337 ++++++++++++++++-----
 .../gcp/bigquery/BeamRowToStorageApiProtoTest.java |  28 +-
 .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java   | 273 ++++-------------
 .../sdk/io/gcp/bigquery/BigQueryUtilsTest.java     |  26 +-
 .../bigquery/TableRowToStorageApiProtoTest.java    |  82 ++++-
 18 files changed, 849 insertions(+), 818 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
new file mode 100644
index 00000000000..1680ef48e4d
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.protobuf.Descriptors;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
+
+/**
+ * Container class used by {@link StorageApiWriteUnshardedRecords} and {@link
+ * StorageApiWritesShardedRecords} to encapsulate a destination {@link TableSchema} along with a
+ * {@link BigQueryServices.StreamAppendClient} and other objects needed to write records.
+ */
+class AppendClientInfo {
+  @Nullable BigQueryServices.StreamAppendClient streamAppendClient;
+  @Nullable TableSchema tableSchema;
+  Consumer<BigQueryServices.StreamAppendClient> closeAppendClient;
+  Descriptors.Descriptor descriptor;
+
+  public AppendClientInfo(
+      TableSchema tableSchema, Consumer<BigQueryServices.StreamAppendClient> closeAppendClient)
+      throws Exception {
+    this.tableSchema = tableSchema;
+    this.descriptor = TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema, true);
+    this.closeAppendClient = closeAppendClient;
+  }
+
+  public AppendClientInfo clearAppendClient() {
+    if (streamAppendClient != null) {
+      closeAppendClient.accept(streamAppendClient);
+      this.streamAppendClient = null;
+    }
+    return this;
+  }
+
+  public AppendClientInfo createAppendClient(
+      BigQueryServices.DatasetService datasetService,
+      Supplier<String> getStreamName,
+      boolean useConnectionPool)
+      throws Exception {
+    if (streamAppendClient == null) {
+      this.streamAppendClient =
+          datasetService.getStreamAppendClient(getStreamName.get(), descriptor, useConnectionPool);
+    }
+    return this;
+  }
+
+  public void close() {
+    clearAppendClient();
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
index 816cbe9d6ca..4ef88e01c76 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
@@ -17,16 +17,11 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
 import com.google.protobuf.ByteString;
-import com.google.protobuf.DescriptorProtos.DescriptorProto;
-import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
-import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label;
-import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
-import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
 import com.google.protobuf.Descriptors.Descriptor;
-import com.google.protobuf.Descriptors.DescriptorValidationException;
 import com.google.protobuf.Descriptors.FieldDescriptor;
-import com.google.protobuf.Descriptors.FileDescriptor;
 import com.google.protobuf.DynamicMessage;
 import java.math.BigDecimal;
 import java.time.LocalDate;
@@ -34,7 +29,6 @@ import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -52,8 +46,6 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.Visi
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Bytes;
 import org.joda.time.ReadableInstant;
 
@@ -71,38 +63,38 @@ public class BeamRowToStorageApiProto {
       new BigDecimal("-99999999999999999999999999999.999999999");
 
   // TODO(reuvenlax): Support BIGNUMERIC and GEOGRAPHY types.
-  static final Map<TypeName, Type> PRIMITIVE_TYPES =
-      ImmutableMap.<TypeName, Type>builder()
-          .put(TypeName.INT16, Type.TYPE_INT32)
-          .put(TypeName.BYTE, Type.TYPE_INT32)
-          .put(TypeName.INT32, Type.TYPE_INT32)
-          .put(TypeName.INT64, Type.TYPE_INT64)
-          .put(TypeName.FLOAT, Type.TYPE_FLOAT)
-          .put(TypeName.DOUBLE, Type.TYPE_DOUBLE)
-          .put(TypeName.STRING, Type.TYPE_STRING)
-          .put(TypeName.BOOLEAN, Type.TYPE_BOOL)
-          .put(TypeName.DATETIME, Type.TYPE_INT64)
-          .put(TypeName.BYTES, Type.TYPE_BYTES)
-          .put(TypeName.DECIMAL, Type.TYPE_BYTES)
+  static final Map<TypeName, TableFieldSchema.Type> PRIMITIVE_TYPES =
+      ImmutableMap.<TypeName, TableFieldSchema.Type>builder()
+          .put(TypeName.INT16, TableFieldSchema.Type.INT64)
+          .put(TypeName.BYTE, TableFieldSchema.Type.INT64)
+          .put(TypeName.INT32, TableFieldSchema.Type.INT64)
+          .put(TypeName.INT64, TableFieldSchema.Type.INT64)
+          .put(TypeName.FLOAT, TableFieldSchema.Type.DOUBLE)
+          .put(TypeName.DOUBLE, TableFieldSchema.Type.DOUBLE)
+          .put(TypeName.STRING, TableFieldSchema.Type.STRING)
+          .put(TypeName.BOOLEAN, TableFieldSchema.Type.BOOL)
+          .put(TypeName.DATETIME, TableFieldSchema.Type.DATETIME)
+          .put(TypeName.BYTES, TableFieldSchema.Type.BYTES)
+          .put(TypeName.DECIMAL, TableFieldSchema.Type.BIGNUMERIC)
           .build();
 
   // A map of supported logical types to the protobuf field type.
-  static final Map<String, Type> LOGICAL_TYPES =
-      ImmutableMap.<String, Type>builder()
-          .put(SqlTypes.DATE.getIdentifier(), Type.TYPE_INT32)
-          .put(SqlTypes.TIME.getIdentifier(), Type.TYPE_INT64)
-          .put(SqlTypes.DATETIME.getIdentifier(), Type.TYPE_INT64)
-          .put(SqlTypes.TIMESTAMP.getIdentifier(), Type.TYPE_INT64)
-          .put(EnumerationType.IDENTIFIER, Type.TYPE_STRING)
+  static final Map<String, TableFieldSchema.Type> LOGICAL_TYPES =
+      ImmutableMap.<String, TableFieldSchema.Type>builder()
+          .put(SqlTypes.DATE.getIdentifier(), TableFieldSchema.Type.DATE)
+          .put(SqlTypes.TIME.getIdentifier(), TableFieldSchema.Type.TIME)
+          .put(SqlTypes.DATETIME.getIdentifier(), TableFieldSchema.Type.DATETIME)
+          .put(SqlTypes.TIMESTAMP.getIdentifier(), TableFieldSchema.Type.TIMESTAMP)
+          .put(EnumerationType.IDENTIFIER, TableFieldSchema.Type.STRING)
           .build();
 
   static final Map<TypeName, Function<Object, Object>> PRIMITIVE_ENCODERS =
       ImmutableMap.<TypeName, Function<Object, Object>>builder()
-          .put(TypeName.INT16, o -> Integer.valueOf((Short) o))
-          .put(TypeName.BYTE, o -> Integer.valueOf((Byte) o))
-          .put(TypeName.INT32, Functions.identity())
+          .put(TypeName.INT16, o -> ((Short) o).longValue())
+          .put(TypeName.BYTE, o -> ((Byte) o).longValue())
+          .put(TypeName.INT32, o -> ((Integer) o).longValue())
           .put(TypeName.INT64, Functions.identity())
-          .put(TypeName.FLOAT, Function.identity())
+          .put(TypeName.FLOAT, o -> Double.valueOf(o.toString()))
           .put(TypeName.DOUBLE, Function.identity())
           .put(TypeName.STRING, Function.identity())
           .put(TypeName.BOOLEAN, Function.identity())
@@ -134,21 +126,6 @@ public class BeamRowToStorageApiProto {
                   ((EnumerationType) logicalType).toString((EnumerationType.Value) value))
           .build();
 
-  /**
-   * Given a Beam Schema, returns a protocol-buffer Descriptor that can be used to write data using
-   * the BigQuery Storage API.
-   */
-  public static Descriptor getDescriptorFromSchema(Schema schema)
-      throws DescriptorValidationException {
-    DescriptorProto descriptorProto = descriptorSchemaFromBeamSchema(schema);
-    FileDescriptorProto fileDescriptorProto =
-        FileDescriptorProto.newBuilder().addMessageType(descriptorProto).build();
-    FileDescriptor fileDescriptor =
-        FileDescriptor.buildFrom(fileDescriptorProto, new FileDescriptor[0]);
-
-    return Iterables.getOnlyElement(fileDescriptor.getMessageTypes());
-  }
-
   /**
    * Given a Beam {@link Row} object, returns a protocol-buffer message that can be used to write
    * data using the BigQuery Storage streaming API.
@@ -159,7 +136,9 @@ public class BeamRowToStorageApiProto {
     for (int i = 0; i < row.getFieldCount(); ++i) {
       Field beamField = beamSchema.getField(i);
       FieldDescriptor fieldDescriptor =
-          Preconditions.checkNotNull(descriptor.findFieldByName(beamField.getName().toLowerCase()));
+          Preconditions.checkNotNull(
+              descriptor.findFieldByName(beamField.getName().toLowerCase()),
+              beamField.getName().toLowerCase());
       @Nullable Object value = messageValueFromRowValue(fieldDescriptor, beamField, i, row);
       if (value != null) {
         builder.setField(fieldDescriptor, value);
@@ -169,27 +148,19 @@ public class BeamRowToStorageApiProto {
   }
 
   @VisibleForTesting
-  static DescriptorProto descriptorSchemaFromBeamSchema(Schema schema) {
+  static TableSchema protoTableSchemaFromBeamSchema(Schema schema) {
     Preconditions.checkState(schema.getFieldCount() > 0);
-    DescriptorProto.Builder descriptorBuilder = DescriptorProto.newBuilder();
-    // Create a unique name for the descriptor ('-' characters cannot be used).
-    descriptorBuilder.setName("D" + UUID.randomUUID().toString().replace("-", "_"));
-    int i = 1;
-    List<DescriptorProto> nestedTypes = Lists.newArrayList();
+
+    TableSchema.Builder builder = TableSchema.newBuilder();
     for (Field field : schema.getFields()) {
-      FieldDescriptorProto.Builder fieldDescriptorProtoBuilder =
-          fieldDescriptorFromBeamField(field, i++, nestedTypes);
-      descriptorBuilder.addField(fieldDescriptorProtoBuilder);
+      builder.addFields(fieldDescriptorFromBeamField(field));
     }
-    nestedTypes.forEach(descriptorBuilder::addNestedType);
-    return descriptorBuilder.build();
+    return builder.build();
   }
 
-  private static FieldDescriptorProto.Builder fieldDescriptorFromBeamField(
-      Field field, int fieldNumber, List<DescriptorProto> nestedTypes) {
-    FieldDescriptorProto.Builder fieldDescriptorBuilder = FieldDescriptorProto.newBuilder();
-    fieldDescriptorBuilder = fieldDescriptorBuilder.setName(field.getName().toLowerCase());
-    fieldDescriptorBuilder = fieldDescriptorBuilder.setNumber(fieldNumber);
+  private static TableFieldSchema fieldDescriptorFromBeamField(Field field) {
+    TableFieldSchema.Builder builder = TableFieldSchema.newBuilder();
+    builder = builder.setName(field.getName().toLowerCase());
 
     switch (field.getType().getTypeName()) {
       case ROW:
@@ -197,10 +168,10 @@ public class BeamRowToStorageApiProto {
         if (rowSchema == null) {
           throw new RuntimeException("Unexpected null schema!");
         }
-        DescriptorProto nested = descriptorSchemaFromBeamSchema(rowSchema);
-        nestedTypes.add(nested);
-        fieldDescriptorBuilder =
-            fieldDescriptorBuilder.setType(Type.TYPE_MESSAGE).setTypeName(nested.getName());
+        builder = builder.setType(TableFieldSchema.Type.STRUCT);
+        for (Schema.Field nestedField : rowSchema.getFields()) {
+          builder = builder.addFields(fieldDescriptorFromBeamField(nestedField));
+        }
         break;
       case ARRAY:
       case ITERABLE:
@@ -211,35 +182,44 @@ public class BeamRowToStorageApiProto {
         Preconditions.checkState(
             !Preconditions.checkNotNull(elementType.getTypeName()).isCollectionType(),
             "Nested arrays not supported by BigQuery.");
-        return fieldDescriptorFromBeamField(
-                Field.of(field.getName(), elementType), fieldNumber, nestedTypes)
-            .setLabel(Label.LABEL_REPEATED);
+        TableFieldSchema elementFieldSchema =
+            fieldDescriptorFromBeamField(Field.of(field.getName(), elementType));
+        builder = builder.setType(elementFieldSchema.getType());
+        builder.addAllFields(elementFieldSchema.getFieldsList());
+        builder = builder.setMode(TableFieldSchema.Mode.REPEATED);
+        break;
       case LOGICAL_TYPE:
         @Nullable LogicalType<?, ?> logicalType = field.getType().getLogicalType();
         if (logicalType == null) {
           throw new RuntimeException("Unexpected null logical type " + field.getType());
         }
-        @Nullable Type type = LOGICAL_TYPES.get(logicalType.getIdentifier());
+        @Nullable TableFieldSchema.Type type = LOGICAL_TYPES.get(logicalType.getIdentifier());
         if (type == null) {
           throw new RuntimeException("Unsupported logical type " + field.getType());
         }
-        fieldDescriptorBuilder = fieldDescriptorBuilder.setType(type);
+        builder = builder.setType(type);
         break;
       case MAP:
         throw new RuntimeException("Map types not supported by BigQuery.");
       default:
-        @Nullable Type primitiveType = PRIMITIVE_TYPES.get(field.getType().getTypeName());
+        @Nullable
+        TableFieldSchema.Type primitiveType = PRIMITIVE_TYPES.get(field.getType().getTypeName());
         if (primitiveType == null) {
           throw new RuntimeException("Unsupported type " + field.getType());
         }
-        fieldDescriptorBuilder = fieldDescriptorBuilder.setType(primitiveType);
+        builder = builder.setType(primitiveType);
     }
-    if (field.getType().getNullable()) {
-      fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_OPTIONAL);
-    } else {
-      fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_REQUIRED);
+    if (builder.getMode() != TableFieldSchema.Mode.REPEATED) {
+      if (field.getType().getNullable()) {
+        builder = builder.setMode(TableFieldSchema.Mode.NULLABLE);
+      } else {
+        builder = builder.setMode(TableFieldSchema.Mode.REQUIRED);
+      }
+    }
+    if (field.getDescription() != null) {
+      builder = builder.setDescription(field.getDescription());
     }
-    return fieldDescriptorBuilder;
+    return builder.build();
   }
 
   @Nullable
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index e72bc20f780..fedc898fe97 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -2668,11 +2668,9 @@ public class BigQueryIO {
     }
 
     /**
-     * If true, enables automatically detecting BigQuery table schema updates. If a message with
-     * unknown fields is processed, the BigQuery table is tabled to see if the schema has been
-     * updated. This is intended for scenarios in which unknown fields are rare, otherwise calls to
-     * BigQuery will throttle the pipeline. only supported when using one of the STORAGE_API insert
-     * methods.
+     * If true, enables automatically detecting BigQuery table schema updates. Table schema updates
+     * are usually noticed within several minutes. Only supported when using one of the STORAGE_API
+     * insert methods.
      */
     public Write<T> withAutoSchemaUpdate(boolean autoSchemaUpdate) {
       return toBuilder().setAutoSchemaUpdate(autoSchemaUpdate).build();
@@ -3174,9 +3172,7 @@ public class BigQueryIO {
                   dynamicDestinations,
                   tableRowWriterFactory.getToRowFn(),
                   getCreateDisposition(),
-                  getIgnoreUnknownValues(),
-                  bqOptions.getSchemaUpdateRetries(),
-                  getAutoSchemaUpdate());
+                  getIgnoreUnknownValues());
         }
 
         StorageApiLoads<DestinationT, T> storageApiLoads =
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
index 53cb2713641..f0b3e061597 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
@@ -133,12 +133,6 @@ public interface BigQueryOptions
 
   void setBigQueryProject(String value);
 
-  @Description("Specify the number of schema update retries. For internal testing only.")
-  @Default.Integer(2)
-  Integer getSchemaUpdateRetries();
-
-  void setSchemaUpdateRetries(Integer value);
-
   @Description("Maximum (best effort) size of a single append to the storage API.")
   @Default.Integer(2 * 1024 * 1024)
   Integer getStorageApiAppendThresholdBytes();
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
index 6a340496122..fbea947d056 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -576,7 +576,7 @@ public class BigQueryUtils {
       case DOUBLE:
         // The above types have native representations in JSON for all their
         // possible values.
-        return fieldValue;
+        return fieldValue.toString();
 
       case STRING:
       case INT64:
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java
index 1166647f623..03b009797b5 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java
@@ -19,12 +19,8 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 
 import com.google.cloud.bigquery.storage.v1.ProtoRows;
 import com.google.protobuf.ByteString;
-import com.google.protobuf.DynamicMessage;
-import com.google.protobuf.InvalidProtocolBufferException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.function.Function;
-import org.apache.beam.sdk.io.gcp.bigquery.StorageApiDynamicDestinations.DescriptorWrapper;
 
 /**
  * Takes in an iterable and batches the results into multiple ProtoRows objects. The splitSize
@@ -34,18 +30,10 @@ import org.apache.beam.sdk.io.gcp.bigquery.StorageApiDynamicDestinations.Descrip
 class SplittingIterable implements Iterable<ProtoRows> {
   private final Iterable<StorageApiWritePayload> underlying;
   private final long splitSize;
-  private final Function<Long, DescriptorWrapper> updateSchema;
-  private DescriptorWrapper currentDescriptor;
 
-  public SplittingIterable(
-      Iterable<StorageApiWritePayload> underlying,
-      long splitSize,
-      DescriptorWrapper currentDescriptor,
-      Function<Long, DescriptorWrapper> updateSchema) {
+  public SplittingIterable(Iterable<StorageApiWritePayload> underlying, long splitSize) {
     this.underlying = underlying;
     this.splitSize = splitSize;
-    this.updateSchema = updateSchema;
-    this.currentDescriptor = currentDescriptor;
   }
 
   @Override
@@ -68,23 +56,8 @@ class SplittingIterable implements Iterable<ProtoRows> {
         long bytesSize = 0;
         while (underlyingIterator.hasNext()) {
           StorageApiWritePayload payload = underlyingIterator.next();
-          if (payload.getSchemaHash() != currentDescriptor.hash) {
-            // Schema doesn't match. Try and get an updated schema hash (from the base table).
-            currentDescriptor = updateSchema.apply(payload.getSchemaHash());
-            // Validate that the record can now be parsed.
-            try {
-              DynamicMessage msg =
-                  DynamicMessage.parseFrom(currentDescriptor.descriptor, payload.getPayload());
-              if (msg.getUnknownFields() != null && !msg.getUnknownFields().asMap().isEmpty()) {
-                throw new RuntimeException(
-                    "Record schema does not match table. Unknown fields: "
-                        + msg.getUnknownFields());
-              }
-            } catch (InvalidProtocolBufferException e) {
-              throw new RuntimeException(e);
-            }
-          }
           ByteString byteString = ByteString.copyFrom(payload.getPayload());
+
           inserts.addSerializedRows(byteString);
           bytesSize += byteString.size();
           if (bytesSize > splitSize) {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java
index 53d640a00f2..c3076e8af86 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
-import com.google.protobuf.Descriptors.Descriptor;
 import java.util.List;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
@@ -31,26 +30,8 @@ import org.checkerframework.checker.nullness.qual.Nullable;
 /** Base dynamicDestinations class used by the Storage API sink. */
 abstract class StorageApiDynamicDestinations<T, DestinationT>
     extends DynamicDestinations<T, DestinationT> {
-  /** Container object that contains a proto descriptor along with its deterministic hash. */
-  public static class DescriptorWrapper {
-    public final Descriptor descriptor;
-    public final long hash;
-
-    public DescriptorWrapper(Descriptor descriptor, long hash) {
-      this.descriptor = descriptor;
-      this.hash = hash;
-    }
-
-    @Override
-    public String toString() {
-      return "Descriptor: " + descriptor.getFullName() + " hash: " + hash;
-    }
-  }
-
   public interface MessageConverter<T> {
-    DescriptorWrapper getSchemaDescriptor();
-
-    void refreshSchema(long expectedHash) throws Exception;
+    com.google.cloud.bigquery.storage.v1.TableSchema getTableSchema();
 
     StorageApiWritePayload toMessage(T element) throws Exception;
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java
index 5f85cc1eb1b..4280d356bd2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsBeamRow.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io.gcp.bigquery;
 
 import com.google.api.services.bigquery.model.TableRow;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
 import com.google.protobuf.Descriptors.Descriptor;
 import com.google.protobuf.Message;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
@@ -29,7 +30,7 @@ import org.checkerframework.checker.nullness.qual.NonNull;
 /** Storage API DynamicDestinations used when the input is a Beam Row. */
 class StorageApiDynamicDestinationsBeamRow<T, DestinationT extends @NonNull Object>
     extends StorageApiDynamicDestinations<T, DestinationT> {
-  private final Schema schema;
+  private final TableSchema tableSchema;
   private final SerializableFunction<T, Row> toRow;
 
   StorageApiDynamicDestinationsBeamRow(
@@ -37,40 +38,37 @@ class StorageApiDynamicDestinationsBeamRow<T, DestinationT extends @NonNull Obje
       Schema schema,
       SerializableFunction<T, Row> toRow) {
     super(inner);
-    this.schema = schema;
+    this.tableSchema = BeamRowToStorageApiProto.protoTableSchemaFromBeamSchema(schema);
     this.toRow = toRow;
   }
 
   @Override
   public MessageConverter<T> getMessageConverter(
       DestinationT destination, DatasetService datasetService) throws Exception {
-    return new MessageConverter<T>() {
-      final Descriptor descriptor;
-      final long descriptorHash;
+    return new BeamRowConverter();
+  }
 
-      {
-        descriptor = BeamRowToStorageApiProto.getDescriptorFromSchema(schema);
-        descriptorHash = BigQueryUtils.hashSchemaDescriptorDeterministic(descriptor);
-      }
+  class BeamRowConverter implements MessageConverter<T> {
+    final Descriptor descriptor;
 
-      @Override
-      public DescriptorWrapper getSchemaDescriptor() {
-        return new DescriptorWrapper(descriptor, descriptorHash);
-      }
+    BeamRowConverter() throws Exception {
+      this.descriptor = TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema, true);
+    }
 
-      @Override
-      public void refreshSchema(long expectedHash) {}
+    @Override
+    public TableSchema getTableSchema() {
+      return tableSchema;
+    }
 
-      @Override
-      public StorageApiWritePayload toMessage(T element) {
-        Message msg = BeamRowToStorageApiProto.messageFromBeamRow(descriptor, toRow.apply(element));
-        return new AutoValue_StorageApiWritePayload(msg.toByteArray(), descriptorHash);
-      }
+    @Override
+    public StorageApiWritePayload toMessage(T element) {
+      Message msg = BeamRowToStorageApiProto.messageFromBeamRow(descriptor, toRow.apply(element));
+      return new AutoValue_StorageApiWritePayload(msg.toByteArray());
+    }
 
-      @Override
-      public TableRow toTableRow(T element) {
-        return BigQueryUtils.toTableRow(toRow.apply(element));
-      }
-    };
-  }
+    @Override
+    public TableRow toTableRow(T element) {
+      return BigQueryUtils.toTableRow(toRow.apply(element));
+    }
+  };
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
index b025d01f02b..6797bd20e68 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
@@ -25,26 +25,20 @@ import com.google.protobuf.Message;
 import java.util.concurrent.ExecutionException;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
-import org.apache.beam.sdk.io.gcp.bigquery.TableRowToStorageApiProto.SchemaTooNarrowException;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
 import org.checkerframework.checker.nullness.qual.NonNull;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class StorageApiDynamicDestinationsTableRow<T, DestinationT extends @NonNull Object>
     extends StorageApiDynamicDestinations<T, DestinationT> {
   private final SerializableFunction<T, TableRow> formatFunction;
   private final CreateDisposition createDisposition;
   private final boolean ignoreUnknownValues;
-  private final int schemaUpdateRetries;
-  private final boolean autoSchemaUpdates;
   private static final TableSchemaCache SCHEMA_CACHE =
       new TableSchemaCache(Duration.standardSeconds(1));
-  private static final Logger LOG =
-      LoggerFactory.getLogger(StorageApiDynamicDestinationsTableRow.class);
 
   static {
     SCHEMA_CACHE.start();
@@ -54,15 +48,11 @@ public class StorageApiDynamicDestinationsTableRow<T, DestinationT extends @NonN
       DynamicDestinations<T, DestinationT> inner,
       SerializableFunction<T, TableRow> formatFunction,
       CreateDisposition createDisposition,
-      boolean ignoreUnknownValues,
-      int schemaUpdateRetries,
-      boolean autoSchemaUpdates) {
+      boolean ignoreUnknownValues) {
     super(inner);
     this.formatFunction = formatFunction;
     this.createDisposition = createDisposition;
     this.ignoreUnknownValues = ignoreUnknownValues;
-    this.schemaUpdateRetries = schemaUpdateRetries;
-    this.autoSchemaUpdates = autoSchemaUpdates;
   }
 
   static void clearSchemaCache() throws ExecutionException, InterruptedException {
@@ -72,128 +62,85 @@ public class StorageApiDynamicDestinationsTableRow<T, DestinationT extends @NonN
   @Override
   public MessageConverter<T> getMessageConverter(
       DestinationT destination, DatasetService datasetService) throws Exception {
-    return new MessageConverter<T>() {
-      @Nullable TableSchema tableSchema;
-      TableRowToStorageApiProto.SchemaInformation schemaInformation;
-      Descriptor descriptor;
-      long descriptorHash;
+    return new TableRowConverter(destination, datasetService);
+  }
 
-      {
-        tableSchema = getSchema(destination);
-        TableReference tableReference = getTable(destination).getTableReference();
-        if (tableSchema == null) {
-          // If the table already exists, then try and fetch the schema from the existing
-          // table.
-          tableSchema = SCHEMA_CACHE.getSchema(tableReference, datasetService);
-          if (tableSchema == null) {
-            if (createDisposition == CreateDisposition.CREATE_NEVER) {
-              throw new RuntimeException(
-                  "BigQuery table "
-                      + tableReference
-                      + " not found. If you wanted to "
-                      + "automatically create the table, set the create disposition to CREATE_IF_NEEDED and specify a "
-                      + "schema.");
-            } else {
-              throw new RuntimeException(
-                  "Schema must be set for table "
-                      + tableReference
-                      + " when writing TableRows using Storage API and "
-                      + "using a create disposition of CREATE_IF_NEEDED.");
-            }
-          }
-        } else {
-          // Make sure we register this schema with the cache, unless there's already a more
-          // up-to-date schema.
-          tableSchema =
-              MoreObjects.firstNonNull(
-                  SCHEMA_CACHE.putSchemaIfAbsent(tableReference, tableSchema), tableSchema);
-        }
-        schemaInformation =
-            TableRowToStorageApiProto.SchemaInformation.fromTableSchema(tableSchema);
-        descriptor = TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema);
-        descriptorHash = BigQueryUtils.hashSchemaDescriptorDeterministic(descriptor);
-      }
+  class TableRowConverter implements MessageConverter<T> {
+    final @Nullable TableSchema tableSchema;
+    final com.google.cloud.bigquery.storage.v1.TableSchema protoTableSchema;
+    final TableRowToStorageApiProto.SchemaInformation schemaInformation;
+    final Descriptor descriptor;
 
-      @Override
-      public DescriptorWrapper getSchemaDescriptor() {
-        synchronized (this) {
-          return new DescriptorWrapper(descriptor, descriptorHash);
-        }
-      }
+    TableRowConverter(
+        TableSchema tableSchema,
+        TableRowToStorageApiProto.SchemaInformation schemaInformation,
+        Descriptor descriptor) {
+      this.tableSchema = tableSchema;
+      this.protoTableSchema = TableRowToStorageApiProto.schemaToProtoTableSchema(tableSchema);
+      this.schemaInformation = schemaInformation;
+      this.descriptor = descriptor;
+    }
 
-      @Override
-      public void refreshSchema(long expectedHash) throws Exception {
-        // When a table is updated, all streams writing to that table will try to refresh the
-        // schema. Since we don't want them all querying the table for the schema, keep track of
-        // the expected hash and return if it already matches.
-        synchronized (this) {
-          if (expectedHash == descriptorHash) {
-            return;
+    TableRowConverter(DestinationT destination, DatasetService datasetService) throws Exception {
+      TableSchema localTableSchema = getSchema(destination);
+      TableReference tableReference = getTable(destination).getTableReference();
+      if (localTableSchema == null) {
+        // If the table already exists, then try and fetch the schema from the existing
+        // table.
+        localTableSchema = SCHEMA_CACHE.getSchema(tableReference, datasetService);
+        if (localTableSchema == null) {
+          if (createDisposition == CreateDisposition.CREATE_NEVER) {
+            throw new RuntimeException(
+                "BigQuery table "
+                    + tableReference
+                    + " not found. If you wanted to "
+                    + "automatically create the table, set the create disposition to CREATE_IF_NEEDED and specify a "
+                    + "schema.");
+          } else {
+            throw new RuntimeException(
+                "Schema must be set for table "
+                    + tableReference
+                    + " when writing TableRows using Storage API and "
+                    + "using a create disposition of CREATE_IF_NEEDED.");
           }
         }
-        refreshSchemaInternal();
+      } else {
+        // Make sure we register this schema with the cache, unless there's already a more
+        // up-to-date schema.
+        localTableSchema =
+            MoreObjects.firstNonNull(
+                SCHEMA_CACHE.putSchemaIfAbsent(tableReference, localTableSchema), localTableSchema);
       }
+      this.tableSchema = localTableSchema;
+      this.protoTableSchema = TableRowToStorageApiProto.schemaToProtoTableSchema(tableSchema);
+      schemaInformation =
+          TableRowToStorageApiProto.SchemaInformation.fromTableSchema(protoTableSchema);
+      descriptor =
+          TableRowToStorageApiProto.getDescriptorFromTableSchema(
+              Preconditions.checkStateNotNull(tableSchema), true);
+    }
 
-      public void refreshSchemaInternal() throws Exception {
-        TableReference tableReference = getTable(destination).getTableReference();
-        SCHEMA_CACHE.refreshSchema(tableReference, datasetService);
-        TableSchema newSchema = SCHEMA_CACHE.getSchema(tableReference, datasetService);
-        if (newSchema == null) {
-          throw new RuntimeException("BigQuery table " + tableReference + " not found");
-        }
-        synchronized (this) {
-          tableSchema = newSchema;
-          schemaInformation =
-              TableRowToStorageApiProto.SchemaInformation.fromTableSchema(tableSchema);
-          descriptor = TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema);
-          long newHash = BigQueryUtils.hashSchemaDescriptorDeterministic(descriptor);
-          if (descriptorHash != newHash) {
-            LOG.info(
-                "Refreshed table "
-                    + BigQueryHelpers.toTableSpec(tableReference)
-                    + " has a new schema.");
-          }
-          descriptorHash = newHash;
-        }
-      }
+    @Override
+    public com.google.cloud.bigquery.storage.v1.TableSchema getTableSchema() {
+      return protoTableSchema;
+    }
 
-      @Override
-      public TableRow toTableRow(T element) {
-        return formatFunction.apply(element);
-      }
+    @Override
+    public TableRow toTableRow(T element) {
+      return formatFunction.apply(element);
+    }
 
-      @Override
-      public StorageApiWritePayload toMessage(T element) throws Exception {
-        int attempt = 0;
-        do {
-          TableRowToStorageApiProto.SchemaInformation localSchemaInformation;
-          Descriptor localDescriptor;
-          long localDescriptorHash;
-          synchronized (this) {
-            localSchemaInformation = schemaInformation;
-            localDescriptor = descriptor;
-            localDescriptorHash = descriptorHash;
-          }
-          try {
-            Message msg =
-                TableRowToStorageApiProto.messageFromTableRow(
-                    localSchemaInformation,
-                    localDescriptor,
-                    formatFunction.apply(element),
-                    ignoreUnknownValues);
-            return new AutoValue_StorageApiWritePayload(msg.toByteArray(), localDescriptorHash);
-          } catch (SchemaTooNarrowException e) {
-            if (!autoSchemaUpdates || attempt > schemaUpdateRetries) {
-              throw e;
-            }
-            // The input record has fields not found in the schema, and ignoreUnknownValues=false.
-            // It's possible that the user has updated the target table with a wider schema. Try
-            // to read the target's table schema to see if that is the case.
-            refreshSchemaInternal();
-            ++attempt;
-          }
-        } while (true);
-      }
-    };
-  }
+    @Override
+    public StorageApiWritePayload toMessage(T element) throws Exception {
+      return toMessage(formatFunction.apply(element), true);
+    }
+
+    public StorageApiWritePayload toMessage(TableRow tableRow, boolean respectRequired)
+        throws Exception {
+      Message msg =
+          TableRowToStorageApiProto.messageFromTableRow(
+              schemaInformation, descriptor, tableRow, ignoreUnknownValues);
+      return StorageApiWritePayload.of(msg.toByteArray());
+    }
+  };
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java
index 20ab251c9c0..da2f695f708 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java
@@ -41,7 +41,6 @@ import org.joda.time.Duration;
 /** This {@link PTransform} manages loads into BigQuery using the Storage API. */
 public class StorageApiLoads<DestinationT, ElementT>
     extends PTransform<PCollection<KV<DestinationT, ElementT>>, WriteResult> {
-  static final int MAX_BATCH_SIZE_BYTES = 2 * 1024 * 1024;
   final TupleTag<KV<DestinationT, StorageApiWritePayload>> successfulRowsTag =
       new TupleTag<>("successfulRows");
   final TupleTag<BigQueryStorageApiInsertError> failedRowsTag = new TupleTag<>("failedRows");
@@ -162,6 +161,12 @@ public class StorageApiLoads<DestinationT, ElementT>
 
     PCollection<KV<ShardedKey<DestinationT>, Iterable<StorageApiWritePayload>>> groupedRecords;
 
+    int maxAppendBytes =
+        input
+            .getPipeline()
+            .getOptions()
+            .as(BigQueryOptions.class)
+            .getStorageApiAppendThresholdBytes();
     if (this.allowAutosharding) {
       groupedRecords =
           convertMessagesResult
@@ -169,7 +174,7 @@ public class StorageApiLoads<DestinationT, ElementT>
               .apply(
                   "GroupIntoBatches",
                   GroupIntoBatches.<DestinationT, StorageApiWritePayload>ofByteSize(
-                          MAX_BATCH_SIZE_BYTES,
+                          maxAppendBytes,
                           (StorageApiWritePayload e) -> (long) e.getPayload().length)
                       .withMaxBufferingDuration(triggeringFrequency)
                       .withShardedKey());
@@ -182,8 +187,7 @@ public class StorageApiLoads<DestinationT, ElementT>
           shardedRecords.apply(
               "GroupIntoBatches",
               GroupIntoBatches.<ShardedKey<DestinationT>, StorageApiWritePayload>ofByteSize(
-                      MAX_BATCH_SIZE_BYTES,
-                      (StorageApiWritePayload e) -> (long) e.getPayload().length)
+                      maxAppendBytes, (StorageApiWritePayload e) -> (long) e.getPayload().length)
                   .withMaxBufferingDuration(triggeringFrequency));
     }
     PCollectionTuple writeRecordsResult =
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritePayload.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritePayload.java
index 4b620a5c6d4..00a34b9c14f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritePayload.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritePayload.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io.gcp.bigquery;
 
 import com.google.auto.value.AutoValue;
+import java.io.IOException;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
 import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 
@@ -28,5 +29,7 @@ public abstract class StorageApiWritePayload {
   @SuppressWarnings("mutable")
   public abstract byte[] getPayload();
 
-  public abstract long getSchemaHash();
+  static StorageApiWritePayload of(byte[] payload) throws IOException {
+    return new AutoValue_StorageApiWritePayload(payload);
+  }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index 0f86b8871f0..99317a3fb23 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -25,6 +25,7 @@ import com.google.api.services.bigquery.model.TableRow;
 import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
 import com.google.cloud.bigquery.storage.v1.Exceptions;
 import com.google.cloud.bigquery.storage.v1.ProtoRows;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
 import com.google.cloud.bigquery.storage.v1.WriteStream.Type;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.DynamicMessage;
@@ -46,7 +47,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StreamAppendClient;
 import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.RetryType;
-import org.apache.beam.sdk.io.gcp.bigquery.StorageApiDynamicDestinations.DescriptorWrapper;
 import org.apache.beam.sdk.io.gcp.bigquery.StorageApiDynamicDestinations.MessageConverter;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Distribution;
@@ -99,18 +99,17 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
    * StreamAppendClient after looking up the cache, and we must ensure that the cache is not
    * accessed in between the lookup and the pin (any access of the cache could trigger element
    * expiration). Therefore most uses of APPEND_CLIENTS should synchronize.
-   *
-   * <p>TODO(reuvenlax); Once all uses of StreamWriter are using
    */
-  private static final Cache<String, StreamAppendClient> APPEND_CLIENTS =
+  private static final Cache<String, AppendClientInfo> APPEND_CLIENTS =
       CacheBuilder.newBuilder()
           .expireAfterAccess(15, TimeUnit.MINUTES)
           .removalListener(
-              (RemovalNotification<String, StreamAppendClient> removal) -> {
+              (RemovalNotification<String, AppendClientInfo> removal) -> {
                 LOG.info("Expiring append client for " + removal.getKey());
-                final @Nullable StreamAppendClient streamAppendClient = removal.getValue();
-                // Close the writer in a different thread so as not to block the main one.
-                runAsyncIgnoreFailure(closeWriterExecutor, streamAppendClient::close);
+                final @Nullable AppendClientInfo appendClientInfo = removal.getValue();
+                if (appendClientInfo != null) {
+                  appendClientInfo.close();
+                }
               })
           .build();
 
@@ -199,9 +198,8 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
 
     class DestinationState {
       private final String tableUrn;
-      private final MessageConverter<ElementT> messageConverter;
       private String streamName = "";
-      private @Nullable StreamAppendClient streamAppendClient = null;
+      private @Nullable AppendClientInfo appendClientInfo = null;
       private long currentOffset = 0;
       private List<ByteString> pendingMessages;
       private transient @Nullable DatasetService maybeDatasetService;
@@ -209,8 +207,6 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
           Metrics.counter(WriteRecordsDoFn.class, "recordsAppended");
       private final Counter appendFailures =
           Metrics.counter(WriteRecordsDoFn.class, "appendFailures");
-      private final Counter schemaMismatches =
-          Metrics.counter(WriteRecordsDoFn.class, "schemaMismatches");
       private final Distribution inflightWaitSecondsDistribution =
           Metrics.distribution(WriteRecordsDoFn.class, "streamWriterWaitSeconds");
       private final Counter rowsSentToFailedRowsCollection =
@@ -219,7 +215,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
               "rowsSentToFailedRowsCollection");
 
       private final boolean useDefaultStream;
-      private DescriptorWrapper descriptorWrapper;
+      private TableSchema tableSchema;
       private Instant nextCacheTickle = Instant.MAX;
       private final int clientNumber;
       private final boolean usingMultiplexing;
@@ -232,13 +228,13 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
           boolean useDefaultStream,
           int streamAppendClientCount,
           boolean usingMultiplexing,
-          long maxRequestSize) {
+          long maxRequestSize)
+          throws Exception {
         this.tableUrn = tableUrn;
-        this.messageConverter = messageConverter;
         this.pendingMessages = Lists.newArrayList();
         this.maybeDatasetService = datasetService;
         this.useDefaultStream = useDefaultStream;
-        this.descriptorWrapper = messageConverter.getSchemaDescriptor();
+        this.tableSchema = messageConverter.getTableSchema();
         this.clientNumber = new Random().nextInt(streamAppendClientCount);
         this.usingMultiplexing = usingMultiplexing;
         this.maxRequestSize = maxRequestSize;
@@ -246,9 +242,11 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
 
       void teardown() {
         maybeTickleCache();
-        if (streamAppendClient != null) {
-          runAsyncIgnoreFailure(closeWriterExecutor, streamAppendClient::unpin);
-          streamAppendClient = null;
+        if (appendClientInfo != null) {
+          if (appendClientInfo.streamAppendClient != null) {
+            runAsyncIgnoreFailure(closeWriterExecutor, appendClientInfo.streamAppendClient::unpin);
+          }
+          appendClientInfo = null;
         }
       }
 
@@ -279,41 +277,52 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
         return this.streamName;
       }
 
-      StreamAppendClient generateClient() throws Exception {
+      AppendClientInfo generateClient(boolean createAppendClient) throws Exception {
         Preconditions.checkStateNotNull(maybeDatasetService);
-        return maybeDatasetService.getStreamAppendClient(
-            streamName, descriptorWrapper.descriptor, usingMultiplexing);
+        AppendClientInfo appendClientInfo =
+            new AppendClientInfo(
+                tableSchema,
+                // Make sure that the client is always closed in a different thread to avoid
+                // blocking.
+                client -> runAsyncIgnoreFailure(closeWriterExecutor, client::close));
+        if (createAppendClient) {
+          appendClientInfo =
+              appendClientInfo.createAppendClient(
+                  maybeDatasetService, () -> streamName, usingMultiplexing);
+          Preconditions.checkStateNotNull(appendClientInfo.streamAppendClient).pin();
+        }
+        return appendClientInfo;
       }
 
-      StreamAppendClient getStreamAppendClient(boolean lookupCache) {
+      AppendClientInfo getAppendClientInfo(boolean lookupCache, boolean createAppendClient) {
         try {
-          if (this.streamAppendClient == null) {
+          if (this.appendClientInfo == null) {
             getOrCreateStreamName();
-            final StreamAppendClient newStreamAppendClient;
+            final AppendClientInfo newAppendClientInfo;
             synchronized (APPEND_CLIENTS) {
               if (lookupCache) {
-                newStreamAppendClient =
+                newAppendClientInfo =
                     APPEND_CLIENTS.get(
-                        getStreamAppendClientCacheEntryKey(), () -> generateClient());
+                        getStreamAppendClientCacheEntryKey(),
+                        () -> generateClient(createAppendClient));
               } else {
-                newStreamAppendClient = generateClient();
-                // override the clients in the cache
-                APPEND_CLIENTS.put(getStreamAppendClientCacheEntryKey(), newStreamAppendClient);
+                newAppendClientInfo = generateClient(createAppendClient);
+                // override the clients in the cache.
+                APPEND_CLIENTS.put(getStreamAppendClientCacheEntryKey(), newAppendClientInfo);
               }
-              newStreamAppendClient.pin();
             }
             this.currentOffset = 0;
             nextCacheTickle = Instant.now().plus(java.time.Duration.ofMinutes(1));
-            this.streamAppendClient = newStreamAppendClient;
+            this.appendClientInfo = newAppendClientInfo;
           }
-          return streamAppendClient;
+          return appendClientInfo;
         } catch (Exception e) {
           throw new RuntimeException(e);
         }
       }
 
       void maybeTickleCache() {
-        if (streamAppendClient != null && Instant.now().isAfter(nextCacheTickle)) {
+        if (appendClientInfo != null && Instant.now().isAfter(nextCacheTickle)) {
           synchronized (APPEND_CLIENTS) {
             APPEND_CLIENTS.getIfPresent(getStreamAppendClientCacheEntryKey());
           }
@@ -322,57 +331,36 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
       }
 
       void invalidateWriteStream() {
-        if (streamAppendClient != null) {
+        if (appendClientInfo != null) {
           synchronized (APPEND_CLIENTS) {
             // Unpin in a different thread, as it may execute a blocking close.
-            runAsyncIgnoreFailure(closeWriterExecutor, streamAppendClient::unpin);
+            if (appendClientInfo.streamAppendClient != null) {
+              runAsyncIgnoreFailure(
+                  closeWriterExecutor, appendClientInfo.streamAppendClient::unpin);
+            }
             // The default stream is cached across multiple different DoFns. If they all try and
-            // invalidate, then we can
-            // get races between threads invalidating and recreating streams. For this reason, we
-            // check to see that the
-            // cache still contains the object we created before invalidating (in case another
-            // thread has already invalidated
-            // and recreated the stream).
+            // invalidate, then we can get races between threads invalidating and recreating
+            // streams. For this reason, we check to see that the cache still contains the
+            // object we created before invalidating (in case another thread has already
+            // invalidated and recreated the stream). The check below compares identity
+            // hash codes rather than calling equals().
             String cacheEntryKey = getStreamAppendClientCacheEntryKey();
             @Nullable
-            StreamAppendClient cachedAppendClient = APPEND_CLIENTS.getIfPresent(cacheEntryKey);
+            AppendClientInfo cachedAppendClient = APPEND_CLIENTS.getIfPresent(cacheEntryKey);
             if (cachedAppendClient != null
                 && System.identityHashCode(cachedAppendClient)
-                    == System.identityHashCode(streamAppendClient)) {
+                    == System.identityHashCode(appendClientInfo)) {
               APPEND_CLIENTS.invalidate(cacheEntryKey);
             }
           }
-          streamAppendClient = null;
+          appendClientInfo = null;
         }
       }
 
       void addMessage(StorageApiWritePayload payload) throws Exception {
         maybeTickleCache();
-        if (payload.getSchemaHash() != descriptorWrapper.hash) {
-          schemaMismatches.inc();
-          // The descriptor on the payload doesn't match the descriptor we know about. This
-          // means that the table has been updated, but that this transform hasn't found out
-          // about that yet. Refresh the schema and force a new StreamAppendClient to be
-          // created.
-          messageConverter.refreshSchema(payload.getSchemaHash());
-          descriptorWrapper = messageConverter.getSchemaDescriptor();
-          invalidateWriteStream();
-          if (useDefaultStream) {
-            // Since the default stream client is shared across many bundles and threads, we can't
-            // simply look it up from the cache, as another thread may have recreated it with the
-            // old
-            // schema.
-            getStreamAppendClient(false);
-          }
-          // Validate that the record can now be parsed.
-          DynamicMessage msg =
-              DynamicMessage.parseFrom(descriptorWrapper.descriptor, payload.getPayload());
-          if (msg.getUnknownFields() != null && !msg.getUnknownFields().asMap().isEmpty()) {
-            throw new RuntimeException(
-                "Record schema does not match table. Unknown fields: " + msg.getUnknownFields());
-          }
-        }
-        pendingMessages.add(ByteString.copyFrom(payload.getPayload()));
+        ByteString payloadBytes = ByteString.copyFrom(payload.getPayload());
+        pendingMessages.add(payloadBytes);
       }
 
       long flush(
@@ -403,7 +391,8 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
           for (ByteString rowBytes : inserts.getSerializedRowsList()) {
             TableRow failedRow =
                 TableRowToStorageApiProto.tableRowFromMessage(
-                    DynamicMessage.parseFrom(descriptorWrapper.descriptor, rowBytes));
+                    DynamicMessage.parseFrom(
+                        getAppendClientInfo(true, false).descriptor, rowBytes));
             failedRowsReceiver.output(
                 new BigQueryStorageApiInsertError(
                     failedRow, "Row payload too large. Maximum size " + maxRequestSize));
@@ -426,7 +415,9 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
                 return ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().build());
               }
               try {
-                StreamAppendClient writeStream = getStreamAppendClient(true);
+                StreamAppendClient writeStream =
+                    Preconditions.checkStateNotNull(
+                        getAppendClientInfo(true, true).streamAppendClient);
                 ApiFuture<AppendRowsResponse> response =
                     writeStream.appendRows(c.offset, c.protoRows);
                 inflightWaitSecondsDistribution.update(writeStream.getInflightWaitSeconds());
@@ -457,7 +448,9 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
                   try {
                     TableRow failedRow =
                         TableRowToStorageApiProto.tableRowFromMessage(
-                            DynamicMessage.parseFrom(descriptorWrapper.descriptor, protoBytes));
+                            DynamicMessage.parseFrom(
+                                Preconditions.checkStateNotNull(appendClientInfo).descriptor,
+                                protoBytes));
                     new BigQueryStorageApiInsertError(
                         failedRow, error.getRowIndexToErrorMessage().get(failedIndex));
                     failedRowsReceiver.output(
@@ -622,17 +615,17 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
       MessageConverter<ElementT> messageConverter;
       try {
         messageConverter = messageConverters.get(destination, dynamicDestinations, datasetService);
+        return new DestinationState(
+            tableDestination1.getTableUrn(),
+            messageConverter,
+            datasetService,
+            useDefaultStream,
+            streamAppendClientCount,
+            bigQueryOptions.getUseStorageApiConnectionPool(),
+            bigQueryOptions.getStorageWriteApiMaxRequestSize());
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
-      return new DestinationState(
-          tableDestination1.getTableUrn(),
-          messageConverter,
-          datasetService,
-          useDefaultStream,
-          streamAppendClientCount,
-          bigQueryOptions.getUseStorageApiConnectionPool(),
-          bigQueryOptions.getStorageWriteApiMaxRequestSize());
     }
 
     @ProcessElement
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index af0ae5169bc..ee3bf69f503 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -26,6 +26,7 @@ import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
 import com.google.cloud.bigquery.storage.v1.Exceptions;
 import com.google.cloud.bigquery.storage.v1.Exceptions.StreamFinalizedException;
 import com.google.cloud.bigquery.storage.v1.ProtoRows;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
 import com.google.cloud.bigquery.storage.v1.WriteStream.Type;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.DynamicMessage;
@@ -40,10 +41,10 @@ import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -51,8 +52,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StreamAppendClient;
 import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.RetryType;
-import org.apache.beam.sdk.io.gcp.bigquery.StorageApiDynamicDestinations.DescriptorWrapper;
-import org.apache.beam.sdk.io.gcp.bigquery.StorageApiDynamicDestinations.MessageConverter;
 import org.apache.beam.sdk.io.gcp.bigquery.StorageApiFlushAndFinalizeDoFn.Operation;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Distribution;
@@ -124,14 +123,45 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object
   private final TupleTag<KV<String, Operation>> flushTag = new TupleTag<>("flushTag");
   private static final ExecutorService closeWriterExecutor = Executors.newCachedThreadPool();
 
-  private static final Cache<String, StreamAppendClient> APPEND_CLIENTS =
+  /**
+   * Per-append state handed to {@link RetryManager}: the rows being appended plus the stream,
+   * client and offset assigned to them for the current attempt.
+   */
+  class AppendRowsContext extends RetryManager.Operation.Context<AppendRowsResponse> {
+    final ShardedKey<DestinationT> key;
+    String streamName = "";
+    @Nullable StreamAppendClient client = null;
+    long offset = -1;
+    long numRows = 0;
+    long tryIteration = 0;
+    ProtoRows protoRows;
+
+    AppendRowsContext(ShardedKey<DestinationT> key, ProtoRows protoRows) {
+      this.protoRows = protoRows;
+      this.key = key;
+    }
+
+    /** Renders the key, stream name, offset, row count and try iteration as one line of text. */
+    @Override
+    public String toString() {
+      StringBuilder text = new StringBuilder("Context: key=").append(key);
+      text.append(" streamName=").append(streamName);
+      text.append(" offset=").append(offset);
+      text.append(" numRows=").append(numRows);
+      text.append(" tryIteration: ").append(tryIteration);
+      return text.toString();
+    }
+  };
+
+  private static final Cache<ShardedKey<?>, AppendClientInfo> APPEND_CLIENTS =
       CacheBuilder.newBuilder()
           .expireAfterAccess(5, TimeUnit.MINUTES)
           .removalListener(
-              (RemovalNotification<String, StreamAppendClient> removal) -> {
-                final @Nullable StreamAppendClient streamAppendClient = removal.getValue();
-                // Close the writer in a different thread so as not to block the main one.
-                runAsyncIgnoreFailure(closeWriterExecutor, streamAppendClient::close);
+              (RemovalNotification<ShardedKey<?>, AppendClientInfo> removal) -> {
+                final @Nullable AppendClientInfo appendClientInfo = removal.getValue();
+                if (appendClientInfo != null) {
+                  appendClientInfo.close();
+                }
               })
           .build();
 
@@ -175,12 +205,16 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object
   @Override
   public PCollectionTuple expand(
       PCollection<KV<ShardedKey<DestinationT>, Iterable<StorageApiWritePayload>>> input) {
+    BigQueryOptions bigQueryOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
+    final long splitSize = bigQueryOptions.getStorageApiAppendThresholdBytes();
+    final long maxRequestSize = bigQueryOptions.getStorageWriteApiMaxRequestSize();
+
     String operationName = input.getName() + "/" + getName();
     // Append records to the Storage API streams.
     PCollectionTuple writeRecordsResult =
         input.apply(
             "Write Records",
-            ParDo.of(new WriteRecordsDoFn(operationName, streamIdleTime))
+            ParDo.of(new WriteRecordsDoFn(operationName, streamIdleTime, splitSize, maxRequestSize))
                 .withSideInputs(dynamicDestinations.getSideInputs())
                 .withOutputTags(flushTag, TupleTagList.of(failedRowsTag)));
 
@@ -257,10 +291,15 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object
     private final TimerSpec idleTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
 
     private final Duration streamIdleTime;
+    private final long splitSize;
+    private final long maxRequestSize;
 
-    public WriteRecordsDoFn(String operationName, Duration streamIdleTime) {
+    public WriteRecordsDoFn(
+        String operationName, Duration streamIdleTime, long splitSize, long maxRequestSize) {
       this.messageConverters = new TwoLevelMessageConverterCache<>(operationName);
       this.streamIdleTime = streamIdleTime;
+      this.splitSize = splitSize;
+      this.maxRequestSize = maxRequestSize;
     }
 
     @StartBundle
@@ -269,27 +308,29 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object
     }
 
     // Get the current stream for this key. If there is no current stream, create one and store the
-    // stream name in
-    // persistent state.
+    // stream name in persistent state.
     String getOrCreateStream(
         String tableId,
         ValueState<String> streamName,
         ValueState<Long> streamOffset,
         Timer streamIdleTimer,
-        DatasetService datasetService)
-        throws IOException, InterruptedException {
-      String stream = streamName.read();
-      if (Strings.isNullOrEmpty(stream)) {
-        // In a buffered stream, data is only visible up to the offset to which it was flushed.
-        stream = datasetService.createWriteStream(tableId, Type.BUFFERED).getName();
-        streamName.write(stream);
-        streamOffset.write(0L);
-        streamsCreated.inc();
-      }
-      // Reset the idle timer.
-      streamIdleTimer.offset(streamIdleTime).withNoOutputTimestamp().setRelative();
+        DatasetService datasetService) {
+      try {
+        String stream = streamName.read();
+        if (Strings.isNullOrEmpty(stream)) {
+          // In a buffered stream, data is only visible up to the offset to which it was flushed.
+          stream = datasetService.createWriteStream(tableId, Type.BUFFERED).getName();
+          streamName.write(stream);
+          streamOffset.write(0L);
+          streamsCreated.inc();
+        }
+        // Reset the idle timer.
+        streamIdleTimer.offset(streamIdleTime).withNoOutputTimestamp().setRelative();
 
-      return stream;
+        return stream;
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
     }
 
     private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException {
@@ -340,68 +381,30 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object
               });
       final String tableId = tableDestination.getTableUrn();
       final DatasetService datasetService = getDatasetService(pipelineOptions);
-      MessageConverter<ElementT> messageConverter =
-          messageConverters.get(element.getKey().getKey(), dynamicDestinations, datasetService);
-      AtomicReference<DescriptorWrapper> descriptor =
-          new AtomicReference<>(messageConverter.getSchemaDescriptor());
-
-      // Each ProtoRows object contains at most 1MB of rows.
-      // TODO: Push messageFromTableRow up to top level. That we we cans skip TableRow entirely if
-      // already proto or already schema.
-      final long splitSize = bigQueryOptions.getStorageApiAppendThresholdBytes();
-      // Called if the schema does not match.
-      Function<Long, DescriptorWrapper> updateSchemaHash =
-          (Long expectedHash) -> {
-            try {
-              LOG.info("Schema does not match. Querying BigQuery for the current table schema.");
-              // Update the schema from the table.
-              messageConverter.refreshSchema(expectedHash);
-              descriptor.set(messageConverter.getSchemaDescriptor());
-              // Force a new connection.
-              String stream = streamName.read();
-              if (stream != null) {
-                APPEND_CLIENTS.invalidate(stream);
-              }
-              return descriptor.get();
-            } catch (Exception e) {
-              throw new RuntimeException(e);
-            }
-          };
-      Iterable<ProtoRows> messages =
-          new SplittingIterable(element.getValue(), splitSize, descriptor.get(), updateSchemaHash);
-
-      class AppendRowsContext extends RetryManager.Operation.Context<AppendRowsResponse> {
-        final ShardedKey<DestinationT> key;
-        String streamName = "";
-        @Nullable StreamAppendClient client = null;
-        long offset = -1;
-        long numRows = 0;
-        long tryIteration = 0;
-        ProtoRows protoRows;
-
-        AppendRowsContext(ShardedKey<DestinationT> key, ProtoRows protoRows) {
-          this.key = key;
-          this.protoRows = protoRows;
-        }
 
-        @Override
-        public String toString() {
-          return "Context: key="
-              + key
-              + " streamName="
-              + streamName
-              + " offset="
-              + offset
-              + " numRows="
-              + numRows
-              + " tryIteration: "
-              + tryIteration;
-        }
-      };
+      Supplier<String> getOrCreateStream =
+          () -> getOrCreateStream(tableId, streamName, streamOffset, idleTimer, datasetService);
+      final AppendClientInfo appendClientInfo =
+          APPEND_CLIENTS.get(
+              element.getKey(),
+              () -> {
+                @Nullable
+                TableSchema tableSchema =
+                    messageConverters
+                        .get(element.getKey().getKey(), dynamicDestinations, datasetService)
+                        .getTableSchema();
+                return new AppendClientInfo(
+                        tableSchema,
+                        // Make sure that the client is always closed in a different thread to avoid
+                        // blocking.
+                        client -> runAsyncIgnoreFailure(closeWriterExecutor, client::close))
+                    .createAppendClient(datasetService, getOrCreateStream, false);
+              });
+
+      Iterable<ProtoRows> messages = new SplittingIterable(element.getValue(), splitSize);
 
       // Initialize stream names and offsets for all contexts. This will be called initially, but
-      // will also be called
-      // if we roll over to a new stream on a retry.
+      // will also be called if we roll over to a new stream on a retry.
       BiConsumer<Iterable<AppendRowsContext>, Boolean> initializeContexts =
           (contexts, isFailure) -> {
             try {
@@ -409,18 +412,13 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object
                 // Clear the stream name, forcing a new one to be created.
                 streamName.write("");
               }
-              String stream =
-                  getOrCreateStream(tableId, streamName, streamOffset, idleTimer, datasetService);
-              StreamAppendClient appendClient =
-                  APPEND_CLIENTS.get(
-                      stream,
-                      () ->
-                          datasetService.getStreamAppendClient(
-                              stream, descriptor.get().descriptor, false));
+              appendClientInfo.createAppendClient(datasetService, getOrCreateStream, false);
+              StreamAppendClient streamAppendClient =
+                  Preconditions.checkArgumentNotNull(appendClientInfo.streamAppendClient);
               for (AppendRowsContext context : contexts) {
-                context.streamName = stream;
-                appendClient.pin();
-                context.client = appendClient;
+                context.streamName = streamName.read();
+                streamAppendClient.pin();
+                context.client = appendClientInfo.streamAppendClient;
                 context.offset = streamOffset.read();
                 ++context.tryIteration;
                 streamOffset.write(context.offset + context.protoRows.getSerializedRowsCount());
@@ -432,7 +430,7 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object
 
       Consumer<Iterable<AppendRowsContext>> clearClients =
           contexts -> {
-            APPEND_CLIENTS.invalidate(streamName.read());
+            appendClientInfo.clearAppendClient();
             for (AppendRowsContext context : contexts) {
               if (context.client != null) {
                 // Unpin in a different thread, as it may execute a blocking close.
@@ -450,13 +448,9 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object
               return ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().build());
             }
             try {
-              StreamAppendClient appendClient =
-                  APPEND_CLIENTS.get(
-                      context.streamName,
-                      () ->
-                          datasetService.getStreamAppendClient(
-                              context.streamName, descriptor.get().descriptor, false));
-              return appendClient.appendRows(context.offset, context.protoRows);
+              appendClientInfo.createAppendClient(datasetService, getOrCreateStream, false);
+              return Preconditions.checkStateNotNull(appendClientInfo.streamAppendClient)
+                  .appendRows(context.offset, context.protoRows);
             } catch (Exception e) {
               throw new RuntimeException(e);
             }
@@ -485,7 +479,7 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object
                 try {
                   TableRow failedRow =
                       TableRowToStorageApiProto.tableRowFromMessage(
-                          DynamicMessage.parseFrom(descriptor.get().descriptor, protoBytes));
+                          DynamicMessage.parseFrom(appendClientInfo.descriptor, protoBytes));
                   new BigQueryStorageApiInsertError(
                       failedRow, error.getRowIndexToErrorMessage().get(failedIndex));
                   o.get(failedRowsTag)
@@ -580,7 +574,6 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object
                             false)));
             flushesScheduled.inc(context.protoRows.getSerializedRowsCount());
           };
-      long maxRequestSize = bigQueryOptions.getStorageWriteApiMaxRequestSize();
       Instant now = Instant.now();
       List<AppendRowsContext> contexts = Lists.newArrayList();
       RetryManager<AppendRowsResponse, AppendRowsContext> retryManager =
@@ -602,7 +595,7 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object
           for (ByteString rowBytes : protoRows.getSerializedRowsList()) {
             TableRow failedRow =
                 TableRowToStorageApiProto.tableRowFromMessage(
-                    DynamicMessage.parseFrom(descriptor.get().descriptor, rowBytes));
+                    DynamicMessage.parseFrom(appendClientInfo.descriptor, rowBytes));
             o.get(failedRowsTag)
                 .output(
                     new BigQueryStorageApiInsertError(
@@ -643,6 +636,7 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object
     private void finalizeStream(
         @AlwaysFetched @StateId("streamName") ValueState<String> streamName,
         @AlwaysFetched @StateId("streamOffset") ValueState<Long> streamOffset,
+        ShardedKey<DestinationT> key,
         MultiOutputReceiver o,
         org.joda.time.Instant finalizeElementTs) {
       String stream = MoreObjects.firstNonNull(streamName.read(), "");
@@ -656,12 +650,13 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object
         streamName.clear();
         streamOffset.clear();
         // Make sure that the stream object is closed.
-        APPEND_CLIENTS.invalidate(stream);
+        APPEND_CLIENTS.invalidate(key);
       }
     }
 
     @OnTimer("idleTimer")
     public void onTimer(
+        @Key ShardedKey<DestinationT> key,
         @AlwaysFetched @StateId("streamName") ValueState<String> streamName,
         @AlwaysFetched @StateId("streamOffset") ValueState<Long> streamOffset,
         MultiOutputReceiver o,
@@ -672,19 +667,20 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object
       // a pipeline) this finalize element will be dropped as late. This is usually ok as
       // BigQuery will eventually garbage collect the stream. We attempt to finalize idle streams
       // merely to remove the pressure of large numbers of orphaned streams from BigQuery.
-      finalizeStream(streamName, streamOffset, o, window.maxTimestamp());
+      finalizeStream(streamName, streamOffset, key, o, window.maxTimestamp());
       streamsIdle.inc();
     }
 
     @OnWindowExpiration
     public void onWindowExpiration(
+        @Key ShardedKey<DestinationT> key,
         @AlwaysFetched @StateId("streamName") ValueState<String> streamName,
         @AlwaysFetched @StateId("streamOffset") ValueState<Long> streamOffset,
         MultiOutputReceiver o,
         BoundedWindow window) {
       // Window is done - usually because the pipeline has been drained. Make sure to clean up
       // streams so that they are not leaked.
-      finalizeStream(streamName, streamOffset, o, window.maxTimestamp());
+      finalizeStream(streamName, streamOffset, key, o, window.maxTimestamp());
     }
 
     @Override
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
index eaf90fea829..cb0b0eaa5e3 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
@@ -19,10 +19,10 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 
 import static java.util.stream.Collectors.toList;
 
-import com.google.api.services.bigquery.model.TableFieldSchema;
 import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
 import com.google.cloud.bigquery.storage.v1.BigDecimalByteStringEncoder;
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.DescriptorProtos.DescriptorProto;
 import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
@@ -56,6 +56,7 @@ import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
@@ -107,30 +108,192 @@ public class TableRowToStorageApiProto {
     }
   }
 
+  ///////////////////////////////////
+  // Conversion between the JSON-model TableSchema class and the Storage API proto TableSchema.
+
+  private static final Map<Mode, TableFieldSchema.Mode> MODE_MAP_JSON_PROTO =
+      ImmutableMap.of(
+          Mode.NULLABLE, TableFieldSchema.Mode.NULLABLE,
+          Mode.REQUIRED, TableFieldSchema.Mode.REQUIRED,
+          Mode.REPEATED, TableFieldSchema.Mode.REPEATED);
+  private static final Map<TableFieldSchema.Mode, String> MODE_MAP_PROTO_JSON =
+      ImmutableMap.of(
+          TableFieldSchema.Mode.NULLABLE, "NULLABLE",
+          TableFieldSchema.Mode.REQUIRED, "REQUIRED",
+          TableFieldSchema.Mode.REPEATED, "REPEATED");
+
+  private static final Map<String, TableFieldSchema.Type> TYPE_MAP_JSON_PROTO =
+      ImmutableMap.<String, TableFieldSchema.Type>builder()
+          .put("STRUCT", TableFieldSchema.Type.STRUCT)
+          .put("RECORD", TableFieldSchema.Type.STRUCT) // alias: same proto type as "STRUCT"
+          .put("INT64", TableFieldSchema.Type.INT64)
+          .put("INTEGER", TableFieldSchema.Type.INT64) // alias: same proto type as "INT64"
+          .put("FLOAT64", TableFieldSchema.Type.DOUBLE)
+          .put("FLOAT", TableFieldSchema.Type.DOUBLE) // alias: same proto type as "FLOAT64"
+          .put("STRING", TableFieldSchema.Type.STRING)
+          .put("BOOL", TableFieldSchema.Type.BOOL)
+          .put("BOOLEAN", TableFieldSchema.Type.BOOL) // alias: same proto type as "BOOL"
+          .put("BYTES", TableFieldSchema.Type.BYTES)
+          .put("NUMERIC", TableFieldSchema.Type.NUMERIC)
+          .put("BIGNUMERIC", TableFieldSchema.Type.BIGNUMERIC)
+          .put("GEOGRAPHY", TableFieldSchema.Type.GEOGRAPHY)
+          .put("DATE", TableFieldSchema.Type.DATE)
+          .put("TIME", TableFieldSchema.Type.TIME)
+          .put("DATETIME", TableFieldSchema.Type.DATETIME)
+          .put("TIMESTAMP", TableFieldSchema.Type.TIMESTAMP)
+          .put("JSON", TableFieldSchema.Type.JSON)
+          .build();
+  private static final Map<TableFieldSchema.Type, String> TYPE_MAP_PROTO_JSON =
+      ImmutableMap.<TableFieldSchema.Type, String>builder() // reverse map: one name per type
+          .put(TableFieldSchema.Type.STRUCT, "STRUCT")
+          .put(TableFieldSchema.Type.INT64, "INT64")
+          .put(TableFieldSchema.Type.DOUBLE, "FLOAT64")
+          .put(TableFieldSchema.Type.STRING, "STRING")
+          .put(TableFieldSchema.Type.BOOL, "BOOL")
+          .put(TableFieldSchema.Type.BYTES, "BYTES")
+          .put(TableFieldSchema.Type.NUMERIC, "NUMERIC")
+          .put(TableFieldSchema.Type.BIGNUMERIC, "BIGNUMERIC")
+          .put(TableFieldSchema.Type.GEOGRAPHY, "GEOGRAPHY")
+          .put(TableFieldSchema.Type.DATE, "DATE")
+          .put(TableFieldSchema.Type.TIME, "TIME")
+          .put(TableFieldSchema.Type.DATETIME, "DATETIME")
+          .put(TableFieldSchema.Type.TIMESTAMP, "TIMESTAMP")
+          .put(TableFieldSchema.Type.JSON, "JSON")
+          .build();
+
+  /** Converts a JSON mode name to its proto mode; null maps to BigQuery's default, NULLABLE. */
+  public static TableFieldSchema.Mode modeToProtoMode(String mode) {
+    return Optional.ofNullable(mode)
+        .map(jsonMode -> MODE_MAP_JSON_PROTO.get(Mode.valueOf(jsonMode)))
+        .orElse(TableFieldSchema.Mode.NULLABLE);
+  }
+
+  /**
+   * Returns the JSON mode name for a proto mode; throws RuntimeException if none is mapped.
+   */
+  public static String protoModeToJsonMode(TableFieldSchema.Mode protoMode) {
+    return Optional.ofNullable(MODE_MAP_PROTO_JSON.get(protoMode))
+        .orElseThrow(() -> new RuntimeException("Unknown mode " + protoMode));
+  }
+
+  /**
+   * Returns the JSON type name for a proto type; throws RuntimeException if none is mapped.
+   */
+  public static String protoTypeToJsonType(TableFieldSchema.Type protoType) {
+    return Optional.ofNullable(TYPE_MAP_PROTO_JSON.get(protoType))
+        .orElseThrow(() -> new RuntimeException("Unknown type " + protoType));
+  }
+
+  /**
+   * Returns the proto type for a JSON type name; throws RuntimeException if none is mapped.
+   */
+  public static TableFieldSchema.Type typeToProtoType(String type) {
+    return Optional.ofNullable(TYPE_MAP_JSON_PROTO.get(type))
+        .orElseThrow(() -> new RuntimeException("Unknown type " + type));
+  }
+
+  /**
+   * Converts a proto {@code TableSchema} to the JSON model class, one field at a time.
+   */
+  public static com.google.api.services.bigquery.model.TableSchema protoSchemaToTableSchema(
+      TableSchema protoTableSchema) {
+    List<com.google.api.services.bigquery.model.TableFieldSchema> jsonFields =
+        protoTableSchema.getFieldsList().stream()
+            .map(TableRowToStorageApiProto::protoTableFieldToTableField)
+            .collect(toList());
+    return new com.google.api.services.bigquery.model.TableSchema().setFields(jsonFields);
+  }
+
+  /** Converts one proto field, and recursively any STRUCT sub-fields, to the JSON model class. */
+  public static com.google.api.services.bigquery.model.TableFieldSchema protoTableFieldToTableField(
+      TableFieldSchema protoTableField) {
+    com.google.api.services.bigquery.model.TableFieldSchema tableField =
+        new com.google.api.services.bigquery.model.TableFieldSchema();
+    tableField = tableField.setName(protoTableField.getName());
+    // Test the proto's description; the JSON field was just created and never has one yet.
+    if (!Strings.isNullOrEmpty(protoTableField.getDescription())) {
+      tableField = tableField.setDescription(protoTableField.getDescription());
+    }
+    if (protoTableField.getMaxLength() != 0) {
+      tableField = tableField.setMaxLength(protoTableField.getMaxLength());
+    }
+    if (protoTableField.getMode() != TableFieldSchema.Mode.MODE_UNSPECIFIED) {
+      tableField = tableField.setMode(protoModeToJsonMode(protoTableField.getMode()));
+    }
+    if (protoTableField.getPrecision() != 0) {
+      tableField = tableField.setPrecision(protoTableField.getPrecision());
+    }
+    if (protoTableField.getScale() != 0) {
+      tableField = tableField.setScale(protoTableField.getScale());
+    }
+    tableField = tableField.setType(protoTypeToJsonType(protoTableField.getType()));
+    if (protoTableField.getType().equals(TableFieldSchema.Type.STRUCT)) {
+      tableField.setFields(
+          protoTableField.getFieldsList().stream()
+              .map(TableRowToStorageApiProto::protoTableFieldToTableField)
+              .collect(toList()));
+    }
+    return tableField;
+  }
+
+  /** Converts a JSON-model {@code TableSchema} to its proto form; a null field list is empty. */
+  public static TableSchema schemaToProtoTableSchema(
+      com.google.api.services.bigquery.model.TableSchema tableSchema) {
+    TableSchema.Builder protoSchema = TableSchema.newBuilder();
+    List<com.google.api.services.bigquery.model.TableFieldSchema> jsonFields =
+        tableSchema.getFields();
+    if (jsonFields != null) {
+      jsonFields.forEach(field -> protoSchema.addFields(tableFieldToProtoTableField(field)));
+    }
+    return protoSchema.build();
+  }
+
+  /** Converts one JSON-model field, and recursively any STRUCT sub-fields, to the proto class. */
+  public static TableFieldSchema tableFieldToProtoTableField(
+      com.google.api.services.bigquery.model.TableFieldSchema field) {
+    TableFieldSchema.Builder builder = TableFieldSchema.newBuilder();
+    builder.setName(field.getName());
+    if (field.getDescription() != null) {
+      builder.setDescription(field.getDescription());
+    }
+    if (field.getMaxLength() != null) {
+      builder.setMaxLength(field.getMaxLength());
+    }
+    if (field.getMode() != null) {
+      builder.setMode(modeToProtoMode(field.getMode()));
+    }
+    if (field.getPrecision() != null) {
+      builder.setPrecision(field.getPrecision());
+    }
+    if (field.getScale() != null) {
+      builder.setScale(field.getScale());
+    }
+    builder.setType(typeToProtoType(field.getType()));
+    // getFields() can be null on the JSON model; skip sub-fields then instead of throwing NPE.
+    if (builder.getType().equals(TableFieldSchema.Type.STRUCT) && field.getFields() != null) {
+      field.getFields().forEach(sub -> builder.addFields(tableFieldToProtoTableField(sub)));
+    }
+    return builder.build();
+  }
+
   static class SchemaInformation {
     private final TableFieldSchema tableFieldSchema;
     private final List<SchemaInformation> subFields;
     private final Map<String, SchemaInformation> subFieldsByName;
     private final Iterable<SchemaInformation> parentSchemas;
 
-    private SchemaInformation(TableFieldSchema tableFieldSchema) {
-      this(tableFieldSchema, Collections.emptyList());
-    }
-
     private SchemaInformation(
         TableFieldSchema tableFieldSchema, Iterable<SchemaInformation> parentSchemas) {
       this.tableFieldSchema = tableFieldSchema;
       this.subFields = Lists.newArrayList();
       this.subFieldsByName = Maps.newHashMap();
       this.parentSchemas = parentSchemas;
-      if (tableFieldSchema.getFields() != null) {
-        for (TableFieldSchema field : tableFieldSchema.getFields()) {
-          SchemaInformation schemaInformation =
-              new SchemaInformation(
-                  field, Iterables.concat(this.parentSchemas, ImmutableList.of(this)));
-          subFields.add(schemaInformation);
-          subFieldsByName.put(field.getName(), schemaInformation);
-        }
+      for (TableFieldSchema field : tableFieldSchema.getFieldsList()) {
+        SchemaInformation schemaInformation =
+            new SchemaInformation(
+                field, Iterables.concat(this.parentSchemas, ImmutableList.of(this)));
+        subFields.add(schemaInformation);
+        subFieldsByName.put(field.getName(), schemaInformation);
       }
     }
 
@@ -146,7 +309,7 @@ public class TableRowToStorageApiProto {
       return tableFieldSchema.getName();
     }
 
-    public String getType() {
+    public TableFieldSchema.Type getType() {
       return tableFieldSchema.getType();
     }
 
@@ -167,42 +330,50 @@ public class TableRowToStorageApiProto {
     }
 
     static SchemaInformation fromTableSchema(TableSchema tableSchema) {
-      TableFieldSchema rootSchema =
-          new TableFieldSchema()
-              .setName("__root__")
-              .setType("RECORD")
-              .setFields(tableSchema.getFields());
-      return new SchemaInformation(rootSchema);
+      TableFieldSchema root =
+          TableFieldSchema.newBuilder()
+              .addAllFields(tableSchema.getFieldsList())
+              .setName("root")
+              .build();
+      return new SchemaInformation(root, Collections.emptyList());
+    }
+
+    static SchemaInformation fromTableSchema(
+        com.google.api.services.bigquery.model.TableSchema jsonTableSchema) {
+      return SchemaInformation.fromTableSchema(schemaToProtoTableSchema(jsonTableSchema));
     }
   }
 
-  static final Map<String, Type> PRIMITIVE_TYPES =
-      ImmutableMap.<String, Type>builder()
-          .put("INT64", Type.TYPE_INT64)
-          .put("INTEGER", Type.TYPE_INT64)
-          .put("FLOAT64", Type.TYPE_DOUBLE)
-          .put("FLOAT", Type.TYPE_DOUBLE)
-          .put("STRING", Type.TYPE_STRING)
-          .put("BOOL", Type.TYPE_BOOL)
-          .put("BOOLEAN", Type.TYPE_BOOL)
-          .put("BYTES", Type.TYPE_BYTES)
-          .put("NUMERIC", Type.TYPE_BYTES)
-          .put("BIGNUMERIC", Type.TYPE_BYTES)
-          .put("GEOGRAPHY", Type.TYPE_STRING) // Pass through the JSON encoding.
-          .put("DATE", Type.TYPE_INT32)
-          .put("TIME", Type.TYPE_INT64)
-          .put("DATETIME", Type.TYPE_INT64)
-          .put("TIMESTAMP", Type.TYPE_INT64)
-          .put("JSON", Type.TYPE_STRING)
+  static final Map<TableFieldSchema.Type, Type> PRIMITIVE_TYPES =
+      ImmutableMap.<TableFieldSchema.Type, Type>builder()
+          .put(TableFieldSchema.Type.INT64, Type.TYPE_INT64)
+          .put(TableFieldSchema.Type.DOUBLE, Type.TYPE_DOUBLE)
+          .put(TableFieldSchema.Type.STRING, Type.TYPE_STRING)
+          .put(TableFieldSchema.Type.BOOL, Type.TYPE_BOOL)
+          .put(TableFieldSchema.Type.BYTES, Type.TYPE_BYTES)
+          .put(TableFieldSchema.Type.NUMERIC, Type.TYPE_BYTES)
+          .put(TableFieldSchema.Type.BIGNUMERIC, Type.TYPE_BYTES)
+          .put(TableFieldSchema.Type.GEOGRAPHY, Type.TYPE_STRING) // Pass through the JSON encoding.
+          .put(TableFieldSchema.Type.DATE, Type.TYPE_INT32)
+          .put(TableFieldSchema.Type.TIME, Type.TYPE_INT64)
+          .put(TableFieldSchema.Type.DATETIME, Type.TYPE_INT64)
+          .put(TableFieldSchema.Type.TIMESTAMP, Type.TYPE_INT64)
+          .put(TableFieldSchema.Type.JSON, Type.TYPE_STRING)
           .build();
 
+  public static Descriptor getDescriptorFromTableSchema(
+      com.google.api.services.bigquery.model.TableSchema jsonSchema, boolean respectRequired)
+      throws DescriptorValidationException {
+    return getDescriptorFromTableSchema(schemaToProtoTableSchema(jsonSchema), respectRequired);
+  }
+
   /**
    * Given a BigQuery TableSchema, returns a protocol-buffer Descriptor that can be used to write
    * data using the BigQuery Storage API.
    */
-  public static Descriptor getDescriptorFromTableSchema(TableSchema jsonSchema)
-      throws DescriptorValidationException {
-    DescriptorProto descriptorProto = descriptorSchemaFromTableSchema(jsonSchema);
+  public static Descriptor getDescriptorFromTableSchema(
+      TableSchema tableSchema, boolean respectRequired) throws DescriptorValidationException {
+    DescriptorProto descriptorProto = descriptorSchemaFromTableSchema(tableSchema, respectRequired);
     FileDescriptorProto fileDescriptorProto =
         FileDescriptorProto.newBuilder().addMessageType(descriptorProto).build();
     FileDescriptor fileDescriptor =
@@ -218,7 +389,7 @@ public class TableRowToStorageApiProto {
       boolean ignoreUnknownValues)
       throws SchemaConversionException {
     DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
-    for (Map.Entry<String, Object> entry : map.entrySet()) {
+    for (final Map.Entry<String, Object> entry : map.entrySet()) {
       @Nullable
       FieldDescriptor fieldDescriptor = descriptor.findFieldByName(entry.getKey().toLowerCase());
       if (fieldDescriptor == null) {
@@ -267,7 +438,7 @@ public class TableRowToStorageApiProto {
       SchemaInformation schemaInformation,
       Descriptor descriptor,
       TableRow tableRow,
-      boolean ignoreUnkownValues)
+      boolean ignoreUnknownValues)
       throws SchemaConversionException {
     @Nullable Object fValue = tableRow.get("f");
     if (fValue instanceof List) {
@@ -275,13 +446,15 @@ public class TableRowToStorageApiProto {
       DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
       int cellsToProcess = cells.size();
       if (cells.size() > descriptor.getFields().size()) {
-        if (ignoreUnkownValues) {
+        if (ignoreUnknownValues) {
           cellsToProcess = descriptor.getFields().size();
         } else {
           throw new SchemaTooNarrowException(
-              "TableRow contained too many fields and ignoreUnknownValues not set.");
+              "TableRow contained too many fields and ignoreUnknownValues not set in "
+                  + schemaInformation.getName());
         }
       }
+
       for (int i = 0; i < cellsToProcess; ++i) {
         AbstractMap<String, Object> cell = cells.get(i);
         FieldDescriptor fieldDescriptor = descriptor.getFields().get(i);
@@ -290,7 +463,7 @@ public class TableRowToStorageApiProto {
           @Nullable
           Object value =
               messageValueFromFieldValue(
-                  fieldSchemaInformation, fieldDescriptor, cell.get("v"), ignoreUnkownValues);
+                  fieldSchemaInformation, fieldDescriptor, cell.get("v"), ignoreUnknownValues);
           if (value != null) {
             builder.setField(fieldDescriptor, value);
           }
@@ -311,36 +484,46 @@ public class TableRowToStorageApiProto {
             "Could convert schema for " + schemaInformation.getFullName(), e);
       }
     } else {
-      return messageFromMap(schemaInformation, descriptor, tableRow, ignoreUnkownValues);
+      return messageFromMap(schemaInformation, descriptor, tableRow, ignoreUnknownValues);
     }
   }
 
   @VisibleForTesting
-  static DescriptorProto descriptorSchemaFromTableSchema(TableSchema tableSchema) {
-    return descriptorSchemaFromTableFieldSchemas(tableSchema.getFields());
+  static DescriptorProto descriptorSchemaFromTableSchema(
+      com.google.api.services.bigquery.model.TableSchema tableSchema, boolean respectRequired) {
+    return descriptorSchemaFromTableSchema(schemaToProtoTableSchema(tableSchema), respectRequired);
+  }
+
+  @VisibleForTesting
+  static DescriptorProto descriptorSchemaFromTableSchema(
+      TableSchema tableSchema, boolean respectRequired) {
+    return descriptorSchemaFromTableFieldSchemas(tableSchema.getFieldsList(), respectRequired);
   }
 
   private static DescriptorProto descriptorSchemaFromTableFieldSchemas(
-      Iterable<TableFieldSchema> tableFieldSchemas) {
+      Iterable<TableFieldSchema> tableFieldSchemas, boolean respectRequired) {
     DescriptorProto.Builder descriptorBuilder = DescriptorProto.newBuilder();
     // Create a unique name for the descriptor ('-' characters cannot be used).
     descriptorBuilder.setName("D" + UUID.randomUUID().toString().replace("-", "_"));
     int i = 1;
     for (TableFieldSchema fieldSchema : tableFieldSchemas) {
-      fieldDescriptorFromTableField(fieldSchema, i++, descriptorBuilder);
+      fieldDescriptorFromTableField(fieldSchema, i++, descriptorBuilder, respectRequired);
     }
     return descriptorBuilder.build();
   }
 
   private static void fieldDescriptorFromTableField(
-      TableFieldSchema fieldSchema, int fieldNumber, DescriptorProto.Builder descriptorBuilder) {
+      TableFieldSchema fieldSchema,
+      int fieldNumber,
+      DescriptorProto.Builder descriptorBuilder,
+      boolean respectRequired) {
     FieldDescriptorProto.Builder fieldDescriptorBuilder = FieldDescriptorProto.newBuilder();
     fieldDescriptorBuilder = fieldDescriptorBuilder.setName(fieldSchema.getName().toLowerCase());
     fieldDescriptorBuilder = fieldDescriptorBuilder.setNumber(fieldNumber);
     switch (fieldSchema.getType()) {
-      case "STRUCT":
-      case "RECORD":
-        DescriptorProto nested = descriptorSchemaFromTableFieldSchemas(fieldSchema.getFields());
+      case STRUCT:
+        DescriptorProto nested =
+            descriptorSchemaFromTableFieldSchemas(fieldSchema.getFieldsList(), respectRequired);
         descriptorBuilder.addNestedType(nested);
         fieldDescriptorBuilder =
             fieldDescriptorBuilder.setType(Type.TYPE_MESSAGE).setTypeName(nested.getName());
@@ -354,12 +537,11 @@ public class TableRowToStorageApiProto {
         fieldDescriptorBuilder = fieldDescriptorBuilder.setType(type);
     }
 
-    Optional<Mode> fieldMode = Optional.ofNullable(fieldSchema.getMode()).map(Mode::valueOf);
-    if (fieldMode.filter(m -> m == Mode.REPEATED).isPresent()) {
+    if (fieldSchema.getMode() == TableFieldSchema.Mode.REPEATED) {
       fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_REPEATED);
-    } else if (!fieldMode.isPresent() || fieldMode.filter(m -> m == Mode.NULLABLE).isPresent()) {
+    } else if (!respectRequired || fieldSchema.getMode() == TableFieldSchema.Mode.NULLABLE) {
       fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_OPTIONAL);
-    } else {
+    } else if (fieldSchema.getMode() == TableFieldSchema.Mode.REQUIRED) {
       fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_REQUIRED);
     }
     descriptorBuilder.addField(fieldDescriptorBuilder.build());
@@ -377,6 +559,7 @@ public class TableRowToStorageApiProto {
       } else if (fieldDescriptor.isRepeated()) {
         return Collections.emptyList();
       } else {
+        // TODO: Allow expanding this!
         throw new SchemaDoesntMatchException(
             "Received null value for non-nullable field " + schemaInformation.getFullName());
       }
@@ -405,31 +588,28 @@ public class TableRowToStorageApiProto {
       boolean ignoreUnknownValues)
       throws SchemaConversionException {
     switch (schemaInformation.getType()) {
-      case "INT64":
-      case "INTEGER":
+      case INT64:
         if (value instanceof String) {
           return Long.valueOf((String) value);
         } else if (value instanceof Integer || value instanceof Long) {
           return ((Number) value).longValue();
         }
         break;
-      case "FLOAT64":
-      case "FLOAT":
+      case DOUBLE:
         if (value instanceof String) {
           return Double.valueOf((String) value);
         } else if (value instanceof Number) {
           return ((Number) value).doubleValue();
         }
         break;
-      case "BOOLEAN":
-      case "BOOL":
+      case BOOL:
         if (value instanceof String) {
           return Boolean.valueOf((String) value);
         } else if (value instanceof Boolean) {
           return value;
         }
         break;
-      case "BYTES":
+      case BYTES:
         if (value instanceof String) {
           return ByteString.copyFrom(BaseEncoding.base64().decode((String) value));
         } else if (value instanceof byte[]) {
@@ -438,7 +618,7 @@ public class TableRowToStorageApiProto {
           return value;
         }
         break;
-      case "TIMESTAMP":
+      case TIMESTAMP:
         if (value instanceof String) {
           try {
             // '2011-12-03T10:15:30+01:00' '2011-12-03T10:15:30'
@@ -470,7 +650,7 @@ public class TableRowToStorageApiProto {
               .longValue();
         }
         break;
-      case "DATE":
+      case DATE:
         if (value instanceof String) {
           return ((Long) LocalDate.parse((String) value).toEpochDay()).intValue();
         } else if (value instanceof LocalDate) {
@@ -484,7 +664,7 @@ public class TableRowToStorageApiProto {
           return ((Number) value).intValue();
         }
         break;
-      case "NUMERIC":
+      case NUMERIC:
         if (value instanceof String) {
           return BigDecimalByteStringEncoder.encodeToNumericByteString(
               new BigDecimal((String) value));
@@ -495,7 +675,7 @@ public class TableRowToStorageApiProto {
               BigDecimal.valueOf(((Number) value).doubleValue()));
         }
         break;
-      case "BIGNUMERIC":
+      case BIGNUMERIC:
         if (value instanceof String) {
           return BigDecimalByteStringEncoder.encodeToBigNumericByteString(
               new BigDecimal((String) value));
@@ -506,7 +686,7 @@ public class TableRowToStorageApiProto {
               BigDecimal.valueOf(((Number) value).doubleValue()));
         }
         break;
-      case "DATETIME":
+      case DATETIME:
         if (value instanceof String) {
           try {
             // '2011-12-03T10:15:30'
@@ -525,7 +705,7 @@ public class TableRowToStorageApiProto {
           return CivilTimeEncoder.encodePacked64DatetimeMicros((org.joda.time.LocalDateTime) value);
         }
         break;
-      case "TIME":
+      case TIME:
         if (value instanceof String) {
           return CivilTimeEncoder.encodePacked64TimeMicros(LocalTime.parse((String) value));
         } else if (value instanceof Number) {
@@ -536,12 +716,11 @@ public class TableRowToStorageApiProto {
           return CivilTimeEncoder.encodePacked64TimeMicros((org.joda.time.LocalTime) value);
         }
         break;
-      case "STRING":
-      case "JSON":
-      case "GEOGRAPHY":
+      case STRING:
+      case JSON:
+      case GEOGRAPHY:
         return Preconditions.checkArgumentNotNull(value).toString();
-      case "STRUCT":
-      case "RECORD":
+      case STRUCT:
         if (value instanceof TableRow) {
           TableRow tableRow = (TableRow) value;
           return messageFromTableRow(
@@ -553,6 +732,8 @@ public class TableRowToStorageApiProto {
               schemaInformation, fieldDescriptor.getMessageType(), map, ignoreUnknownValues);
         }
         break;
+      default:
+        throw new RuntimeException("Unknown type " + schemaInformation.getType());
     }
 
     throw new SchemaDoesntMatchException(
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java
index 854058a81f3..fefe4d29a2e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java
@@ -85,21 +85,21 @@ public class BeamRowToStorageApiProtoTest {
               FieldDescriptorProto.newBuilder()
                   .setName("bytevalue")
                   .setNumber(1)
-                  .setType(Type.TYPE_INT32)
+                  .setType(Type.TYPE_INT64)
                   .setLabel(Label.LABEL_OPTIONAL)
                   .build())
           .addField(
               FieldDescriptorProto.newBuilder()
                   .setName("int16value")
                   .setNumber(2)
-                  .setType(Type.TYPE_INT32)
+                  .setType(Type.TYPE_INT64)
                   .setLabel(Label.LABEL_REQUIRED)
                   .build())
           .addField(
               FieldDescriptorProto.newBuilder()
                   .setName("int32value")
                   .setNumber(3)
-                  .setType(Type.TYPE_INT32)
+                  .setType(Type.TYPE_INT64)
                   .setLabel(Label.LABEL_OPTIONAL)
                   .build())
           .addField(
@@ -120,7 +120,7 @@ public class BeamRowToStorageApiProtoTest {
               FieldDescriptorProto.newBuilder()
                   .setName("floatvalue")
                   .setNumber(6)
-                  .setType(Type.TYPE_FLOAT)
+                  .setType(Type.TYPE_DOUBLE)
                   .setLabel(Label.LABEL_OPTIONAL)
                   .build())
           .addField(
@@ -233,14 +233,14 @@ public class BeamRowToStorageApiProtoTest {
           .build();
   private static final Map<String, Object> BASE_PROTO_EXPECTED_FIELDS =
       ImmutableMap.<String, Object>builder()
-          .put("bytevalue", (int) 1)
-          .put("int16value", (int) 2)
-          .put("int32value", (int) 3)
-          .put("int64value", (long) 4)
+          .put("bytevalue", 1L)
+          .put("int16value", 2L)
+          .put("int32value", 3L)
+          .put("int64value", 4L)
           .put(
               "decimalvalue",
               BeamRowToStorageApiProto.serializeBigDecimalToNumeric(BigDecimal.valueOf(5)))
-          .put("floatvalue", (float) 3.14)
+          .put("floatvalue", (double) 3.14)
           .put("doublevalue", (double) 2.68)
           .put("stringvalue", "I am a string. Hear me roar.")
           .put("datetimevalue", BASE_ROW.getDateTime("datetimeValue").getMillis() * 1000)
@@ -284,7 +284,8 @@ public class BeamRowToStorageApiProtoTest {
   @Test
   public void testDescriptorFromSchema() {
     DescriptorProto descriptor =
-        BeamRowToStorageApiProto.descriptorSchemaFromBeamSchema(BASE_SCHEMA);
+        TableRowToStorageApiProto.descriptorSchemaFromTableSchema(
+            BeamRowToStorageApiProto.protoTableSchemaFromBeamSchema(BASE_SCHEMA), true);
     Map<String, Type> types =
         descriptor.getFieldList().stream()
             .collect(
@@ -315,7 +316,8 @@ public class BeamRowToStorageApiProtoTest {
   @Test
   public void testNestedFromSchema() {
     DescriptorProto descriptor =
-        BeamRowToStorageApiProto.descriptorSchemaFromBeamSchema(NESTED_SCHEMA);
+        TableRowToStorageApiProto.descriptorSchemaFromTableSchema(
+            BeamRowToStorageApiProto.protoTableSchemaFromBeamSchema((NESTED_SCHEMA)), true);
     Map<String, Type> expectedBaseTypes =
         BASE_SCHEMA_PROTO.getFieldList().stream()
             .collect(
@@ -378,7 +380,9 @@ public class BeamRowToStorageApiProtoTest {
 
   @Test
   public void testMessageFromTableRow() throws Exception {
-    Descriptor descriptor = BeamRowToStorageApiProto.getDescriptorFromSchema(NESTED_SCHEMA);
+    Descriptor descriptor =
+        TableRowToStorageApiProto.getDescriptorFromTableSchema(
+            BeamRowToStorageApiProto.protoTableSchemaFromBeamSchema(NESTED_SCHEMA), true);
     DynamicMessage msg = BeamRowToStorageApiProto.messageFromBeamRow(descriptor, NESTED_ROW);
     assertEquals(3, msg.getAllFields().size());
 
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index 1e1749e8569..9cacfde54ad 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -33,6 +33,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
 
 import com.google.api.services.bigquery.model.Clustering;
 import com.google.api.services.bigquery.model.ErrorProto;
@@ -65,11 +66,9 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Function;
-import java.util.function.LongFunction;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
-import java.util.stream.LongStream;
 import java.util.stream.StreamSupport;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
@@ -199,7 +198,6 @@ public class BigQueryIOWriteTest implements Serializable {
                 public void evaluate() throws Throwable {
                   options = TestPipeline.testingPipelineOptions();
                   BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
-                  bqOptions.setSchemaUpdateRetries(Integer.MAX_VALUE);
                   bqOptions.setProject("project-id");
                   if (description.getAnnotations().stream()
                       .anyMatch(a -> a.annotationType().equals(ProjectOverride.class))) {
@@ -254,9 +252,8 @@ public class BigQueryIOWriteTest implements Serializable {
 
   @Test
   public void testWriteEmptyPCollection() throws Exception {
-    if (useStreaming || useStorageApi) {
-      return;
-    }
+    assumeTrue(!useStreaming);
+    assumeTrue(!useStorageApi);
     TableSchema schema =
         new TableSchema()
             .setFields(
@@ -290,12 +287,8 @@ public class BigQueryIOWriteTest implements Serializable {
 
   @Test
   public void testWriteDynamicDestinationsStreamingWithAutoSharding() throws Exception {
-    if (useStorageApi) {
-      return;
-    }
-    if (!useStreaming) {
-      return;
-    }
+    assumeTrue(!useStorageApi);
+    assumeTrue(useStreaming);
     writeDynamicDestinations(true, true);
   }
 
@@ -586,9 +579,8 @@ public class BigQueryIOWriteTest implements Serializable {
 
   @Test
   public void testTriggeredFileLoads() throws Exception {
-    if (useStorageApi || !useStreaming) {
-      return;
-    }
+    assumeTrue(!useStorageApi);
+    assumeTrue(useStreaming);
     List<TableRow> elements = Lists.newArrayList();
     for (int i = 0; i < 30; ++i) {
       elements.add(new TableRow().set("number", i));
@@ -677,9 +669,8 @@ public class BigQueryIOWriteTest implements Serializable {
   }
 
   public void testTriggeredFileLoadsWithTempTables(String tableRef) throws Exception {
-    if (useStorageApi || !useStreaming) {
-      return;
-    }
+    assumeTrue(!useStorageApi);
+    assumeTrue(useStreaming);
     List<TableRow> elements = Lists.newArrayList();
     for (int i = 0; i < 30; ++i) {
       elements.add(new TableRow().set("number", i));
@@ -739,9 +730,8 @@ public class BigQueryIOWriteTest implements Serializable {
   @Test
   public void testUntriggeredFileLoadsWithTempTables() throws Exception {
     // Test only non-streaming inserts.
-    if (useStorageApi || useStreaming) {
-      return;
-    }
+    assumeTrue(!useStorageApi);
+    assumeTrue(!useStreaming);
     List<TableRow> elements = Lists.newArrayList();
     for (int i = 0; i < 30; ++i) {
       elements.add(new TableRow().set("number", i));
@@ -773,10 +763,8 @@ public class BigQueryIOWriteTest implements Serializable {
 
   @Test
   public void testTriggeredFileLoadsWithAutoSharding() throws Exception {
-    if (useStorageApi || !useStreaming) {
-      // This test does not make sense for the storage API.
-      return;
-    }
+    assumeTrue(!useStorageApi);
+    assumeTrue(useStreaming);
     List<TableRow> elements = Lists.newArrayList();
     for (int i = 0; i < 30; ++i) {
       elements.add(new TableRow().set("number", i));
@@ -845,9 +833,8 @@ public class BigQueryIOWriteTest implements Serializable {
 
   @Test
   public void testFailuresNoRetryPolicy() throws Exception {
-    if (useStorageApi || !useStreaming) {
-      return;
-    }
+    assumeTrue(!useStorageApi);
+    assumeTrue(useStreaming);
     TableRow row1 = new TableRow().set("name", "a").set("number", "1");
     TableRow row2 = new TableRow().set("name", "b").set("number", "2");
     TableRow row3 = new TableRow().set("name", "c").set("number", "3");
@@ -884,9 +871,8 @@ public class BigQueryIOWriteTest implements Serializable {
 
   @Test
   public void testRetryPolicy() throws Exception {
-    if (useStorageApi || !useStreaming) {
-      return;
-    }
+    assumeTrue(!useStorageApi);
+    assumeTrue(useStreaming);
     TableRow row1 = new TableRow().set("name", "a").set("number", "1");
     TableRow row2 = new TableRow().set("name", "b").set("number", "2");
     TableRow row3 = new TableRow().set("name", "c").set("number", "3");
@@ -960,9 +946,8 @@ public class BigQueryIOWriteTest implements Serializable {
 
   @Test
   public void testWriteWithSuccessfulBatchInserts() throws Exception {
-    if (useStreaming || useStorageApi) {
-      return;
-    }
+    assumeTrue(!useStreaming);
+    assumeTrue(!useStorageApi);
 
     WriteResult result =
         p.apply(
@@ -992,9 +977,8 @@ public class BigQueryIOWriteTest implements Serializable {
 
   @Test
   public void testWriteWithSuccessfulBatchInsertsAndWriteRename() throws Exception {
-    if (useStreaming || useStorageApi) {
-      return;
-    }
+    assumeTrue(!useStreaming);
+    assumeTrue(!useStorageApi);
 
     WriteResult result =
         p.apply(
@@ -1026,9 +1010,8 @@ public class BigQueryIOWriteTest implements Serializable {
 
   @Test
   public void testWriteWithoutInsertId() throws Exception {
-    if (useStorageApi || !useStreaming) {
-      return;
-    }
+    assumeTrue(!useStorageApi);
+    assumeTrue(useStreaming);
     TableRow row1 = new TableRow().set("name", "a").set("number", 1);
     TableRow row2 = new TableRow().set("name", "b").set("number", 2);
     TableRow row3 = new TableRow().set("name", "c").set("number", 3);
@@ -1079,9 +1062,9 @@ public class BigQueryIOWriteTest implements Serializable {
 
   @Test
   public void testWriteAvro() throws Exception {
-    if (useStorageApi || useStreaming) {
-      return;
-    }
+    assumeTrue(!useStorageApi);
+    assumeTrue(!useStreaming);
+
     p.apply(
             Create.of(
                     InputRecord.create("test", 1, 1.0, Instant.parse("2019-01-01T00:00:00Z")),
@@ -1130,9 +1113,8 @@ public class BigQueryIOWriteTest implements Serializable {
 
   @Test
   public void testWriteAvroWithCustomWriter() throws Exception {
-    if (useStorageApi || useStreaming) {
-      return;
-    }
+    assumeTrue(!useStorageApi);
+    assumeTrue(!useStreaming);
     SerializableFunction<AvroWriteRequest<InputRecord>, GenericRecord> formatFunction =
         r -> {
           GenericRecord rec = new GenericData.Record(r.getSchema());
@@ -1246,9 +1228,7 @@ public class BigQueryIOWriteTest implements Serializable {
   }
 
   private void storageWrite(boolean autoSharding) throws Exception {
-    if (!useStorageApi) {
-      return;
-    }
+    assumeTrue(useStorageApi);
     BigQueryIO.Write<TableRow> write =
         BigQueryIO.writeTableRows()
             .to("project-id:dataset-id.table-id")
@@ -1302,6 +1282,7 @@ public class BigQueryIOWriteTest implements Serializable {
 
   @Test
   public void testSchemaWriteLoads() throws Exception {
+    assumeTrue(!useStreaming);
     // withMethod overrides the pipeline option, so we need to explicitly request
     // STORAGE_API_WRITES.
     BigQueryIO.Write.Method method =
@@ -1329,17 +1310,17 @@ public class BigQueryIOWriteTest implements Serializable {
     assertThat(
         fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"),
         containsInAnyOrder(
-            new TableRow().set("name", "a").set("number", 1),
-            new TableRow().set("name", "b").set("number", 2),
-            new TableRow().set("name", "c").set("number", 3),
-            new TableRow().set("name", "d").set("number", 4)));
+            new TableRow().set("name", "a").set("number", "1"),
+            new TableRow().set("name", "b").set("number", "2"),
+            new TableRow().set("name", "c").set("number", "3"),
+            new TableRow().set("name", "d").set("number", "4")));
   }
 
   @Test
   public void testSchemaWriteStreams() throws Exception {
-    if (useStorageApi || !useStreaming) {
-      return;
-    }
+    assumeTrue(!useStorageApi);
+    assumeTrue(useStreaming);
+
     WriteResult result =
         p.apply(
                 Create.of(
@@ -1358,22 +1339,20 @@ public class BigQueryIOWriteTest implements Serializable {
 
     PAssert.that(result.getSuccessfulInserts())
         .satisfies(
-            new SerializableFunction<Iterable<TableRow>, Void>() {
-              @Override
-              public Void apply(Iterable<TableRow> input) {
-                assertThat(Lists.newArrayList(input).size(), is(4));
-                return null;
-              }
-            });
+            (SerializableFunction<Iterable<TableRow>, Void>)
+                input -> {
+                  assertThat(Lists.newArrayList(input).size(), is(4));
+                  return null;
+                });
     p.run();
 
     assertThat(
         fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"),
         containsInAnyOrder(
-            new TableRow().set("name", "a").set("number", 1),
-            new TableRow().set("name", "b").set("number", 2),
-            new TableRow().set("name", "c").set("number", 3),
-            new TableRow().set("name", "d").set("number", 4)));
+            new TableRow().set("name", "a").set("number", "1"),
+            new TableRow().set("name", "b").set("number", "2"),
+            new TableRow().set("name", "c").set("number", "3"),
+            new TableRow().set("name", "d").set("number", "4")));
   }
 
   /**
@@ -1572,9 +1551,7 @@ public class BigQueryIOWriteTest implements Serializable {
 
   @Test
   public void testWriteUnknown() throws Exception {
-    if (useStorageApi) {
-      return;
-    }
+    assumeTrue(!useStorageApi);
     p.apply(
             Create.of(
                     new TableRow().set("name", "a").set("number", 1),
@@ -1595,9 +1572,7 @@ public class BigQueryIOWriteTest implements Serializable {
 
   @Test
   public void testWriteFailedJobs() throws Exception {
-    if (useStorageApi) {
-      return;
-    }
+    assumeTrue(!useStorageApi);
     p.apply(
             Create.of(
                     new TableRow().set("name", "a").set("number", 1),
@@ -1819,120 +1794,6 @@ public class BigQueryIOWriteTest implements Serializable {
     p.run();
   }
 
-  @Test
-  public void testUpdateTableSchemaUseSet() throws Exception {
-    updateTableSchemaTest(true);
-  }
-
-  @Test
-  public void testUpdateTableSchemaUseSetF() throws Exception {
-    updateTableSchemaTest(false);
-  }
-
-  public void updateTableSchemaTest(boolean useSet) throws Exception {
-    if (!useStreaming || !useStorageApi) {
-      return;
-    }
-    BigQueryIO.Write.Method method =
-        useStorageApiApproximate ? Method.STORAGE_API_AT_LEAST_ONCE : Method.STORAGE_WRITE_API;
-    p.enableAbandonedNodeEnforcement(false);
-
-    TableReference tableRef = BigQueryHelpers.parseTableSpec("project-id:dataset-id.table");
-    TableSchema tableSchema =
-        new TableSchema()
-            .setFields(
-                ImmutableList.of(
-                    new TableFieldSchema().setName("name").setType("STRING"),
-                    new TableFieldSchema().setName("number").setType("INTEGER")));
-    fakeDatasetService.createTable(new Table().setTableReference(tableRef).setSchema(tableSchema));
-
-    LongFunction<TableRow> getRowSet =
-        (LongFunction<TableRow> & Serializable)
-            (long i) -> {
-              if (i < 5) {
-                return new TableRow().set("name", "name" + i).set("number", Long.toString(i));
-
-              } else {
-                return new TableRow()
-                    .set("name", "name" + i)
-                    .set("number", Long.toString(i))
-                    .set("double_number", Long.toString(i * 2));
-              }
-            };
-
-    LongFunction<TableRow> getRowSetF =
-        (LongFunction<TableRow> & Serializable)
-            (long i) -> {
-              if (i < 5) {
-                return new TableRow()
-                    .setF(
-                        ImmutableList.of(
-                            new TableCell().setV("name" + i),
-                            new TableCell().setV(Long.toString(i))));
-              } else {
-                return new TableRow()
-                    .setF(
-                        ImmutableList.of(
-                            new TableCell().setV("name" + i),
-                            new TableCell().setV(Long.toString(i)),
-                            new TableCell().setV(Long.toString(i * 2))));
-              }
-            };
-
-    LongFunction<TableRow> getRow = useSet ? getRowSet : getRowSetF;
-    final int numRows = 10;
-    PCollection<TableRow> tableRows =
-        p.apply(GenerateSequence.from(0).to(numRows))
-            .apply(
-                MapElements.via(
-                    new SimpleFunction<Long, TableRow>() {
-                      @Override
-                      public TableRow apply(Long input) {
-                        return getRow.apply(input);
-                      }
-                    }))
-            .setCoder(TableRowJsonCoder.of());
-    tableRows.apply(
-        BigQueryIO.writeTableRows()
-            .to(tableRef)
-            .withMethod(method)
-            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
-            .withAutoSchemaUpdate(true)
-            .withTestServices(fakeBqServices)
-            .withoutValidation());
-
-    Thread thread =
-        new Thread(
-            () -> {
-              try {
-                Thread.sleep(5000);
-                TableSchema tableSchemaUpdated =
-                    new TableSchema()
-                        .setFields(
-                            ImmutableList.of(
-                                new TableFieldSchema().setName("name").setType("STRING"),
-                                new TableFieldSchema().setName("number").setType("INTEGER"),
-                                new TableFieldSchema()
-                                    .setName("double_number")
-                                    .setType("INTEGER")));
-                fakeDatasetService.updateTableSchema(tableRef, tableSchemaUpdated);
-              } catch (Exception e) {
-                throw new RuntimeException(e);
-              }
-            });
-    thread.start();
-    p.run();
-    thread.join();
-
-    assertThat(
-        fakeDatasetService.getAllRows(
-            tableRef.getProjectId(), tableRef.getDatasetId(), tableRef.getTableId()),
-        containsInAnyOrder(
-            Iterables.toArray(
-                LongStream.range(0, numRows).mapToObj(getRowSet).collect(Collectors.toList()),
-                TableRow.class)));
-  }
-
   @Test
   public void testBigQueryIOGetName() {
     assertEquals(
@@ -1969,9 +1830,7 @@ public class BigQueryIOWriteTest implements Serializable {
 
   @Test
   public void testWriteValidateFailsBothFormatFunctions() {
-    if (useStorageApi) {
-      return;
-    }
+    assumeTrue(!useStorageApi);
     p.enableAbandonedNodeEnforcement(false);
 
     thrown.expect(IllegalArgumentException.class);
@@ -1990,9 +1849,7 @@ public class BigQueryIOWriteTest implements Serializable {
 
   @Test
   public void testWriteValidateFailsWithBeamSchemaAndAvroFormatFunction() {
-    if (useStorageApi) {
-      return;
-    }
+    assumeTrue(!useStorageApi);
     p.enableAbandonedNodeEnforcement(false);
 
     thrown.expect(IllegalArgumentException.class);
@@ -2008,9 +1865,8 @@ public class BigQueryIOWriteTest implements Serializable {
 
   @Test
   public void testWriteValidateFailsWithAvroFormatAndStreamingInserts() {
-    if (!useStreaming && !useStorageApi) {
-      return;
-    }
+    assumeTrue(useStreaming);
+    assumeTrue(!useStorageApi);
     p.enableAbandonedNodeEnforcement(false);
 
     thrown.expect(IllegalArgumentException.class);
@@ -2027,9 +1883,7 @@ public class BigQueryIOWriteTest implements Serializable {
 
   @Test
   public void testWriteValidateFailsWithBatchAutoSharding() {
-    if (useStorageApi) {
-      return;
-    }
+    assumeTrue(!useStorageApi);
     p.enableAbandonedNodeEnforcement(false);
 
     thrown.expect(IllegalArgumentException.class);
@@ -2497,9 +2351,7 @@ public class BigQueryIOWriteTest implements Serializable {
 
   @Test
   public void testExtendedErrorRetrieval() throws Exception {
-    if (useStorageApi) {
-      return;
-    }
+    assumeTrue(!useStorageApi);
     TableRow row1 = new TableRow().set("name", "a").set("number", "1");
     TableRow row2 = new TableRow().set("name", "b").set("number", "2");
     TableRow row3 = new TableRow().set("name", "c").set("number", "3");
@@ -2552,13 +2404,7 @@ public class BigQueryIOWriteTest implements Serializable {
 
   @Test
   public void testStorageApiErrors() throws Exception {
-    BigQueryOptions bqOptions = p.getOptions().as(BigQueryOptions.class);
-    bqOptions.setSchemaUpdateRetries(1);
-
-    if (!useStorageApi) {
-      return;
-    }
-
+    assumeTrue(useStorageApi);
     final Method method =
         useStorageApiApproximate ? Method.STORAGE_API_AT_LEAST_ONCE : Method.STORAGE_WRITE_API;
 
@@ -2657,9 +2503,7 @@ public class BigQueryIOWriteTest implements Serializable {
 
   @Test
   public void testWrongErrorConfigs() {
-    if (useStorageApi) {
-      return;
-    }
+    assumeTrue(!useStorageApi);
     p.enableAutoRunIfMissing(true);
     TableRow row1 = new TableRow().set("name", "a").set("number", "1");
 
@@ -2777,9 +2621,8 @@ public class BigQueryIOWriteTest implements Serializable {
 
   @Test
   public void testSchemaUpdateOptionsFailsStreamingInserts() throws Exception {
-    if (!useStreaming && !useStorageApi) {
-      return;
-    }
+    assumeTrue(useStreaming);
+    assumeTrue(!useStorageApi);
     Set<SchemaUpdateOption> options = EnumSet.of(SchemaUpdateOption.ALLOW_FIELD_ADDITION);
     p.enableAbandonedNodeEnforcement(false);
     thrown.expect(IllegalArgumentException.class);
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
index b832a9b3612..eacb95a9a68 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
@@ -599,7 +599,7 @@ public class BigQueryUtilsTest {
 
     assertThat(row.size(), equalTo(22));
     assertThat(row, hasEntry("id", "123"));
-    assertThat(row, hasEntry("value", 123.456));
+    assertThat(row, hasEntry("value", "123.456"));
     assertThat(row, hasEntry("datetime", "2020-11-02T12:34:56.789876"));
     assertThat(row, hasEntry("datetime0ms", "2020-11-02T12:34:56"));
     assertThat(row, hasEntry("datetime0s_ns", "2020-11-02T12:34:00.789876"));
@@ -610,12 +610,12 @@ public class BigQueryUtilsTest {
     assertThat(row, hasEntry("time0s_ns", "12:34:00.789876"));
     assertThat(row, hasEntry("time0s_0ns", "12:34:00"));
     assertThat(row, hasEntry("name", "test"));
-    assertThat(row, hasEntry("valid", false));
+    assertThat(row, hasEntry("valid", "false"));
     assertThat(row, hasEntry("binary", "ABCD1234"));
     assertThat(row, hasEntry("numeric", "123.456"));
-    assertThat(row, hasEntry("boolean", true));
+    assertThat(row, hasEntry("boolean", "true"));
     assertThat(row, hasEntry("long", "123"));
-    assertThat(row, hasEntry("double", 123.456));
+    assertThat(row, hasEntry("double", "123.456"));
   }
 
   @Test
@@ -642,7 +642,7 @@ public class BigQueryUtilsTest {
     row = ((List<TableRow>) row.get("map")).get(0);
     assertThat(row.size(), equalTo(2));
     assertThat(row, hasEntry("key", "test"));
-    assertThat(row, hasEntry("value", 123.456));
+    assertThat(row, hasEntry("value", "123.456"));
   }
 
   @Test
@@ -653,7 +653,7 @@ public class BigQueryUtilsTest {
     row = (TableRow) row.get("row");
     assertThat(row.size(), equalTo(22));
     assertThat(row, hasEntry("id", "123"));
-    assertThat(row, hasEntry("value", 123.456));
+    assertThat(row, hasEntry("value", "123.456"));
     assertThat(row, hasEntry("datetime", "2020-11-02T12:34:56.789876"));
     assertThat(row, hasEntry("datetime0ms", "2020-11-02T12:34:56"));
     assertThat(row, hasEntry("datetime0s_ns", "2020-11-02T12:34:00.789876"));
@@ -664,12 +664,12 @@ public class BigQueryUtilsTest {
     assertThat(row, hasEntry("time0s_ns", "12:34:00.789876"));
     assertThat(row, hasEntry("time0s_0ns", "12:34:00"));
     assertThat(row, hasEntry("name", "test"));
-    assertThat(row, hasEntry("valid", false));
+    assertThat(row, hasEntry("valid", "false"));
     assertThat(row, hasEntry("binary", "ABCD1234"));
     assertThat(row, hasEntry("numeric", "123.456"));
-    assertThat(row, hasEntry("boolean", true));
+    assertThat(row, hasEntry("boolean", "true"));
     assertThat(row, hasEntry("long", "123"));
-    assertThat(row, hasEntry("double", 123.456));
+    assertThat(row, hasEntry("double", "123.456"));
   }
 
   @Test
@@ -680,7 +680,7 @@ public class BigQueryUtilsTest {
     row = ((List<TableRow>) row.get("rows")).get(0);
     assertThat(row.size(), equalTo(22));
     assertThat(row, hasEntry("id", "123"));
-    assertThat(row, hasEntry("value", 123.456));
+    assertThat(row, hasEntry("value", "123.456"));
     assertThat(row, hasEntry("datetime", "2020-11-02T12:34:56.789876"));
     assertThat(row, hasEntry("datetime0ms", "2020-11-02T12:34:56"));
     assertThat(row, hasEntry("datetime0s_ns", "2020-11-02T12:34:00.789876"));
@@ -691,12 +691,12 @@ public class BigQueryUtilsTest {
     assertThat(row, hasEntry("time0s_ns", "12:34:00.789876"));
     assertThat(row, hasEntry("time0s_0ns", "12:34:00"));
     assertThat(row, hasEntry("name", "test"));
-    assertThat(row, hasEntry("valid", false));
+    assertThat(row, hasEntry("valid", "false"));
     assertThat(row, hasEntry("binary", "ABCD1234"));
     assertThat(row, hasEntry("numeric", "123.456"));
-    assertThat(row, hasEntry("boolean", true));
+    assertThat(row, hasEntry("boolean", "true"));
     assertThat(row, hasEntry("long", "123"));
-    assertThat(row, hasEntry("double", 123.456));
+    assertThat(row, hasEntry("double", "123.456"));
   }
 
   @Test
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
index 0a376c8082d..27a772a60ed 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
@@ -44,7 +44,9 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Immutabl
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.BaseEncoding;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
@@ -467,10 +469,12 @@ public class TableRowToStorageApiProtoTest {
                           .setFields(BASE_TABLE_SCHEMA_NO_F.getFields()))
                   .build());
 
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
+
   @Test
   public void testDescriptorFromTableSchema() {
     DescriptorProto descriptor =
-        TableRowToStorageApiProto.descriptorSchemaFromTableSchema(BASE_TABLE_SCHEMA);
+        TableRowToStorageApiProto.descriptorSchemaFromTableSchema(BASE_TABLE_SCHEMA, true);
     Map<String, Type> types =
         descriptor.getFieldList().stream()
             .collect(
@@ -485,7 +489,7 @@ public class TableRowToStorageApiProtoTest {
   @Test
   public void testNestedFromTableSchema() {
     DescriptorProto descriptor =
-        TableRowToStorageApiProto.descriptorSchemaFromTableSchema(NESTED_TABLE_SCHEMA);
+        TableRowToStorageApiProto.descriptorSchemaFromTableSchema(NESTED_TABLE_SCHEMA, true);
     Map<String, Type> expectedBaseTypes =
         BASE_TABLE_SCHEMA_PROTO.getFieldList().stream()
             .collect(
@@ -687,7 +691,7 @@ public class TableRowToStorageApiProtoTest {
             .set("nestedValueNoF2", BASE_TABLE_ROW_NO_F);
 
     Descriptor descriptor =
-        TableRowToStorageApiProto.getDescriptorFromTableSchema(NESTED_TABLE_SCHEMA);
+        TableRowToStorageApiProto.getDescriptorFromTableSchema(NESTED_TABLE_SCHEMA, true);
     TableRowToStorageApiProto.SchemaInformation schemaInformation =
         TableRowToStorageApiProto.SchemaInformation.fromTableSchema(NESTED_TABLE_SCHEMA);
     DynamicMessage msg =
@@ -707,7 +711,7 @@ public class TableRowToStorageApiProtoTest {
   @Test
   public void testMessageWithFFromTableRow() throws Exception {
     Descriptor descriptor =
-        TableRowToStorageApiProto.getDescriptorFromTableSchema(BASE_TABLE_SCHEMA);
+        TableRowToStorageApiProto.getDescriptorFromTableSchema(BASE_TABLE_SCHEMA, true);
     TableRowToStorageApiProto.SchemaInformation schemaInformation =
         TableRowToStorageApiProto.SchemaInformation.fromTableSchema(BASE_TABLE_SCHEMA);
     DynamicMessage msg =
@@ -750,7 +754,7 @@ public class TableRowToStorageApiProtoTest {
             .set("repeatednof1", ImmutableList.of(BASE_TABLE_ROW_NO_F, BASE_TABLE_ROW_NO_F))
             .set("repeatednof2", ImmutableList.of(BASE_TABLE_ROW_NO_F, BASE_TABLE_ROW_NO_F));
     Descriptor descriptor =
-        TableRowToStorageApiProto.getDescriptorFromTableSchema(REPEATED_MESSAGE_SCHEMA);
+        TableRowToStorageApiProto.getDescriptorFromTableSchema(REPEATED_MESSAGE_SCHEMA, true);
     TableRowToStorageApiProto.SchemaInformation schemaInformation =
         TableRowToStorageApiProto.SchemaInformation.fromTableSchema(REPEATED_MESSAGE_SCHEMA);
     DynamicMessage msg =
@@ -795,7 +799,7 @@ public class TableRowToStorageApiProtoTest {
             .set("repeatednof1", null)
             .set("repeatednof2", null);
     Descriptor descriptor =
-        TableRowToStorageApiProto.getDescriptorFromTableSchema(REPEATED_MESSAGE_SCHEMA);
+        TableRowToStorageApiProto.getDescriptorFromTableSchema(REPEATED_MESSAGE_SCHEMA, true);
     TableRowToStorageApiProto.SchemaInformation schemaInformation =
         TableRowToStorageApiProto.SchemaInformation.fromTableSchema(REPEATED_MESSAGE_SCHEMA);
     DynamicMessage msg =
@@ -818,4 +822,70 @@ public class TableRowToStorageApiProtoTest {
         (List<DynamicMessage>) msg.getField(fieldDescriptors.get("repeatednof2"));
     assertTrue(repeatednof2.isEmpty());
   }
+
+  @Test
+  public void testRejectUnknownField() throws Exception {
+    TableRow row = new TableRow();
+    row.putAll(BASE_TABLE_ROW_NO_F);
+    row.set("unknown", "foobar");
+
+    Descriptor descriptor =
+        TableRowToStorageApiProto.getDescriptorFromTableSchema(BASE_TABLE_SCHEMA_NO_F, true);
+    TableRowToStorageApiProto.SchemaInformation schemaInformation =
+        TableRowToStorageApiProto.SchemaInformation.fromTableSchema(BASE_TABLE_SCHEMA_NO_F);
+
+    thrown.expect(TableRowToStorageApiProto.SchemaConversionException.class);
+    TableRowToStorageApiProto.messageFromTableRow(schemaInformation, descriptor, row, false);
+  }
+
+  @Test
+  public void testRejectUnknownFieldF() throws Exception {
+    TableRow row = new TableRow();
+    List<TableCell> cells = Lists.newArrayList(BASE_TABLE_ROW.getF());
+    cells.add(new TableCell().setV("foobar"));
+    row.setF(cells);
+
+    Descriptor descriptor =
+        TableRowToStorageApiProto.getDescriptorFromTableSchema(BASE_TABLE_SCHEMA, true);
+    TableRowToStorageApiProto.SchemaInformation schemaInformation =
+        TableRowToStorageApiProto.SchemaInformation.fromTableSchema(BASE_TABLE_SCHEMA);
+
+    thrown.expect(TableRowToStorageApiProto.SchemaConversionException.class);
+    TableRowToStorageApiProto.messageFromTableRow(schemaInformation, descriptor, row, false);
+  }
+
+  @Test
+  public void testRejectUnknownNestedField() throws Exception {
+    TableRow rowNoF = new TableRow();
+    rowNoF.putAll(BASE_TABLE_ROW_NO_F);
+    rowNoF.set("unknown", "foobar");
+
+    TableRow topRow = new TableRow().set("nestedValueNoF1", rowNoF);
+
+    Descriptor descriptor =
+        TableRowToStorageApiProto.getDescriptorFromTableSchema(NESTED_TABLE_SCHEMA, true);
+    TableRowToStorageApiProto.SchemaInformation schemaInformation =
+        TableRowToStorageApiProto.SchemaInformation.fromTableSchema(NESTED_TABLE_SCHEMA);
+
+    thrown.expect(TableRowToStorageApiProto.SchemaConversionException.class);
+    TableRowToStorageApiProto.messageFromTableRow(schemaInformation, descriptor, topRow, false);
+  }
+
+  @Test
+  public void testRejectUnknownNestedFieldF() throws Exception {
+    TableRow rowWithF = new TableRow();
+    List<TableCell> cells = Lists.newArrayList(BASE_TABLE_ROW.getF());
+    cells.add(new TableCell().setV("foobar"));
+    rowWithF.setF(cells);
+
+    TableRow topRow = new TableRow().set("nestedValue1", rowWithF);
+
+    Descriptor descriptor =
+        TableRowToStorageApiProto.getDescriptorFromTableSchema(NESTED_TABLE_SCHEMA, true);
+    TableRowToStorageApiProto.SchemaInformation schemaInformation =
+        TableRowToStorageApiProto.SchemaInformation.fromTableSchema(NESTED_TABLE_SCHEMA);
+
+    thrown.expect(TableRowToStorageApiProto.SchemaConversionException.class);
+    TableRowToStorageApiProto.messageFromTableRow(schemaInformation, descriptor, topRow, false);
+  }
 }