You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by re...@apache.org on 2022/04/30 05:03:05 UTC

[beam] branch master updated: Merge pull request #17404: [BEAM-13990] support date and timestamp fields

This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 58b4d762eec Merge pull request #17404: [BEAM-13990] support date and timestamp fields
58b4d762eec is described below

commit 58b4d762eece66774a5df6ca54e6f91c49057c9b
Author: Reuven Lax <re...@google.com>
AuthorDate: Fri Apr 29 22:02:56 2022 -0700

    Merge pull request #17404: [BEAM-13990] support date and timestamp fields
---
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |   1 +
 sdks/java/io/google-cloud-platform/build.gradle    |   2 +
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java       |   2 +
 .../StorageApiDynamicDestinationsTableRow.java     |  13 +-
 .../bigquery/StorageApiWriteUnshardedRecords.java  |   4 +
 .../bigquery/StorageApiWritesShardedRecords.java   |   4 +
 .../sdk/io/gcp/bigquery/TableRowJsonCoder.java     |   8 +-
 .../io/gcp/bigquery/TableRowToStorageApiProto.java | 339 ++++++++++++-----
 .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java   |   8 -
 .../gcp/bigquery/TableRowToStorageApiProtoIT.java  | 407 +++++++++++++++++++++
 .../bigquery/TableRowToStorageApiProtoTest.java    | 140 ++++---
 .../beam/sdk/io/gcp/spanner/StructUtilsTest.java   |   3 +-
 12 files changed, 777 insertions(+), 154 deletions(-)

diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 5d5b1e9377f..0f0c11dc2cb 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -631,6 +631,7 @@ class BeamModulePlugin implements Plugin<Project> {
         jackson_dataformat_xml                      : "com.fasterxml.jackson.dataformat:jackson-dataformat-xml:$jackson_version",
         jackson_dataformat_yaml                     : "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:$jackson_version",
         jackson_datatype_joda                       : "com.fasterxml.jackson.datatype:jackson-datatype-joda:$jackson_version",
+        jackson_datatype_jsr310                     : "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:$jackson_version",
         jackson_module_scala_2_11                   : "com.fasterxml.jackson.module:jackson-module-scala_2.11:$jackson_version",
         jackson_module_scala_2_12                   : "com.fasterxml.jackson.module:jackson-module-scala_2.12:$jackson_version",
         // Swap to use the officially published version of 0.4.x once available
diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle
index 8d1d0a1776d..cd71fc35e21 100644
--- a/sdks/java/io/google-cloud-platform/build.gradle
+++ b/sdks/java/io/google-cloud-platform/build.gradle
@@ -116,6 +116,8 @@ dependencies {
   implementation library.java.http_core
   implementation library.java.jackson_core
   implementation library.java.jackson_databind
+  implementation library.java.jackson_datatype_joda
+  implementation library.java.jackson_datatype_jsr310
   implementation library.java.joda_time
   implementation library.java.junit
   implementation library.java.netty_handler
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 54f485a9d71..2a3e595adc1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -3085,6 +3085,8 @@ public class BigQueryIO {
     CreateTables.clearCreatedTables();
     TwoLevelMessageConverterCache.clear();
     StorageApiDynamicDestinationsTableRow.clearSchemaCache();
+    StorageApiWriteUnshardedRecords.clearCache();
+    StorageApiWritesShardedRecords.clearCache();
   }
 
   /////////////////////////////////////////////////////////////////////////////
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
index c48dbe0dedb..3c9f676cfad 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
@@ -70,6 +70,7 @@ public class StorageApiDynamicDestinationsTableRow<T, DestinationT>
       DestinationT destination, DatasetService datasetService) throws Exception {
     return new MessageConverter<T>() {
       TableSchema tableSchema;
+      TableRowToStorageApiProto.SchemaInformation schemaInformation;
       Descriptor descriptor;
       long descriptorHash;
 
@@ -103,7 +104,8 @@ public class StorageApiDynamicDestinationsTableRow<T, DestinationT>
               MoreObjects.firstNonNull(
                   SCHEMA_CACHE.putSchemaIfAbsent(tableReference, tableSchema), tableSchema);
         }
-
+        schemaInformation =
+            TableRowToStorageApiProto.SchemaInformation.fromTableSchema(tableSchema);
         descriptor = TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema);
         descriptorHash = BigQueryUtils.hashSchemaDescriptorDeterministic(descriptor);
       }
