You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/04/28 23:31:14 UTC

[GitHub] [beam] yirutang commented on a diff in pull request #17404: [BEAM-13990] support date and timestamp fields

yirutang commented on code in PR #17404:
URL: https://github.com/apache/beam/pull/17404#discussion_r861374699


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java:
##########
@@ -84,10 +143,10 @@ public static class SchemaDoesntMatchException extends SchemaConversionException
           .put("NUMERIC", Type.TYPE_STRING) // Pass through the JSON encoding.
           .put("BIGNUMERIC", Type.TYPE_STRING) // Pass through the JSON encoding.
           .put("GEOGRAPHY", Type.TYPE_STRING) // Pass through the JSON encoding.
-          .put("DATE", Type.TYPE_STRING) // Pass through the JSON encoding.
+          .put("DATE", Type.TYPE_INT32)
           .put("TIME", Type.TYPE_STRING) // Pass through the JSON encoding.
           .put("DATETIME", Type.TYPE_STRING) // Pass through the JSON encoding.

Review Comment:
   TIME and DATETIME can be Int64 so that the encoding size is smaller. Shouldn't block this PR:
   https://github.com/googleapis/java-bigquerystorage/blob/main/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessage.java#L334
   



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoIT.java:
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.BaseEncoding;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(JUnit4.class)
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+/** Unit tests for {@link TableRowToStorageApiProto}. */
+public class TableRowToStorageApiProtoIT {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TableRowToStorageApiProtoIT.class);
+  private static final BigqueryClient BQ_CLIENT = new BigqueryClient("TableRowToStorageApiProtoIT");
+  private static final String PROJECT =
+      TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
+  private static final String BIG_QUERY_DATASET_ID =
+      "table_row_to_storage_api_proto_" + System.nanoTime();
+
+  private static final TableSchema BASE_TABLE_SCHEMA =
+      new TableSchema()
+          .setFields(
+              ImmutableList.<TableFieldSchema>builder()
+                  .add(new TableFieldSchema().setType("STRING").setName("stringValue"))
+                  .add(new TableFieldSchema().setType("BYTES").setName("bytesValue"))
+                  .add(new TableFieldSchema().setType("INT64").setName("int64Value"))
+                  .add(new TableFieldSchema().setType("INTEGER").setName("intValue"))
+                  .add(new TableFieldSchema().setType("FLOAT64").setName("float64Value"))
+                  .add(new TableFieldSchema().setType("FLOAT").setName("floatValue"))
+                  .add(new TableFieldSchema().setType("BOOL").setName("boolValue"))
+                  .add(new TableFieldSchema().setType("BOOLEAN").setName("booleanValue"))
+                  .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampValue"))
+                  .add(new TableFieldSchema().setType("TIME").setName("timeValue"))
+                  .add(new TableFieldSchema().setType("DATETIME").setName("datetimeValue"))
+                  .add(new TableFieldSchema().setType("DATE").setName("dateValue"))
+                  .add(new TableFieldSchema().setType("NUMERIC").setName("numericValue"))
+                  .add(
+                      new TableFieldSchema()
+                          .setType("STRING")
+                          .setMode("REPEATED")
+                          .setName("arrayValue"))
+                  .build());
+
+  private static final TableRow BASE_TABLE_ROW =
+      new TableRow()
+          .set("stringValue", "string")
+          .set(
+              "bytesValue", BaseEncoding.base64().encode("string".getBytes(StandardCharsets.UTF_8)))
+          .set("int64Value", "42")
+          .set("intValue", "43")
+          .set("float64Value", "2.8168")
+          .set("floatValue", "2.817")
+          .set("boolValue", "true")
+          .set("booleanValue", "true")
+          .set("timestampValue", "1970-01-01T00:00:00.000043Z")
+          .set("timeValue", "00:52:07.123456")
+          .set("datetimeValue", "2019-08-16T00:52:07.123456")
+          .set("dateValue", "2019-08-16")
+          .set("numericValue", "23.4")
+          .set(
+              "arrayValue",
+              Arrays.asList("hello", "goodbye", null)); // null in arrayValue should be removed
+
+  private static final TableRow BASE_TABLE_ROW_JODA_TIME =
+      new TableRow()
+          .set("stringValue", "string")
+          .set("bytesValue", "string".getBytes(StandardCharsets.UTF_8))
+          .set("int64Value", 42)
+          .set("intValue", 43)
+          .set("float64Value", 2.8168f)
+          .set("floatValue", 2.817f)
+          .set("boolValue", true)
+          .set("booleanValue", true)
+          .set("timestampValue", org.joda.time.Instant.parse("1970-01-01T00:00:00.000043Z"))
+          .set("timeValue", org.joda.time.LocalTime.parse("00:52:07.123456"))
+          .set("datetimeValue", org.joda.time.LocalDateTime.parse("2019-08-16T00:52:07.123456"))
+          .set("dateValue", org.joda.time.LocalDate.parse("2019-08-16"))
+          .set("numericValue", new BigDecimal("23.4"))
+          .set("arrayValue", ImmutableList.of("hello", "goodbye"));
+
+  private static final TableRow BASE_TABLE_ROW_JAVA_TIME =
+      new TableRow()
+          .set("stringValue", "string")
+          .set("bytesValue", "string".getBytes(StandardCharsets.UTF_8))
+          .set("int64Value", 42)
+          .set("intValue", 43)
+          .set("float64Value", 2.8168f)
+          .set("floatValue", 2.817f)
+          .set("boolValue", true)
+          .set("booleanValue", true)
+          .set("timestampValue", Instant.parse("1970-01-01T00:00:00.000043Z"))
+          .set("timeValue", LocalTime.parse("00:52:07.123456"))
+          .set("datetimeValue", LocalDateTime.parse("2019-08-16T00:52:07.123456"))
+          .set("dateValue", LocalDate.parse("2019-08-16"))
+          .set("numericValue", new BigDecimal("23.4"))