@@ -138,6 +140,8 @@ public class StorageApiDynamicDestinationsTableRow<T, DestinationT>
         }
         synchronized (this) {
           tableSchema = newSchema;
+          schemaInformation =
+              TableRowToStorageApiProto.SchemaInformation.fromTableSchema(tableSchema);
           descriptor = TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema);
           long newHash = BigQueryUtils.hashSchemaDescriptorDeterministic(descriptor);
           if (descriptorHash != newHash) {
@@ -154,16 +158,21 @@ public class StorageApiDynamicDestinationsTableRow<T, DestinationT>
       public StorageApiWritePayload toMessage(T element) throws Exception {
         int attempt = 0;
         do {
+          TableRowToStorageApiProto.SchemaInformation localSchemaInformation;
           Descriptor localDescriptor;
           long localDescriptorHash;
           synchronized (this) {
+            localSchemaInformation = schemaInformation;
             localDescriptor = descriptor;
             localDescriptorHash = descriptorHash;
           }
           try {
             Message msg =
                 TableRowToStorageApiProto.messageFromTableRow(
-                    localDescriptor, formatFunction.apply(element), ignoreUnknownValues);
+                    localSchemaInformation,
+                    localDescriptor,
+                    formatFunction.apply(element),
+                    ignoreUnknownValues);
             return new AutoValue_StorageApiWritePayload(msg.toByteArray(), localDescriptorHash);
           } catch (SchemaTooNarrowException e) {
             if (attempt > schemaUpdateRetries) {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index f8619d5fd62..1751799dd51 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -87,6 +87,10 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
               })
           .build();
 
+  static void clearCache() {
+    APPEND_CLIENTS.invalidateAll();
+  }
+
   // Run a closure asynchronously, ignoring failures.
   private interface ThrowingRunnable {
     void run() throws Exception;
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index 787cb0c2d62..c0f60f34c61 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -119,6 +119,10 @@ public class StorageApiWritesShardedRecords<DestinationT, ElementT>
               })
           .build();
 
+  static void clearCache() {
+    APPEND_CLIENTS.invalidateAll();
+  }
+
   // Run a closure asynchronously, ignoring failures.
   private interface ThrowingRunnable {
     void run() throws Exception;
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
index 7e82eec212d..9b80c0b552b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java
@@ -19,6 +19,8 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.joda.JodaModule;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
 import com.google.api.services.bigquery.model.TableRow;
 import java.io.IOException;
 import java.io.InputStream;
@@ -68,7 +70,11 @@ public class TableRowJsonCoder extends AtomicCoder<TableRow> {
   // FAIL_ON_EMPTY_BEANS is disabled in order to handle null values in
   // TableRow.
   private static final ObjectMapper MAPPER =
-      new ObjectMapper().disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
+      new ObjectMapper()
+          .registerModule(new JavaTimeModule())
+          .registerModule(new JodaModule())
+          .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
+          .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
 
   private static final TableRowJsonCoder INSTANCE = new TableRowJsonCoder();
   private static final TypeDescriptor<TableRow> TYPE_DESCRIPTOR = new TypeDescriptor<TableRow>() {};
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
index 2f241a434a8..750542b7121 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
@@ -22,6 +22,7 @@ import static java.util.stream.Collectors.toList;
 import com.google.api.services.bigquery.model.TableFieldSchema;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.bigquery.storage.v1.BigDecimalByteStringEncoder;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.DescriptorProtos.DescriptorProto;
 import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
@@ -34,19 +35,28 @@ import com.google.protobuf.Descriptors.FieldDescriptor;
 import com.google.protobuf.Descriptors.FileDescriptor;
 import com.google.protobuf.DynamicMessage;
 import com.google.protobuf.Message;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.format.DateTimeParseException;
+import java.time.temporal.ChronoUnit;
 import java.util.AbstractMap;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
-import java.util.function.Function;
 import javax.annotation.Nullable;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.BaseEncoding;
+import org.joda.time.Days;
 
 /**
  * Utility methods for converting JSON {@link TableRow} objects to dynamic protocol message, for use
@@ -71,6 +81,58 @@ public class TableRowToStorageApiProto {
     }
   }
 
+  static class SchemaInformation {
+    private final TableFieldSchema tableFieldSchema;
+    private final List<SchemaInformation> subFields;
+    private final Map<String, SchemaInformation> subFieldsByName;
+
+    private SchemaInformation(TableFieldSchema tableFieldSchema) {
+      this.tableFieldSchema = tableFieldSchema;
+      this.subFields = Lists.newArrayList();
+      this.subFieldsByName = Maps.newHashMap();
+      if (tableFieldSchema.getFields() != null) {
+        for (TableFieldSchema field : tableFieldSchema.getFields()) {
+          SchemaInformation schemaInformation = new SchemaInformation(field);
+          subFields.add(schemaInformation);
+          subFieldsByName.put(field.getName(), schemaInformation);
+        }
+      }
+    }
+
+    public String getName() {
+      return tableFieldSchema.getName();
+    }
+
+    public String getType() {
+      return tableFieldSchema.getType();
+    }
+
+    public SchemaInformation getSchemaForField(String name) {
+      SchemaInformation schemaInformation = subFieldsByName.get(name);
+      if (schemaInformation == null) {
+        throw new RuntimeException("Schema field not found: " + name);
+      }
+      return schemaInformation;
+    }
+
+    public SchemaInformation getSchemaForField(int i) {
+      SchemaInformation schemaInformation = subFields.get(i);
+      if (schemaInformation == null) {
+        throw new RuntimeException("Schema field not found: " + i);
+      }
+      return schemaInformation;
+    }
+
+    static SchemaInformation fromTableSchema(TableSchema tableSchema) {
+      TableFieldSchema rootSchema =
+          new TableFieldSchema()
+              .setName("__root__")
+              .setType("RECORD")
+              .setFields(tableSchema.getFields());
+      return new SchemaInformation(rootSchema);
+    }
+  }
+
   static final Map<String, Type> PRIMITIVE_TYPES =
       ImmutableMap.<String, Type>builder()
           .put("INT64", Type.TYPE_INT64)
@@ -81,13 +143,13 @@ public class TableRowToStorageApiProto {
           .put("BOOL", Type.TYPE_BOOL)
           .put("BOOLEAN", Type.TYPE_BOOL)
           .put("BYTES", Type.TYPE_BYTES)
-          .put("NUMERIC", Type.TYPE_STRING) // Pass through the JSON encoding.
-          .put("BIGNUMERIC", Type.TYPE_STRING) // Pass through the JSON encoding.
+          .put("NUMERIC", Type.TYPE_BYTES) // Pass through the JSON encoding.
+          .put("BIGNUMERIC", Type.TYPE_BYTES) // Pass through the JSON encoding.
           .put("GEOGRAPHY", Type.TYPE_STRING) // Pass through the JSON encoding.
-          .put("DATE", Type.TYPE_STRING) // Pass through the JSON encoding.
-          .put("TIME", Type.TYPE_STRING) // Pass through the JSON encoding.
-          .put("DATETIME", Type.TYPE_STRING) // Pass through the JSON encoding.
-          .put("TIMESTAMP", Type.TYPE_STRING) // Pass through the JSON encoding.
+          .put("DATE", Type.TYPE_INT32)
+          .put("TIME", Type.TYPE_INT64)
+          .put("DATETIME", Type.TYPE_INT64)
+          .put("TIMESTAMP", Type.TYPE_INT64)
           .put("JSON", Type.TYPE_STRING)
           .build();
 
@@ -107,7 +169,10 @@ public class TableRowToStorageApiProto {
   }
 
   public static DynamicMessage messageFromMap(
-      Descriptor descriptor, AbstractMap<String, Object> map, boolean ignoreUnknownValues)
+      SchemaInformation schemaInformation,
+      Descriptor descriptor,
+      AbstractMap<String, Object> map,
+      boolean ignoreUnknownValues)
       throws SchemaConversionException {
     DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
     for (Map.Entry<String, Object> entry : map.entrySet()) {
@@ -121,9 +186,12 @@ public class TableRowToStorageApiProto {
               "TableRow contained unexpected field with name " + entry.getKey());
         }
       }
+      SchemaInformation fieldSchemaInformation =
+          schemaInformation.getSchemaForField(entry.getKey());
       @Nullable
       Object value =
-          messageValueFromFieldValue(fieldDescriptor, entry.getValue(), ignoreUnknownValues);
+          messageValueFromFieldValue(
+              fieldSchemaInformation, fieldDescriptor, entry.getValue(), ignoreUnknownValues);
       if (value != null) {
         builder.setField(fieldDescriptor, value);
       }
@@ -136,7 +204,10 @@ public class TableRowToStorageApiProto {
    * using the BigQuery Storage API.
    */
   public static DynamicMessage messageFromTableRow(
-      Descriptor descriptor, TableRow tableRow, boolean ignoreUnkownValues)
+      SchemaInformation schemaInformation,
+      Descriptor descriptor,
+      TableRow tableRow,
+      boolean ignoreUnkownValues)
       throws SchemaConversionException {
     @Nullable Object fValue = tableRow.get("f");
     if (fValue instanceof List) {
@@ -153,9 +224,11 @@ public class TableRowToStorageApiProto {
       for (int i = 0; i < cellsToProcess; ++i) {
         AbstractMap<String, Object> cell = cells.get(i);
         FieldDescriptor fieldDescriptor = descriptor.getFields().get(i);
+        SchemaInformation fieldSchemaInformation = schemaInformation.getSchemaForField(i);
         @Nullable
         Object value =
-            messageValueFromFieldValue(fieldDescriptor, cell.get("v"), ignoreUnkownValues);
+            messageValueFromFieldValue(
+                fieldSchemaInformation, fieldDescriptor, cell.get("v"), ignoreUnkownValues);
         if (value != null) {
           builder.setField(fieldDescriptor, value);
         }
@@ -163,7 +236,7 @@ public class TableRowToStorageApiProto {
 
       return builder.build();
     } else {
-      return messageFromMap(descriptor, tableRow, ignoreUnkownValues);
+      return messageFromMap(schemaInformation, descriptor, tableRow, ignoreUnkownValues);
     }
   }
 
@@ -218,125 +291,191 @@ public class TableRowToStorageApiProto {
   }
 
   @Nullable
+  @SuppressWarnings({"nullness"})
   private static Object messageValueFromFieldValue(
-      FieldDescriptor fieldDescriptor, @Nullable Object bqValue, boolean ignoreUnknownValues)
+      SchemaInformation schemaInformation,
+      FieldDescriptor fieldDescriptor,
+      @Nullable Object bqValue,
+      boolean ignoreUnknownValues)
       throws SchemaConversionException {
     if (bqValue == null) {
       if (fieldDescriptor.isOptional()) {
         return null;
       } else if (fieldDescriptor.isRepeated()) {
         return Collections.emptyList();
-      }
-      {
+      } else {
         throw new IllegalArgumentException(
             "Received null value for non-nullable field " + fieldDescriptor.getName());
       }
     }
-    return toProtoValue(
-        fieldDescriptor, bqValue, fieldDescriptor.isRepeated(), ignoreUnknownValues);
-  }
-
-  private static final Map<FieldDescriptor.Type, Function<String, Object>>
-      JSON_PROTO_STRING_PARSERS =
-          ImmutableMap.<FieldDescriptor.Type, Function<String, Object>>builder()
-              .put(FieldDescriptor.Type.INT32, Integer::valueOf)
-              .put(FieldDescriptor.Type.INT64, Long::valueOf)
-              .put(FieldDescriptor.Type.FLOAT, Float::valueOf)
-              .put(FieldDescriptor.Type.DOUBLE, Double::valueOf)
-              .put(FieldDescriptor.Type.BOOL, Boolean::valueOf)
-              .put(FieldDescriptor.Type.STRING, str -> str)
-              .put(
-                  FieldDescriptor.Type.BYTES,
-                  b64 -> ByteString.copyFrom(BaseEncoding.base64().decode(b64)))
-              .build();
 
-  @Nullable
-  @SuppressWarnings({"nullness"})
-  @VisibleForTesting
-  static Object toProtoValue(
-      FieldDescriptor fieldDescriptor,
-      Object jsonBQValue,
-      boolean isRepeated,
-      boolean ignoreUnknownValues)
-      throws SchemaConversionException {
-    if (isRepeated) {
-      List<Object> listValue = (List<Object>) jsonBQValue;
+    if (fieldDescriptor.isRepeated()) {
+      List<Object> listValue = (List<Object>) bqValue;
       List<Object> protoList = Lists.newArrayListWithCapacity(listValue.size());
-      for (Object o : listValue) {
-        protoList.add(toProtoValue(fieldDescriptor, o, false, ignoreUnknownValues));
+      for (@Nullable Object o : listValue) {
+        if (o != null) { // repeated field cannot contain null.
+          protoList.add(
+              singularFieldToProtoValue(
+                  schemaInformation, fieldDescriptor, o, ignoreUnknownValues));
+        }
       }
       return protoList;
     }
-
-    if (fieldDescriptor.getType() == FieldDescriptor.Type.MESSAGE) {
-      if (jsonBQValue instanceof TableRow) {
-        TableRow tableRow = (TableRow) jsonBQValue;
-        return messageFromTableRow(fieldDescriptor.getMessageType(), tableRow, ignoreUnknownValues);
-      } else if (jsonBQValue instanceof AbstractMap) {
-        // This will handle nested rows.
-        AbstractMap<String, Object> map = ((AbstractMap<String, Object>) jsonBQValue);
-        return messageFromMap(fieldDescriptor.getMessageType(), map, ignoreUnknownValues);
-      } else {
-        throw new RuntimeException("Unexpected value " + jsonBQValue + " Expected a JSON map.");
-      }
-    }
-    @Nullable Object scalarValue = scalarToProtoValue(fieldDescriptor, jsonBQValue);
-    if (scalarValue == null) {
-      return toProtoValue(fieldDescriptor, jsonBQValue.toString(), isRepeated, ignoreUnknownValues);
-    } else {
-      return scalarValue;
-    }
+    return singularFieldToProtoValue(
+        schemaInformation, fieldDescriptor, bqValue, ignoreUnknownValues);
   }
 
   @VisibleForTesting
   @Nullable
-  static Object scalarToProtoValue(FieldDescriptor fieldDescriptor, Object jsonBQValue) {
-    if (jsonBQValue instanceof String) {
-      Function<String, Object> mapper = JSON_PROTO_STRING_PARSERS.get(fieldDescriptor.getType());
-      if (mapper == null) {
-        throw new UnsupportedOperationException(
-            "Converting BigQuery type '"
-                + jsonBQValue.getClass()
-                + "' to '"
-                + fieldDescriptor
-                + "' is not supported");
-      }
-      return mapper.apply((String) jsonBQValue);
-    }
-
-    switch (fieldDescriptor.getType()) {
-      case BOOL:
-        if (jsonBQValue instanceof Boolean) {
-          return jsonBQValue;
+  static Object singularFieldToProtoValue(
+      SchemaInformation schemaInformation,
+      FieldDescriptor fieldDescriptor,
+      Object value,
+      boolean ignoreUnknownValues)
+      throws SchemaConversionException {
+    switch (schemaInformation.getType()) {
+      case "INT64":
+      case "INTEGER":
+        if (value instanceof String) {
+          return Long.valueOf((String) value);
+        } else if (value instanceof Integer || value instanceof Long) {
+          return ((Number) value).longValue();
         }
         break;
-      case BYTES:
+      case "FLOAT64":
+      case "FLOAT":
+        if (value instanceof String) {
+          return Double.valueOf((String) value);
+        } else if (value instanceof Double || value instanceof Float) {
+          return ((Number) value).doubleValue();
+        }
         break;
-      case INT64:
-        if (jsonBQValue instanceof Integer) {
-          return Long.valueOf((Integer) jsonBQValue);
-        } else if (jsonBQValue instanceof Long) {
-          return jsonBQValue;
+      case "BOOLEAN":
+      case "BOOL":
+        if (value instanceof String) {
+          return Boolean.valueOf((String) value);
+        } else if (value instanceof Boolean) {
+          return value;
         }
         break;
-      case INT32:
-        if (jsonBQValue instanceof Integer) {
-          return jsonBQValue;
+      case "BYTES":
+        if (value instanceof String) {
+          return ByteString.copyFrom(BaseEncoding.base64().decode((String) value));
+        } else if (value instanceof byte[]) {
+          return ByteString.copyFrom((byte[]) value);
+        } else if (value instanceof ByteString) {
+          return value;
         }
         break;
-      case STRING:
+      case "TIMESTAMP":
+        if (value instanceof String) {
+          try {
+            return ChronoUnit.MICROS.between(Instant.EPOCH, Instant.parse((String) value));
+          } catch (DateTimeParseException e) {
+            return ChronoUnit.MICROS.between(
+                Instant.EPOCH, Instant.ofEpochMilli(Long.parseLong((String) value)));
+          }
+        } else if (value instanceof Instant) {
+          return ChronoUnit.MICROS.between(Instant.EPOCH, (Instant) value);
+        } else if (value instanceof org.joda.time.Instant) {
+          // joda instant precision is millisecond
+          return ((org.joda.time.Instant) value).getMillis() * 1000L;
+        } else if (value instanceof Integer || value instanceof Long) {
+          return ((Number) value).longValue();
+        } else if (value instanceof Double || value instanceof Float) {
+          // assume value represents number of seconds since epoch
+          return BigDecimal.valueOf(((Number) value).doubleValue())
+              .scaleByPowerOfTen(6)
+              .setScale(0, RoundingMode.HALF_UP)
+              .longValue();
+        }
         break;
-      case DOUBLE:
-        if (jsonBQValue instanceof Double) {
-          return jsonBQValue;
-        } else if (jsonBQValue instanceof Float) {
-          return Double.valueOf((Float) jsonBQValue);
+      case "DATE":
+        if (value instanceof String) {
+          return ((Long) LocalDate.parse((String) value).toEpochDay()).intValue();
+        } else if (value instanceof LocalDate) {
+          return ((Long) ((LocalDate) value).toEpochDay()).intValue();
+        } else if (value instanceof org.joda.time.LocalDate) {
+          return Days.daysBetween(
+                  org.joda.time.Instant.EPOCH.toDateTime().toLocalDate(),
+                  (org.joda.time.LocalDate) value)
+              .getDays();
+        } else if (value instanceof Integer || value instanceof Long) {
+          return ((Number) value).intValue();
+        }
+        break;
+      case "NUMERIC":
+        if (value instanceof String) {
+          return BigDecimalByteStringEncoder.encodeToNumericByteString(
+              new BigDecimal((String) value));
+        } else if (value instanceof BigDecimal) {
+          return BigDecimalByteStringEncoder.encodeToNumericByteString(((BigDecimal) value));
+        } else if (value instanceof Double || value instanceof Float) {
+          return BigDecimalByteStringEncoder.encodeToNumericByteString(
+              BigDecimal.valueOf(((Number) value).doubleValue()));
+        }
+        break;
+      case "BIGNUMERIC":
+        if (value instanceof String) {
+          return BigDecimalByteStringEncoder.encodeToBigNumericByteString(
+              new BigDecimal((String) value));
+        } else if (value instanceof BigDecimal) {
+          return BigDecimalByteStringEncoder.encodeToBigNumericByteString(((BigDecimal) value));
+        } else if (value instanceof Double || value instanceof Float) {
+          return BigDecimalByteStringEncoder.encodeToBigNumericByteString(
+              BigDecimal.valueOf(((Number) value).doubleValue()));
+        }
+        break;
+      case "DATETIME":
+        if (value instanceof String) {
+          return CivilTimeEncoder.encodePacked64DatetimeMicros(LocalDateTime.parse((String) value));
+        } else if (value instanceof Number) {
+          return ((Number) value).longValue();
+        } else if (value instanceof LocalDateTime) {
+          return CivilTimeEncoder.encodePacked64DatetimeMicros((LocalDateTime) value);
+        } else if (value instanceof org.joda.time.LocalDateTime) {
+          return CivilTimeEncoder.encodePacked64DatetimeMicros((org.joda.time.LocalDateTime) value);
+        }
+        break;
+      case "TIME":
+        if (value instanceof String) {
+          return CivilTimeEncoder.encodePacked64TimeMicros(LocalTime.parse((String) value));
+        } else if (value instanceof Number) {
+          return ((Number) value).longValue();
+        } else if (value instanceof LocalTime) {
+          return CivilTimeEncoder.encodePacked64TimeMicros((LocalTime) value);
+        } else if (value instanceof org.joda.time.LocalTime) {
+          return CivilTimeEncoder.encodePacked64TimeMicros((org.joda.time.LocalTime) value);
+        }
+        break;
+      case "STRING":
+      case "JSON":
+      case "GEOGRAPHY":
+        return value.toString();
+      case "STRUCT":
+      case "RECORD":
+        if (value instanceof TableRow) {
+          TableRow tableRow = (TableRow) value;
+          return messageFromTableRow(
+              schemaInformation, fieldDescriptor.getMessageType(), tableRow, ignoreUnknownValues);
+        } else if (value instanceof AbstractMap) {
+          // This will handle nested rows.
+          AbstractMap<String, Object> map = ((AbstractMap<String, Object>) value);
+          return messageFromMap(
+              schemaInformation, fieldDescriptor.getMessageType(), map, ignoreUnknownValues);
         }
         break;
-      default:
-        throw new RuntimeException("Unsupported proto type " + fieldDescriptor.getType());
     }
-    return null;
+
+    throw new RuntimeException(
+        "Unexpected value :"
+            + value
+            + ", type: "
+            + value.getClass()
+            + ". Table field name: "
+            + schemaInformation.getName()
+            + ", type: "
+            + schemaInformation.getType());
   }
 
   @VisibleForTesting
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index 35dc57218f1..50f1bde09b9 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -523,14 +523,6 @@ public class BigQueryIOWriteTest implements Serializable {
     testTimePartitioning(method);
   }
 
-  @Test
-  public void testTimePartitioningStorageApi() throws Exception {
-    if (!useStorageApi) {
-      return;
-    }
-    testTimePartitioning(Method.STORAGE_WRITE_API);
-  }
-
   @Test
   public void testClusteringStorageApi() throws Exception {
     if (useStorageApi) {
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoIT.java
new file mode 100644
index 00000000000..34962a90a57
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoIT.java
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.BaseEncoding;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(JUnit4.class)
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+/** Unit tests for {@link TableRowToStorageApiProto}. */
+public class TableRowToStorageApiProtoIT {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TableRowToStorageApiProtoIT.class);
+  private static final BigqueryClient BQ_CLIENT = new BigqueryClient("TableRowToStorageApiProtoIT");
+  private static final String PROJECT =
+      TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
+  private static final String BIG_QUERY_DATASET_ID =
+      "table_row_to_storage_api_proto_" + System.nanoTime();
+
+  private static final TableSchema BASE_TABLE_SCHEMA =
+      new TableSchema()
+          .setFields(
+              ImmutableList.<TableFieldSchema>builder()
+                  .add(new TableFieldSchema().setType("STRING").setName("stringValue"))
+                  .add(new TableFieldSchema().setType("BYTES").setName("bytesValue"))
+                  .add(new TableFieldSchema().setType("INT64").setName("int64Value"))
+                  .add(new TableFieldSchema().setType("INTEGER").setName("intValue"))
+                  .add(new TableFieldSchema().setType("FLOAT64").setName("float64Value"))
+                  .add(new TableFieldSchema().setType("FLOAT").setName("floatValue"))
+                  .add(new TableFieldSchema().setType("BOOL").setName("boolValue"))
+                  .add(new TableFieldSchema().setType("BOOLEAN").setName("booleanValue"))
+                  .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampValue"))
+                  .add(new TableFieldSchema().setType("TIME").setName("timeValue"))
+                  .add(new TableFieldSchema().setType("DATETIME").setName("datetimeValue"))
+                  .add(new TableFieldSchema().setType("DATE").setName("dateValue"))
+                  .add(new TableFieldSchema().setType("NUMERIC").setName("numericValue"))
+                  .add(new TableFieldSchema().setType("BIGNUMERIC").setName("bigNumericValue"))
+                  .add(
+                      new TableFieldSchema()
+                          .setType("BYTES")
+                          .setMode("REPEATED")
+                          .setName("arrayValue"))
+                  .build());
+
+  private static final List<Object> REPEATED_BYTES =
+      ImmutableList.of(
+          BaseEncoding.base64().encode("hello".getBytes(StandardCharsets.UTF_8)),
+          "goodbye".getBytes(StandardCharsets.UTF_8),
+          "solong".getBytes(StandardCharsets.UTF_8));
+
+  private static final TableRow BASE_TABLE_ROW =
+      new TableRow()
+          .set("stringValue", "string")
+          .set(
+              "bytesValue", BaseEncoding.base64().encode("string".getBytes(StandardCharsets.UTF_8)))
+          .set("int64Value", "42")
+          .set("intValue", "43")
+          .set("float64Value", "2.8168")
+          .set("floatValue", "2.817")
+          .set("boolValue", "true")
+          .set("booleanValue", "true")
+          .set("timestampValue", "1970-01-01T00:00:00.000043Z")
+          .set("timeValue", "00:52:07.123456")
+          .set("datetimeValue", "2019-08-16T00:52:07.123456")
+          .set("dateValue", "2019-08-16")
+          .set("numericValue", "23.4")
+          .set("bigNumericValue", "23334.4")
+          .set("arrayValue", REPEATED_BYTES);
+
+  private static final TableRow BASE_TABLE_ROW_JODA_TIME =
+      new TableRow()
+          .set("stringValue", "string")
+          .set("bytesValue", "string".getBytes(StandardCharsets.UTF_8))
+          .set("int64Value", 42)
+          .set("intValue", 43)
+          .set("float64Value", 2.8168f)
+          .set("floatValue", 2.817f)
+          .set("boolValue", true)
+          .set("booleanValue", true)
+          .set("timestampValue", org.joda.time.Instant.parse("1970-01-01T00:00:00.0043Z"))
+          .set("timeValue", org.joda.time.LocalTime.parse("00:52:07.123456"))
+          .set("datetimeValue", org.joda.time.LocalDateTime.parse("2019-08-16T00:52:07.123456"))
+          .set("dateValue", org.joda.time.LocalDate.parse("2019-08-16"))
+          .set("numericValue", new BigDecimal("23.4"))
+          .set("bigNumericValue", "23334.4")
+          .set("arrayValue", REPEATED_BYTES);
+
+  private static final TableRow BASE_TABLE_ROW_JAVA_TIME =
+      new TableRow()
+          .set("stringValue", "string")
+          .set("bytesValue", "string".getBytes(StandardCharsets.UTF_8))
+          .set("int64Value", 42)
+          .set("intValue", 43)
+          .set("float64Value", 2.8168f)
+          .set("floatValue", 2.817f)
+          .set("boolValue", true)
+          .set("booleanValue", true)
+          .set("timestampValue", Instant.parse("1970-01-01T00:00:00.000043Z"))
+          .set("timeValue", LocalTime.parse("00:52:07.123456"))
+          .set("datetimeValue", LocalDateTime.parse("2019-08-16T00:52:07.123456"))
+          .set("dateValue", LocalDate.parse("2019-08-16"))
+          .set("numericValue", new BigDecimal("23.4"))
+          .set("bigNumericValue", "23334.4")
+          .set("arrayValue", REPEATED_BYTES);
+
+  private static final TableRow BASE_TABLE_ROW_NUM_TIME =
+      new TableRow()
+          .set("stringValue", "string")
+          .set("bytesValue", "string".getBytes(StandardCharsets.UTF_8))
+          .set("int64Value", 42)
+          .set("intValue", 43)
+          .set("float64Value", 2.8168f)
+          .set("floatValue", 2.817f)
+          .set("boolValue", true)
+          .set("booleanValue", true)
+          .set("timestampValue", 43)
+          .set("timeValue", 3497124416L)
+          .set("datetimeValue", 142111881387172416L)
+          .set("dateValue", 18124)
+          .set("numericValue", new BigDecimal("23.4"))
+          .set("bigNumericValue", "23334.4")
+          .set("arrayValue", REPEATED_BYTES);
+
+  private static final TableRow BASE_TABLE_ROW_FLOATS =
+      new TableRow()
+          .set("stringValue", "string")
+          .set("bytesValue", "string".getBytes(StandardCharsets.UTF_8))
+          .set("int64Value", 42)
+          .set("intValue", 43)
+          .set("float64Value", 2.8168f)
+          .set("floatValue", 2.817f)
+          .set("boolValue", true)
+          .set("booleanValue", true)
+          .set("timestampValue", 43)
+          .set("timeValue", 3497124416L)
+          .set("datetimeValue", 142111881387172416D)
+          .set("dateValue", 18124)
+          .set("numericValue", 23.4)
+          .set("bigNumericValue", "23334.4")
+          .set("arrayValue", REPEATED_BYTES);
+
+  private static final TableRow BASE_TABLE_ROW_NULL =
+      new TableRow()
+          //          .set("stringValue", null)  // do not set stringValue, this should work
+          .set("bytesValue", null)
+          .set("int64Value", null)
+          .set("intValue", null)
+          .set("float64Value", null)
+          .set("floatValue", null)
+          .set("boolValue", null)
+          .set("booleanValue", null)
+          .set("timestampValue", null)
+          .set("timeValue", null)
+          .set("datetimeValue", null)
+          .set("dateValue", null)
+          .set("numericValue", null)
+          .set("arrayValue", null);
+
+  private static final List<Object> REPEATED_BYTES_EXPECTED =
+      ImmutableList.of(
+          BaseEncoding.base64().encode("hello".getBytes(StandardCharsets.UTF_8)),
+          BaseEncoding.base64().encode("goodbye".getBytes(StandardCharsets.UTF_8)),
+          BaseEncoding.base64().encode("solong".getBytes(StandardCharsets.UTF_8)));
+
+  private static final TableRow BASE_TABLE_ROW_EXPECTED =
+      new TableRow()
+          .set("stringValue", "string")
+          .set(
+              "bytesValue", BaseEncoding.base64().encode("string".getBytes(StandardCharsets.UTF_8)))
+          .set("int64Value", "42")
+          .set("intValue", "43")
+          .set("float64Value", 2.8168)
+          .set("floatValue", 2.817)
+          .set("boolValue", true)
+          .set("booleanValue", true)
+          .set("timestampValue", "4.3E-5")
+          .set("timeValue", "00:52:07.123456")
+          .set("datetimeValue", "2019-08-16T00:52:07.123456")
+          .set("dateValue", "2019-08-16")
+          .set("numericValue", "23.4")
+          .set("bigNumericValue", "23334.4")
+          .set("arrayValue", REPEATED_BYTES_EXPECTED);
+
+  // joda is up to millisecond precision, expect truncation
+  private static final TableRow BASE_TABLE_ROW_JODA_EXPECTED =
+      new TableRow()
+          .set("stringValue", "string")
+          .set(
+              "bytesValue", BaseEncoding.base64().encode("string".getBytes(StandardCharsets.UTF_8)))
+          .set("int64Value", "42")
+          .set("intValue", "43")
+          .set("float64Value", 2.8168)
+          .set("floatValue", 2.817)
+          .set("boolValue", true)
+          .set("booleanValue", true)
+          .set("timestampValue", "0.004")
+          .set("timeValue", "00:52:07.123000")
+          .set("datetimeValue", "2019-08-16T00:52:07.123000")
+          .set("dateValue", "2019-08-16")
+          .set("numericValue", "23.4")
+          .set("bigNumericValue", "23334.4")
+          .set("arrayValue", REPEATED_BYTES_EXPECTED);
+
+  private static final TableRow BASE_TABLE_ROW_NUM_EXPECTED =
+      new TableRow()
+          .set("stringValue", "string")
+          .set(
+              "bytesValue", BaseEncoding.base64().encode("string".getBytes(StandardCharsets.UTF_8)))
+          .set("int64Value", "42")
+          .set("intValue", "43")
+          .set("float64Value", 2.8168)
+          .set("floatValue", 2.817)
+          .set("boolValue", true)
+          .set("booleanValue", true)
+          .set("timestampValue", "4.3E-5")
+          .set("timeValue", "00:52:07.123456")
+          .set("datetimeValue", "2019-08-16T00:52:07.123456")
+          .set("dateValue", "2019-08-16")
+          .set("numericValue", "23.4")
+          .set("bigNumericValue", "23334.4")
+          .set("arrayValue", REPEATED_BYTES_EXPECTED);
+
+  private static final TableRow BASE_TABLE_ROW_FLOATS_EXPECTED =
+      new TableRow()
+          .set("stringValue", "string")
+          .set(
+              "bytesValue", BaseEncoding.base64().encode("string".getBytes(StandardCharsets.UTF_8)))
+          .set("int64Value", "42")
+          .set("intValue", "43")
+          .set("float64Value", 2.8168)
+          .set("floatValue", 2.817)
+          .set("boolValue", true)
+          .set("booleanValue", true)
+          .set("timestampValue", "4.3E-5")
+          .set("timeValue", "00:52:07.123456")
+          .set("datetimeValue", "2019-08-16T00:52:07.123456")
+          .set("dateValue", "2019-08-16")
+          .set("numericValue", "23.4")
+          .set("bigNumericValue", "23334.4")
+          .set("arrayValue", REPEATED_BYTES_EXPECTED);
+
+  // only nonnull values are returned, null in arrayValue should be converted to empty list
+  private static final TableRow BASE_TABLE_ROW_NULL_EXPECTED =
+      new TableRow().set("arrayValue", ImmutableList.of());
+
+  private static final TableSchema NESTED_TABLE_SCHEMA =
+      new TableSchema()
+          .setFields(
+              ImmutableList.<TableFieldSchema>builder()
+                  .add(
+                      new TableFieldSchema()
+                          .setType("STRUCT")
+                          .setName("nestedValue1")
+                          .setMode("REQUIRED")
+                          .setFields(BASE_TABLE_SCHEMA.getFields()))
+                  .add(
+                      new TableFieldSchema()
+                          .setType("RECORD")
+                          .setName("nestedValue2")
+                          .setMode("REPEATED")
+                          .setFields(BASE_TABLE_SCHEMA.getFields()))
+                  .add(
+                      new TableFieldSchema()
+                          .setType("RECORD")
+                          .setName("nestedValue3")
+                          .setMode("NULLABLE")
+                          .setFields(BASE_TABLE_SCHEMA.getFields()))
+                  .build());
+
+  @BeforeClass
+  public static void setUpTestEnvironment() throws IOException, InterruptedException {
+    // Create one BQ dataset for all test cases.
+    BQ_CLIENT.createNewDataset(PROJECT, BIG_QUERY_DATASET_ID);
+  }
+
+  @AfterClass
+  public static void cleanup() {
+    LOG.info("Start to clean up tables and datasets.");
+    BQ_CLIENT.deleteDataset(PROJECT, BIG_QUERY_DATASET_ID);
+  }
+
+  @Test
+  public void testBaseTableRow() throws IOException, InterruptedException {
+    String tableSpec = createTable(BASE_TABLE_SCHEMA);
+
+    runPipeline(tableSpec, Collections.singleton(BASE_TABLE_ROW));
+
+    List<TableRow> actualTableRows =
+        BQ_CLIENT.queryUnflattened(String.format("SELECT * FROM [%s]", tableSpec), PROJECT, true);
+
+    assertEquals(1, actualTableRows.size());
+    assertEquals(BASE_TABLE_ROW_EXPECTED, actualTableRows.get(0));
+  }
+
+  @Test
+  public void testNestedRichTypesAndNull() throws IOException, InterruptedException {
+    String tableSpec = createTable(NESTED_TABLE_SCHEMA);
+    TableRow tableRow =
+        new TableRow()
+            .set("nestedValue1", BASE_TABLE_ROW)
+            .set(
+                "nestedValue2",
+                Arrays.asList(
+                    BASE_TABLE_ROW_JAVA_TIME,
+                    BASE_TABLE_ROW_JODA_TIME,
+                    BASE_TABLE_ROW_NUM_TIME,
+                    BASE_TABLE_ROW_FLOATS,
+                    BASE_TABLE_ROW_NULL))
+            .set("nestedValue3", null);
+
+    runPipeline(tableSpec, Collections.singleton(tableRow));
+
+    List<TableRow> actualTableRows =
+        BQ_CLIENT.queryUnflattened(String.format("SELECT * FROM [%s]", tableSpec), PROJECT, true);
+
+    assertEquals(1, actualTableRows.size());
+    assertEquals(BASE_TABLE_ROW_EXPECTED, actualTableRows.get(0).get("nestedValue1"));
+    LOG.info("ACTUAL " + actualTableRows.get(0).get("nestedValue2"));
+    assertEquals(
+        ImmutableList.of(
+            BASE_TABLE_ROW_EXPECTED,
+            BASE_TABLE_ROW_JODA_EXPECTED,
+            BASE_TABLE_ROW_NUM_EXPECTED,
+            BASE_TABLE_ROW_FLOATS_EXPECTED,
+            BASE_TABLE_ROW_NULL_EXPECTED),
+        actualTableRows.get(0).get("nestedValue2"));
+    assertNull(actualTableRows.get(0).get("nestedValue3"));
+  }
+
+  private static String createTable(TableSchema tableSchema)
+      throws IOException, InterruptedException {
+    String table = "table" + System.nanoTime();
+    BQ_CLIENT.deleteTable(PROJECT, BIG_QUERY_DATASET_ID, table);
+    BQ_CLIENT.createNewTable(
+        PROJECT,
+        BIG_QUERY_DATASET_ID,
+        new Table()
+            .setSchema(tableSchema)
+            .setTableReference(
+                new TableReference()
+                    .setTableId(table)
+                    .setDatasetId(BIG_QUERY_DATASET_ID)
+                    .setProjectId(PROJECT)));
+    return PROJECT + ":" + BIG_QUERY_DATASET_ID + "." + table;
+  }
+
+  private static void runPipeline(String tableSpec, Iterable<TableRow> tableRows) {
+    Pipeline p = Pipeline.create();
+    p.apply("Create test cases", Create.of(tableRows))
+        .apply(
+            "Write using Storage Write API",
+            BigQueryIO.<TableRow>write()
+                .to(tableSpec)
+                .withFormatFunction(SerializableFunctions.identity())
+                .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
+                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));
+    p.run().waitUntilFinish();
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
index 483cddc08c5..459fed3a168 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
@@ -24,6 +24,7 @@ import com.google.api.services.bigquery.model.TableCell;
 import com.google.api.services.bigquery.model.TableFieldSchema;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.bigquery.storage.v1.BigDecimalByteStringEncoder;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.DescriptorProtos.DescriptorProto;
 import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
@@ -32,7 +33,9 @@ import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
 import com.google.protobuf.Descriptors.Descriptor;
 import com.google.protobuf.Descriptors.FieldDescriptor;
 import com.google.protobuf.DynamicMessage;
+import java.math.BigDecimal;
 import java.nio.charset.StandardCharsets;
+import java.time.LocalDate;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -73,9 +76,10 @@ public class TableRowToStorageApiProtoTest {
                   .add(new TableFieldSchema().setType("DATETIME").setName("datetimeValue"))
                   .add(new TableFieldSchema().setType("DATE").setName("dateValue"))
                   .add(new TableFieldSchema().setType("NUMERIC").setName("numericValue"))
+                  .add(new TableFieldSchema().setType("BIGNUMERIC").setName("bigNumericValue"))
                   .add(
                       new TableFieldSchema()
-                          .setType("STRING")
+                          .setType("BYTES")
                           .setMode("REPEATED")
                           .setName("arrayValue"))
                   .build());
@@ -97,9 +101,10 @@ public class TableRowToStorageApiProtoTest {
                   .add(new TableFieldSchema().setType("DATETIME").setName("datetimeValue"))
                   .add(new TableFieldSchema().setType("DATE").setName("dateValue"))
                   .add(new TableFieldSchema().setType("NUMERIC").setName("numericValue"))
+                  .add(new TableFieldSchema().setType("BIGNUMERIC").setName("bigNumericValue"))
                   .add(
                       new TableFieldSchema()
-                          .setType("STRING")
+                          .setType("BYTES")
                           .setMode("REPEATED")
                           .setName("arrayValue"))
                   .build());
@@ -173,42 +178,49 @@ public class TableRowToStorageApiProtoTest {
               FieldDescriptorProto.newBuilder()
                   .setName("timestampvalue")
                   .setNumber(10)
-                  .setType(Type.TYPE_STRING)
+                  .setType(Type.TYPE_INT64)
                   .setLabel(Label.LABEL_OPTIONAL)
                   .build())
           .addField(
               FieldDescriptorProto.newBuilder()
                   .setName("timevalue")
                   .setNumber(11)
-                  .setType(Type.TYPE_STRING)
+                  .setType(Type.TYPE_INT64)
                   .setLabel(Label.LABEL_OPTIONAL)
                   .build())
           .addField(
               FieldDescriptorProto.newBuilder()
                   .setName("datetimevalue")
                   .setNumber(12)
-                  .setType(Type.TYPE_STRING)
+                  .setType(Type.TYPE_INT64)
                   .setLabel(Label.LABEL_OPTIONAL)
                   .build())
           .addField(
               FieldDescriptorProto.newBuilder()
                   .setName("datevalue")
                   .setNumber(13)
-                  .setType(Type.TYPE_STRING)
+                  .setType(Type.TYPE_INT32)
                   .setLabel(Label.LABEL_OPTIONAL)
                   .build())
           .addField(
               FieldDescriptorProto.newBuilder()
                   .setName("numericvalue")
                   .setNumber(14)
-                  .setType(Type.TYPE_STRING)
+                  .setType(Type.TYPE_BYTES)
                   .setLabel(Label.LABEL_OPTIONAL)
                   .build())
           .addField(
               FieldDescriptorProto.newBuilder()
-                  .setName("arrayvalue")
+                  .setName("bignumericvalue")
                   .setNumber(15)
-                  .setType(Type.TYPE_STRING)
+                  .setType(Type.TYPE_BYTES)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("arrayvalue")
+                  .setNumber(16)
+                  .setType(Type.TYPE_BYTES)
                   .setLabel(Label.LABEL_REPEATED)
                   .build())
           .build();
@@ -275,42 +287,49 @@ public class TableRowToStorageApiProtoTest {
               FieldDescriptorProto.newBuilder()
                   .setName("timestampvalue")
                   .setNumber(9)
-                  .setType(Type.TYPE_STRING)
+                  .setType(Type.TYPE_INT64)
                   .setLabel(Label.LABEL_OPTIONAL)
                   .build())
           .addField(
               FieldDescriptorProto.newBuilder()
                   .setName("timevalue")
                   .setNumber(10)
-                  .setType(Type.TYPE_STRING)
+                  .setType(Type.TYPE_INT64)
                   .setLabel(Label.LABEL_OPTIONAL)
                   .build())
           .addField(
               FieldDescriptorProto.newBuilder()
                   .setName("datetimevalue")
                   .setNumber(11)
-                  .setType(Type.TYPE_STRING)
+                  .setType(Type.TYPE_INT64)
                   .setLabel(Label.LABEL_OPTIONAL)
                   .build())
           .addField(
               FieldDescriptorProto.newBuilder()
                   .setName("datevalue")
                   .setNumber(2)
-                  .setType(Type.TYPE_STRING)
+                  .setType(Type.TYPE_INT32)
                   .setLabel(Label.LABEL_OPTIONAL)
                   .build())
           .addField(
               FieldDescriptorProto.newBuilder()
                   .setName("numericvalue")
                   .setNumber(13)
-                  .setType(Type.TYPE_STRING)
+                  .setType(Type.TYPE_BYTES)
                   .setLabel(Label.LABEL_OPTIONAL)
                   .build())
           .addField(
               FieldDescriptorProto.newBuilder()
-                  .setName("arrayvalue")
+                  .setName("bignumericvalue")
                   .setNumber(14)
-                  .setType(Type.TYPE_STRING)
+                  .setType(Type.TYPE_BYTES)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("arrayvalue")
+                  .setNumber(15)
+                  .setType(Type.TYPE_BYTES)
                   .setLabel(Label.LABEL_REPEATED)
                   .build())
           .build();
@@ -414,6 +433,18 @@ public class TableRowToStorageApiProtoTest {
     assertEquals(expectedBaseTypesNoF, nestedTypesNoF2);
   }
 
+  private static final List<Object> REPEATED_BYTES =
+      ImmutableList.of(
+          BaseEncoding.base64().encode("hello".getBytes(StandardCharsets.UTF_8)),
+          "goodbye".getBytes(StandardCharsets.UTF_8),
+          ByteString.copyFrom("solong".getBytes(StandardCharsets.UTF_8)));
+
+  private static final List<Object> EXPECTED_PROTO_REPEATED_BYTES =
+      ImmutableList.of(
+          ByteString.copyFrom("hello".getBytes(StandardCharsets.UTF_8)),
+          ByteString.copyFrom("goodbye".getBytes(StandardCharsets.UTF_8)),
+          ByteString.copyFrom("solong".getBytes(StandardCharsets.UTF_8)));
+
   private static final TableRow BASE_TABLE_ROW =
       new TableRow()
           .setF(
@@ -429,12 +460,13 @@ public class TableRowToStorageApiProtoTest {
                   new TableCell().setV("2.817"),
                   new TableCell().setV("true"),
                   new TableCell().setV("true"),
-                  new TableCell().setV("43"),
-                  new TableCell().setV("00:52:07[.123]|[.123456] UTC"),
-                  new TableCell().setV("2019-08-16 00:52:07[.123]|[.123456] UTC"),
+                  new TableCell().setV("1970-01-01T00:00:00.000043Z"),
+                  new TableCell().setV("00:52:07.123456"),
+                  new TableCell().setV("2019-08-16T00:52:07.123456"),
                   new TableCell().setV("2019-08-16"),
                   new TableCell().setV("23.4"),
-                  new TableCell().setV(ImmutableList.of("hello", "goodbye"))));
+                  new TableCell().setV("2312345.4"),
+                  new TableCell().setV(REPEATED_BYTES)));
 
   private static final TableRow BASE_TABLE_ROW_NO_F =
       new TableRow()
@@ -447,12 +479,13 @@ public class TableRowToStorageApiProtoTest {
           .set("floatValue", "2.817")
           .set("boolValue", "true")
           .set("booleanValue", "true")
-          .set("timestampValue", "43")
-          .set("timeValue", "00:52:07[.123]|[.123456] UTC")
-          .set("datetimeValue", "2019-08-16 00:52:07[.123]|[.123456] UTC")
+          .set("timestampValue", "1970-01-01T00:00:00.000043Z")
+          .set("timeValue", "00:52:07.123456")
+          .set("datetimeValue", "2019-08-16T00:52:07.123456")
           .set("dateValue", "2019-08-16")
           .set("numericValue", "23.4")
-          .set("arrayValue", ImmutableList.of("hello", "goodbye"));
+          .set("bigNumericValue", "2312345.4")
+          .set("arrayValue", REPEATED_BYTES);
 
   private static final Map<String, Object> BASE_ROW_EXPECTED_PROTO_VALUES =
       ImmutableMap.<String, Object>builder()
@@ -465,12 +498,17 @@ public class TableRowToStorageApiProtoTest {
           .put("floatvalue", (double) 2.817)
           .put("boolvalue", true)
           .put("booleanvalue", true)
-          .put("timestampvalue", "43")
-          .put("timevalue", "00:52:07[.123]|[.123456] UTC")
-          .put("datetimevalue", "2019-08-16 00:52:07[.123]|[.123456] UTC")
-          .put("datevalue", "2019-08-16")
-          .put("numericvalue", "23.4")
-          .put("arrayvalue", ImmutableList.of("hello", "goodbye"))
+          .put("timestampvalue", 43L)
+          .put("timevalue", 3497124416L)
+          .put("datetimevalue", 142111881387172416L)
+          .put("datevalue", (int) LocalDate.of(2019, 8, 16).toEpochDay())
+          .put(
+              "numericvalue",
+              BigDecimalByteStringEncoder.encodeToNumericByteString(new BigDecimal("23.4")))
+          .put(
+              "bignumericvalue",
+              BigDecimalByteStringEncoder.encodeToBigNumericByteString(new BigDecimal("2312345.4")))
+          .put("arrayvalue", EXPECTED_PROTO_REPEATED_BYTES)
           .build();
 
   private static final Map<String, Object> BASE_ROW_NO_F_EXPECTED_PROTO_VALUES =
@@ -483,12 +521,17 @@ public class TableRowToStorageApiProtoTest {
           .put("floatvalue", (double) 2.817)
           .put("boolvalue", true)
           .put("booleanvalue", true)
-          .put("timestampvalue", "43")
-          .put("timevalue", "00:52:07[.123]|[.123456] UTC")
-          .put("datetimevalue", "2019-08-16 00:52:07[.123]|[.123456] UTC")
-          .put("datevalue", "2019-08-16")
-          .put("numericvalue", "23.4")
-          .put("arrayvalue", ImmutableList.of("hello", "goodbye"))
+          .put("timestampvalue", 43L)
+          .put("timevalue", 3497124416L)
+          .put("datetimevalue", 142111881387172416L)
+          .put("datevalue", (int) LocalDate.parse("2019-08-16").toEpochDay())
+          .put(
+              "numericvalue",
+              BigDecimalByteStringEncoder.encodeToNumericByteString(new BigDecimal("23.4")))
+          .put(
+              "bignumericvalue",
+              BigDecimalByteStringEncoder.encodeToBigNumericByteString(new BigDecimal("2312345.4")))
+          .put("arrayvalue", EXPECTED_PROTO_REPEATED_BYTES)
           .build();
 
   private void assertBaseRecord(DynamicMessage msg, boolean withF) {
@@ -506,12 +549,16 @@ public class TableRowToStorageApiProtoTest {
         new TableRow()
             .set("nestedValue1", BASE_TABLE_ROW)
             .set("nestedValue2", BASE_TABLE_ROW)
-            .set("nestedvalueNoF1", BASE_TABLE_ROW_NO_F)
-            .set("nestedvalueNoF2", BASE_TABLE_ROW_NO_F);
+            .set("nestedValueNoF1", BASE_TABLE_ROW_NO_F)
+            .set("nestedValueNoF2", BASE_TABLE_ROW_NO_F);
 
     Descriptor descriptor =
         TableRowToStorageApiProto.getDescriptorFromTableSchema(NESTED_TABLE_SCHEMA);
-    DynamicMessage msg = TableRowToStorageApiProto.messageFromTableRow(descriptor, tableRow, false);
+    TableRowToStorageApiProto.SchemaInformation schemaInformation =
+        TableRowToStorageApiProto.SchemaInformation.fromTableSchema(NESTED_TABLE_SCHEMA);
+    DynamicMessage msg =
+        TableRowToStorageApiProto.messageFromTableRow(
+            schemaInformation, descriptor, tableRow, false);
     assertEquals(4, msg.getAllFields().size());
 
     Map<String, FieldDescriptor> fieldDescriptors =
@@ -527,8 +574,11 @@ public class TableRowToStorageApiProtoTest {
   public void testMessageWithFFromTableRow() throws Exception {
     Descriptor descriptor =
         TableRowToStorageApiProto.getDescriptorFromTableSchema(BASE_TABLE_SCHEMA);
+    TableRowToStorageApiProto.SchemaInformation schemaInformation =
+        TableRowToStorageApiProto.SchemaInformation.fromTableSchema(BASE_TABLE_SCHEMA);
     DynamicMessage msg =
-        TableRowToStorageApiProto.messageFromTableRow(descriptor, BASE_TABLE_ROW, false);
+        TableRowToStorageApiProto.messageFromTableRow(
+            schemaInformation, descriptor, BASE_TABLE_ROW, false);
     assertBaseRecord(msg, true);
   }
 
@@ -567,8 +617,11 @@ public class TableRowToStorageApiProtoTest {
             .set("repeatednof2", ImmutableList.of(BASE_TABLE_ROW_NO_F, BASE_TABLE_ROW_NO_F));
     Descriptor descriptor =
         TableRowToStorageApiProto.getDescriptorFromTableSchema(REPEATED_MESSAGE_SCHEMA);
+    TableRowToStorageApiProto.SchemaInformation schemaInformation =
+        TableRowToStorageApiProto.SchemaInformation.fromTableSchema(REPEATED_MESSAGE_SCHEMA);
     DynamicMessage msg =
-        TableRowToStorageApiProto.messageFromTableRow(descriptor, repeatedRow, false);
+        TableRowToStorageApiProto.messageFromTableRow(
+            schemaInformation, descriptor, repeatedRow, false);
     assertEquals(4, msg.getAllFields().size());
 
     Map<String, FieldDescriptor> fieldDescriptors =
@@ -609,8 +662,11 @@ public class TableRowToStorageApiProtoTest {
             .set("repeatednof2", null);
     Descriptor descriptor =
         TableRowToStorageApiProto.getDescriptorFromTableSchema(REPEATED_MESSAGE_SCHEMA);
+    TableRowToStorageApiProto.SchemaInformation schemaInformation =
+        TableRowToStorageApiProto.SchemaInformation.fromTableSchema(REPEATED_MESSAGE_SCHEMA);
     DynamicMessage msg =
-        TableRowToStorageApiProto.messageFromTableRow(descriptor, repeatedRow, false);
+        TableRowToStorageApiProto.messageFromTableRow(
+            schemaInformation, descriptor, repeatedRow, false);
 
     Map<String, FieldDescriptor> fieldDescriptors =
         descriptor.getFields().stream()
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/StructUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/StructUtilsTest.java
index 577a470cdcd..d58ad0b6b53 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/StructUtilsTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/StructUtilsTest.java
@@ -69,7 +69,8 @@ public class StructUtilsTest {
     Struct struct = Struct.newBuilder().set("f_int64").to("string_value").build();
     Exception exception =
         assertThrows(ClassCastException.class, () -> StructUtils.structToBeamRow(struct, schema));
-    checkMessage("java.lang.String cannot be cast to java.lang.Long", exception.getMessage());
+    checkMessage("java.lang.String cannot be cast to", exception.getMessage());
+    checkMessage("java.lang.Long", exception.getMessage());
   }
 
   @Test