Review Comment:
   int and double timestamp value test?



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoIT.java:
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.BaseEncoding;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(JUnit4.class)
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+/** Unit tests for {@link TableRowToStorageApiProto}. */
+public class TableRowToStorageApiProtoIT {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TableRowToStorageApiProtoIT.class);
+  private static final BigqueryClient BQ_CLIENT = new BigqueryClient("TableRowToStorageApiProtoIT");
+  private static final String PROJECT =
+      TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
+  private static final String BIG_QUERY_DATASET_ID =
+      "table_row_to_storage_api_proto_" + System.nanoTime();
+
+  private static final TableSchema BASE_TABLE_SCHEMA =
+      new TableSchema()
+          .setFields(
+              ImmutableList.<TableFieldSchema>builder()
+                  .add(new TableFieldSchema().setType("STRING").setName("stringValue"))
+                  .add(new TableFieldSchema().setType("BYTES").setName("bytesValue"))
+                  .add(new TableFieldSchema().setType("INT64").setName("int64Value"))
+                  .add(new TableFieldSchema().setType("INTEGER").setName("intValue"))
+                  .add(new TableFieldSchema().setType("FLOAT64").setName("float64Value"))
+                  .add(new TableFieldSchema().setType("FLOAT").setName("floatValue"))
+                  .add(new TableFieldSchema().setType("BOOL").setName("boolValue"))
+                  .add(new TableFieldSchema().setType("BOOLEAN").setName("booleanValue"))
+                  .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampValue"))
+                  .add(new TableFieldSchema().setType("TIME").setName("timeValue"))
+                  .add(new TableFieldSchema().setType("DATETIME").setName("datetimeValue"))
+                  .add(new TableFieldSchema().setType("DATE").setName("dateValue"))
+                  .add(new TableFieldSchema().setType("NUMERIC").setName("numericValue"))
+                  .add(
+                      new TableFieldSchema()
+                          .setType("STRING")
+                          .setMode("REPEATED")
+                          .setName("arrayValue"))
+                  .build());
+
+  private static final TableRow BASE_TABLE_ROW =
+      new TableRow()
+          .set("stringValue", "string")
+          .set(
+              "bytesValue", BaseEncoding.base64().encode("string".getBytes(StandardCharsets.UTF_8)))
+          .set("int64Value", "42")
+          .set("intValue", "43")
+          .set("float64Value", "2.8168")
+          .set("floatValue", "2.817")
+          .set("boolValue", "true")
+          .set("booleanValue", "true")
+          .set("timestampValue", "1970-01-01T00:00:00.000043Z")
+          .set("timeValue", "00:52:07.123456")
+          .set("datetimeValue", "2019-08-16T00:52:07.123456")
+          .set("dateValue", "2019-08-16")
+          .set("numericValue", "23.4")
+          .set(
+              "arrayValue",
+              Arrays.asList("hello", "goodbye", null)); // null in arrayValue should be removed
+
+  private static final TableRow BASE_TABLE_ROW_JODA_TIME =
+      new TableRow()
+          .set("stringValue", "string")
+          .set("bytesValue", "string".getBytes(StandardCharsets.UTF_8))
+          .set("int64Value", 42)
+          .set("intValue", 43)
+          .set("float64Value", 2.8168f)
+          .set("floatValue", 2.817f)
+          .set("boolValue", true)
+          .set("booleanValue", true)
+          .set("timestampValue", org.joda.time.Instant.parse("1970-01-01T00:00:00.000043Z"))
+          .set("timeValue", org.joda.time.LocalTime.parse("00:52:07.123456"))
+          .set("datetimeValue", org.joda.time.LocalDateTime.parse("2019-08-16T00:52:07.123456"))
+          .set("dateValue", org.joda.time.LocalDate.parse("2019-08-16"))
+          .set("numericValue", new BigDecimal("23.4"))
+          .set("arrayValue", ImmutableList.of("hello", "goodbye"));
+
+  private static final TableRow BASE_TABLE_ROW_JAVA_TIME =
+      new TableRow()
+          .set("stringValue", "string")
+          .set("bytesValue", "string".getBytes(StandardCharsets.UTF_8))
+          .set("int64Value", 42)
+          .set("intValue", 43)
+          .set("float64Value", 2.8168f)
+          .set("floatValue", 2.817f)
+          .set("boolValue", true)
+          .set("booleanValue", true)
+          .set("timestampValue", Instant.parse("1970-01-01T00:00:00.000043Z"))
+          .set("timeValue", LocalTime.parse("00:52:07.123456"))
+          .set("datetimeValue", LocalDateTime.parse("2019-08-16T00:52:07.123456"))
+          .set("dateValue", LocalDate.parse("2019-08-16"))
+          .set("numericValue", new BigDecimal("23.4"))

Review Comment:
   bignumeric test?



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java:
##########
@@ -523,14 +523,6 @@ public void testTimePartitioning() throws Exception {
     testTimePartitioning(method);
   }
 
-  @Test
-  public void testTimePartitioningStorageApi() throws Exception {

Review Comment:
   Why remove this?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java:
##########
@@ -218,125 +288,157 @@ private static void fieldDescriptorFromTableField(
   }
 
   @Nullable
+  @SuppressWarnings({"nullness"})
   private static Object messageValueFromFieldValue(
-      FieldDescriptor fieldDescriptor, @Nullable Object bqValue, boolean ignoreUnknownValues)
+      SchemaInformation schemaInformation,
+      FieldDescriptor fieldDescriptor,
+      @Nullable Object bqValue,
+      boolean ignoreUnknownValues)
       throws SchemaConversionException {
     if (bqValue == null) {
       if (fieldDescriptor.isOptional()) {
         return null;
       } else if (fieldDescriptor.isRepeated()) {
         return Collections.emptyList();
-      }
-      {
+      } else {
         throw new IllegalArgumentException(
             "Received null value for non-nullable field " + fieldDescriptor.getName());
       }
     }
-    return toProtoValue(
-        fieldDescriptor, bqValue, fieldDescriptor.isRepeated(), ignoreUnknownValues);
-  }
-
-  private static final Map<FieldDescriptor.Type, Function<String, Object>>
-      JSON_PROTO_STRING_PARSERS =
-          ImmutableMap.<FieldDescriptor.Type, Function<String, Object>>builder()
-              .put(FieldDescriptor.Type.INT32, Integer::valueOf)
-              .put(FieldDescriptor.Type.INT64, Long::valueOf)
-              .put(FieldDescriptor.Type.FLOAT, Float::valueOf)
-              .put(FieldDescriptor.Type.DOUBLE, Double::valueOf)
-              .put(FieldDescriptor.Type.BOOL, Boolean::valueOf)
-              .put(FieldDescriptor.Type.STRING, str -> str)
-              .put(
-                  FieldDescriptor.Type.BYTES,
-                  b64 -> ByteString.copyFrom(BaseEncoding.base64().decode(b64)))
-              .build();
 
-  @Nullable
-  @SuppressWarnings({"nullness"})
-  @VisibleForTesting
-  static Object toProtoValue(
-      FieldDescriptor fieldDescriptor,
-      Object jsonBQValue,
-      boolean isRepeated,
-      boolean ignoreUnknownValues)
-      throws SchemaConversionException {
-    if (isRepeated) {
-      List<Object> listValue = (List<Object>) jsonBQValue;
+    if (fieldDescriptor.isRepeated()) {
+      List<Object> listValue = (List<Object>) bqValue;
       List<Object> protoList = Lists.newArrayListWithCapacity(listValue.size());
-      for (Object o : listValue) {
-        protoList.add(toProtoValue(fieldDescriptor, o, false, ignoreUnknownValues));
+      for (@Nullable Object o : listValue) {
+        if (o != null) { // repeated field cannot contain null.
+          protoList.add(
+              singularFieldToProtoValue(
+                  schemaInformation, fieldDescriptor, o, ignoreUnknownValues));
+        }
       }
       return protoList;
     }
-
-    if (fieldDescriptor.getType() == FieldDescriptor.Type.MESSAGE) {
-      if (jsonBQValue instanceof TableRow) {
-        TableRow tableRow = (TableRow) jsonBQValue;
-        return messageFromTableRow(fieldDescriptor.getMessageType(), tableRow, ignoreUnknownValues);
-      } else if (jsonBQValue instanceof AbstractMap) {
-        // This will handle nested rows.
-        AbstractMap<String, Object> map = ((AbstractMap<String, Object>) jsonBQValue);
-        return messageFromMap(fieldDescriptor.getMessageType(), map, ignoreUnknownValues);
-      } else {
-        throw new RuntimeException("Unexpected value " + jsonBQValue + " Expected a JSON map.");
-      }
-    }
-    @Nullable Object scalarValue = scalarToProtoValue(fieldDescriptor, jsonBQValue);
-    if (scalarValue == null) {
-      return toProtoValue(fieldDescriptor, jsonBQValue.toString(), isRepeated, ignoreUnknownValues);
-    } else {
-      return scalarValue;
-    }
+    return singularFieldToProtoValue(
+        schemaInformation, fieldDescriptor, bqValue, ignoreUnknownValues);
   }
 
   @VisibleForTesting
   @Nullable
-  static Object scalarToProtoValue(FieldDescriptor fieldDescriptor, Object jsonBQValue) {
-    if (jsonBQValue instanceof String) {
-      Function<String, Object> mapper = JSON_PROTO_STRING_PARSERS.get(fieldDescriptor.getType());
-      if (mapper == null) {
-        throw new UnsupportedOperationException(
-            "Converting BigQuery type '"
-                + jsonBQValue.getClass()
-                + "' to '"
-                + fieldDescriptor
-                + "' is not supported");
-      }
-      return mapper.apply((String) jsonBQValue);
-    }
-
-    switch (fieldDescriptor.getType()) {
-      case BOOL:
-        if (jsonBQValue instanceof Boolean) {
-          return jsonBQValue;
+  static Object singularFieldToProtoValue(
+      SchemaInformation schemaInformation,
+      FieldDescriptor fieldDescriptor,
+      Object value,
+      boolean ignoreUnknownValues)
+      throws SchemaConversionException {
+    switch (schemaInformation.getType()) {
+      case "INT64":
+      case "INTEGER":
+        if (value instanceof String) {
+          return Long.valueOf((String) value);
+        } else if (value instanceof Integer || value instanceof Long) {
+          return ((Number) value).longValue();
         }
         break;
-      case BYTES:
+      case "FLOAT64":
+      case "FLOAT":
+        if (value instanceof String) {
+          return Double.valueOf((String) value);
+        } else if (value instanceof Double || value instanceof Float) {
+          return ((Number) value).doubleValue();
+        }
         break;
-      case INT64:
-        if (jsonBQValue instanceof Integer) {
-          return Long.valueOf((Integer) jsonBQValue);
-        } else if (jsonBQValue instanceof Long) {
-          return jsonBQValue;
+      case "BOOLEAN":
+      case "BOOL":
+        if (value instanceof String) {
+          return Boolean.valueOf((String) value);
+        } else if (value instanceof Boolean) {
+          return value;
         }
         break;
-      case INT32:
-        if (jsonBQValue instanceof Integer) {
-          return jsonBQValue;
+      case "BYTES":
+        if (value instanceof String) {
+          return ByteString.copyFrom(BaseEncoding.base64().decode((String) value));
+        } else if (value instanceof byte[]) {
+          return ByteString.copyFrom((byte[]) value);
+        } else if (value instanceof ByteString) {
+          return value;
         }
         break;
-      case STRING:
+      case "TIMESTAMP":
+        if (value instanceof String) {
+          try {
+            return ChronoUnit.MICROS.between(Instant.EPOCH, Instant.parse((String) value));
+          } catch (DateTimeParseException e) {
+            return ChronoUnit.MICROS.between(
+                Instant.EPOCH, Instant.ofEpochMilli(Long.parseLong((String) value)));
+          }
+        } else if (value instanceof Instant) {
+          return ChronoUnit.MICROS.between(Instant.EPOCH, (Instant) value);
+        } else if (value instanceof org.joda.time.Instant) {
+          // joda instant precision is millisecond
+          return ((org.joda.time.Instant) value).getMillis() * 1000L;
+        } else if (value instanceof Integer || value instanceof Long) {
+          return ((Number) value).longValue();
+        } else if (value instanceof Double || value instanceof Float) {
+          // assume value represents number of seconds since epoch
+          return BigDecimal.valueOf(((Number) value).doubleValue())
+              .scaleByPowerOfTen(6)
+              .setScale(0, RoundingMode.HALF_UP)
+              .longValue();
+        }
         break;
-      case DOUBLE:
-        if (jsonBQValue instanceof Double) {
-          return jsonBQValue;
-        } else if (jsonBQValue instanceof Float) {
-          return Double.valueOf((Float) jsonBQValue);
+      case "DATE":
+        if (value instanceof String) {
+          return ((Long) LocalDate.parse((String) value).toEpochDay()).intValue();
+        } else if (value instanceof LocalDate) {
+          return ((Long) ((LocalDate) value).toEpochDay()).intValue();
+        } else if (value instanceof org.joda.time.LocalDate) {
+          return Days.daysBetween(
+                  org.joda.time.Instant.EPOCH.toDateTime().toLocalDate(),
+                  (org.joda.time.LocalDate) value)
+              .getDays();
+        }
+        break;
+      case "NUMERIC":
+      case "BIGNUMERIC":
+        if (value instanceof String) {
+          return value;
+        } else if (value instanceof BigDecimal) {
+          return ((BigDecimal) value).toPlainString();

Review Comment:
   We do provide encoding here:
   https://github.com/googleapis/java-bigquerystorage/blob/main/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigDecimalByteStringEncoder.java
   
   So that they can be pass in more efficiently. Not blocking this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org