Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/03/03 04:08:50 UTC

[GitHub] [beam] reuvenlax opened a new pull request #14136: [BEAM-11648] Add conversion utilities for BigQuery Storage API sink

reuvenlax opened a new pull request #14136:
URL: https://github.com/apache/beam/pull/14136


   R: @yirutang 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] reuvenlax commented on a change in pull request #14136: [BEAM-11648] Add conversion utilities for BigQuery Storage API sink

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on a change in pull request #14136:
URL: https://github.com/apache/beam/pull/14136#discussion_r587896666



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
##########
@@ -140,7 +110,7 @@ public static Builder builder() {
   public abstract static class SchemaConversionOptions implements Serializable {
 
     /**
-     * Controls whether to use the map or row FieldType for a TableSchema field that appears to
+     * /** Controls whether to use the map or row FieldType for a TableSchema field that appears to

Review comment:
       done







[GitHub] [beam] reuvenlax commented on a change in pull request #14136: [BEAM-11648] Add conversion utilities for BigQuery Storage API sink

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on a change in pull request #14136:
URL: https://github.com/apache/beam/pull/14136#discussion_r587928961



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static java.util.stream.Collectors.toList;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.DescriptorProtos.DescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
+import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.DescriptorValidationException;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.FileDescriptor;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
+import java.util.AbstractMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.BaseEncoding;
+
+/**
+ * Utility methods for converting JSON {@link TableRow} objects to dynamic protocol message, for use
+ * with the Storage write API.
+ */
+public class TableRowToStorageApiProto {

Review comment:
       No, since users of TableRow may predate the existence of Beam Rows.







[GitHub] [beam] yirutang commented on a change in pull request #14136: [BEAM-11648] Add conversion utilities for BigQuery Storage API sink

Posted by GitBox <gi...@apache.org>.
yirutang commented on a change in pull request #14136:
URL: https://github.com/apache/beam/pull/14136#discussion_r588097129



##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.DescriptorProtos.DescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.DynamicMessage;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.BaseEncoding;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+/** Unit tests for {@link org.apache.beam.sdk.io.gcp.bigquery.TableRowToStorageApiProto}. */
+public class TableRowToStorageApiProtoTest {
+  private static final TableSchema BASE_TABLE_SCHEMA =
+      new TableSchema()
+          .setFields(
+              ImmutableList.<TableFieldSchema>builder()
+                  .add(new TableFieldSchema().setType("STRING").setName("stringValue"))
+                  .add(new TableFieldSchema().setType("BYTES").setName("bytesValue"))
+                  .add(new TableFieldSchema().setType("INT64").setName("int64Value"))
+                  .add(new TableFieldSchema().setType("INTEGER").setName("intValue"))
+                  .add(new TableFieldSchema().setType("FLOAT64").setName("float64Value"))
+                  .add(new TableFieldSchema().setType("FLOAT").setName("floatValue"))
+                  .add(new TableFieldSchema().setType("BOOL").setName("boolValue"))
+                  .add(new TableFieldSchema().setType("BOOLEAN").setName("booleanValue"))
+                  .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampValue"))
+                  .add(new TableFieldSchema().setType("TIME").setName("timeValue"))
+                  .add(new TableFieldSchema().setType("DATETIME").setName("datetimeValue"))
+                  .add(new TableFieldSchema().setType("DATE").setName("dateValue"))
+                  .add(new TableFieldSchema().setType("NUMERIC").setName("numericValue"))
+                  .add(
+                      new TableFieldSchema()
+                          .setType("STRING")
+                          .setMode("REPEATED")
+                          .setName("arrayValue"))
+                  .build());
+
+  private static final DescriptorProto BASE_TABLE_SCHEMA_PROTO =
+      DescriptorProto.newBuilder()
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("stringvalue")
+                  .setNumber(1)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("bytesvalue")
+                  .setNumber(2)
+                  .setType(Type.TYPE_BYTES)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("int64value")
+                  .setNumber(3)
+                  .setType(Type.TYPE_INT64)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("intvalue")
+                  .setNumber(4)
+                  .setType(Type.TYPE_INT64)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("float64value")
+                  .setNumber(5)
+                  .setType(Type.TYPE_DOUBLE)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("floatvalue")
+                  .setNumber(6)
+                  .setType(Type.TYPE_DOUBLE)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("boolvalue")
+                  .setNumber(7)
+                  .setType(Type.TYPE_BOOL)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("booleanvalue")
+                  .setNumber(8)
+                  .setType(Type.TYPE_BOOL)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("timestampvalue")
+                  .setNumber(9)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("timevalue")
+                  .setNumber(10)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("datetimevalue")
+                  .setNumber(11)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("datevalue")
+                  .setNumber(12)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("numericvalue")
+                  .setNumber(13)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("arrayvalue")
+                  .setNumber(14)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_REPEATED)
+                  .build())
+          .build();
+
+  private static final TableSchema NESTED_TABLE_SCHEMA =
+      new TableSchema()
+          .setFields(
+              ImmutableList.<TableFieldSchema>builder()
+                  .add(
+                      new TableFieldSchema()
+                          .setType("STRUCT")
+                          .setName("nestedValue1")
+                          .setFields(BASE_TABLE_SCHEMA.getFields()))
+                  .add(
+                      new TableFieldSchema()
+                          .setType("RECORD")
+                          .setName("nestedValue2")
+                          .setFields(BASE_TABLE_SCHEMA.getFields()))
+                  .build());
+
+  // For now, test that no exceptions are thrown.
+  @Test
+  public void testDescriptorFromTableSchema() {
+    DescriptorProto descriptor =
+        TableRowToStorageApiProto.descriptorSchemaFromTableSchema(BASE_TABLE_SCHEMA);
+    Map<String, Type> types =
+        descriptor.getFieldList().stream()
+            .collect(
+                Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
+    Map<String, Type> expectedTypes =
+        BASE_TABLE_SCHEMA_PROTO.getFieldList().stream()
+            .collect(
+                Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
+    assertEquals(expectedTypes, types);
+  }
+
+  @Test
+  public void testNestedFromTableSchema() {
+    DescriptorProto descriptor =
+        TableRowToStorageApiProto.descriptorSchemaFromTableSchema(NESTED_TABLE_SCHEMA);
+    Map<String, Type> expectedBaseTypes =
+        BASE_TABLE_SCHEMA_PROTO.getFieldList().stream()
+            .collect(
+                Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
+
+    Map<String, Type> types =
+        descriptor.getFieldList().stream()
+            .collect(
+                Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
+    Map<String, String> typeNames =
+        descriptor.getFieldList().stream()
+            .collect(
+                Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getTypeName));
+    assertEquals(2, types.size());
+
+    Map<String, DescriptorProto> nestedTypes =
+        descriptor.getNestedTypeList().stream()
+            .collect(Collectors.toMap(DescriptorProto::getName, Functions.identity()));
+    assertEquals(2, nestedTypes.size());
+    assertEquals(Type.TYPE_MESSAGE, types.get("nestedvalue1"));
+    String nestedTypeName1 = typeNames.get("nestedvalue1");
+    Map<String, Type> nestedTypes1 =
+        nestedTypes.get(nestedTypeName1).getFieldList().stream()
+            .collect(
+                Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
+    assertEquals(expectedBaseTypes, nestedTypes1);
+
+    assertEquals(Type.TYPE_MESSAGE, types.get("nestedvalue2"));
+    String nestedTypeName2 = typeNames.get("nestedvalue2");
+    Map<String, Type> nestedTypes2 =
+        nestedTypes.get(nestedTypeName2).getFieldList().stream()
+            .collect(
+                Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
+    assertEquals(expectedBaseTypes, nestedTypes2);
+  }
+
+  @Test
+  public void testRepeatedDescriptorFromTableSchema() {
+    TableRowToStorageApiProto.descriptorSchemaFromTableSchema(BASE_TABLE_SCHEMA);

Review comment:
       Is this an extra test?







[GitHub] [beam] reuvenlax commented on a change in pull request #14136: [BEAM-11648] Add conversion utilities for BigQuery Storage API sink

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on a change in pull request #14136:
URL: https://github.com/apache/beam/pull/14136#discussion_r587893135



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.DescriptorProtos.DescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
+import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.DescriptorValidationException;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.FileDescriptor;
+import com.google.protobuf.DynamicMessage;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.LogicalType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Bytes;
+import org.joda.time.Instant;
+import org.joda.time.ReadableInstant;
+
+/**
+ * Utility methods for converting Beam {@link Row} objects to dynamic protocol message, for use with
+ * the Storage write API.
+ */
+public class BeamRowToStorageApiProto {
+  // Number of digits after the decimal point supported by the NUMERIC data type.
+  private static final int NUMERIC_SCALE = 9;
+  // Maximum and minimum allowed values for the NUMERIC data type.
+  private static final BigDecimal MAX_NUMERIC_VALUE =
+      new BigDecimal("99999999999999999999999999999.999999999");
+  private static final BigDecimal MIN_NUMERIC_VALUE =
+      new BigDecimal("-99999999999999999999999999999.999999999");
+
+  // TODO(reuvenlax): Support BIGNUMERIC and GEOGRAPHY types.
+  static final Map<TypeName, Type> PRIMITIVE_TYPES =
+      ImmutableMap.<TypeName, Type>builder()
+          .put(TypeName.INT16, Type.TYPE_INT32)
+          .put(TypeName.BYTE, Type.TYPE_INT32)
+          .put(TypeName.INT32, Type.TYPE_INT32)
+          .put(TypeName.INT64, Type.TYPE_INT64)
+          .put(TypeName.FLOAT, Type.TYPE_FLOAT)
+          .put(TypeName.DOUBLE, Type.TYPE_DOUBLE)
+          .put(TypeName.STRING, Type.TYPE_STRING)
+          .put(TypeName.BOOLEAN, Type.TYPE_BOOL)
+          .put(TypeName.DATETIME, Type.TYPE_INT64)
+          .put(TypeName.BYTES, Type.TYPE_BYTES)
+          .put(TypeName.DECIMAL, Type.TYPE_BYTES)
+          .build();
+
+  // A map of supported logical types to the protobuf field type.
+  static final Map<String, Type> LOGICAL_TYPES =
+      ImmutableMap.<String, Type>builder()
+          .put(SqlTypes.DATE.getIdentifier(), Type.TYPE_INT32)
+          .put(SqlTypes.TIME.getIdentifier(), Type.TYPE_INT64)
+          .put(SqlTypes.DATETIME.getIdentifier(), Type.TYPE_INT64)
+          .put(SqlTypes.TIMESTAMP.getIdentifier(), Type.TYPE_INT64)
+          .put(EnumerationType.IDENTIFIER, Type.TYPE_STRING)
+          .build();
+
+  static final Map<TypeName, Function<Object, Object>> PRIMITIVE_ENCODERS =
+      ImmutableMap.<TypeName, Function<Object, Object>>builder()
+          .put(TypeName.INT16, o -> Integer.valueOf((Short) o))
+          .put(TypeName.BYTE, o -> Integer.valueOf((Byte) o))
+          .put(TypeName.INT32, Functions.identity())
+          .put(TypeName.INT64, Functions.identity())
+          .put(TypeName.FLOAT, Function.identity())
+          .put(TypeName.DOUBLE, Function.identity())
+          .put(TypeName.STRING, Function.identity())
+          .put(TypeName.BOOLEAN, Function.identity())
+          .put(TypeName.DATETIME, o -> ((ReadableInstant) o).getMillis() * 1000)

Review comment:
       Beam's DATETIME type is misnamed: it actually represents a timestamp, not a DATETIME.
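
       To illustrate the arithmetic in the quoted encoder (`getMillis() * 1000`), here is a minimal sketch that turns an instant into microseconds since the epoch. It uses `java.time.Instant` rather than Joda's `ReadableInstant` so that it runs with the JDK alone. The class and method names are hypothetical and are not part of this PR.

```java
import java.time.Instant;

/** Hypothetical helper, not part of the PR: encodes an instant as epoch microseconds. */
final class TimestampMicrosSketch {
  private TimestampMicrosSketch() {}

  /**
   * Same arithmetic as the quoted encoder: epoch milliseconds times 1000.
   * Anything finer than a millisecond is dropped by toEpochMilli().
   */
  static long toEpochMicros(Instant instant) {
    // multiplyExact throws ArithmeticException instead of silently overflowing.
    return Math.multiplyExact(instant.toEpochMilli(), 1000L);
  }

  public static void main(String[] args) {
    Instant instant = Instant.parse("2021-03-03T04:08:50Z");
    System.out.println(toEpochMicros(instant));
  }
}
```

       Since the value is a point in time, an integer count of microseconds is enough to represent it, which is presumably why the type map pairs it with `TYPE_INT64`.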







[GitHub] [beam] reuvenlax commented on a change in pull request #14136: [BEAM-11648] Add conversion utilities for BigQuery Storage API sink

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on a change in pull request #14136:
URL: https://github.com/apache/beam/pull/14136#discussion_r587893374



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.DescriptorProtos.DescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
+import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.DescriptorValidationException;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.FileDescriptor;
+import com.google.protobuf.DynamicMessage;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.LogicalType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Bytes;
+import org.joda.time.Instant;
+import org.joda.time.ReadableInstant;
+
+/**
+ * Utility methods for converting Beam {@link Row} objects to dynamic protocol message, for use with
+ * the Storage write API.
+ */
+public class BeamRowToStorageApiProto {
+  // Number of digits after the decimal point supported by the NUMERIC data type.
+  private static final int NUMERIC_SCALE = 9;
+  // Maximum and minimum allowed values for the NUMERIC data type.
+  private static final BigDecimal MAX_NUMERIC_VALUE =
+      new BigDecimal("99999999999999999999999999999.999999999");

Review comment:
       Probably could. I just copied this from the ZetaSQL files.
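
       For reference, a rough sketch of how the quoted bounds relate to the scale constant. It assumes NUMERIC has a total precision of 38 digits, a figure that is not stated in the quoted code. The class name, the `NUMERIC_PRECISION` constant and the `fitsNumeric` helper are all hypothetical and are not part of this PR.

```java
import java.math.BigDecimal;
import java.math.BigInteger;

/** Hypothetical sketch, not part of the PR: derives the NUMERIC bounds from precision and scale. */
final class NumericBoundsSketch {
  private NumericBoundsSketch() {}

  // Assumption: NUMERIC holds 38 significant digits in total.
  static final int NUMERIC_PRECISION = 38;
  // From the quoted code: digits after the decimal point.
  static final int NUMERIC_SCALE = 9;

  // (10^38 - 1) with the decimal point shifted 9 places to the left.
  static final BigDecimal MAX_NUMERIC_VALUE =
      new BigDecimal(BigInteger.TEN.pow(NUMERIC_PRECISION).subtract(BigInteger.ONE), NUMERIC_SCALE);
  static final BigDecimal MIN_NUMERIC_VALUE = MAX_NUMERIC_VALUE.negate();

  /** True when the value needs no more than 9 fractional digits and lies within the bounds. */
  static boolean fitsNumeric(BigDecimal value) {
    // Trailing zeros do not count as extra fractional digits.
    boolean scaleOk = value.stripTrailingZeros().scale() <= NUMERIC_SCALE;
    boolean rangeOk =
        value.compareTo(MIN_NUMERIC_VALUE) >= 0 && value.compareTo(MAX_NUMERIC_VALUE) <= 0;
    return scaleOk && rangeOk;
  }

  public static void main(String[] args) {
    System.out.println(MAX_NUMERIC_VALUE.toPlainString());
    System.out.println(fitsNumeric(new BigDecimal("1.5")));
  }
}
```

       Deriving the bounds this way keeps them tied to the scale constant, at the cost of being a little less obvious to read than the pasted literals.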







[GitHub] [beam] yirutang commented on a change in pull request #14136: [BEAM-11648] Add conversion utilities for BigQuery Storage API sink

Posted by GitBox <gi...@apache.org>.
yirutang commented on a change in pull request #14136:
URL: https://github.com/apache/beam/pull/14136#discussion_r587717068



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.DescriptorProtos.DescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
+import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.DescriptorValidationException;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.FileDescriptor;
+import com.google.protobuf.DynamicMessage;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.LogicalType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Bytes;
+import org.joda.time.Instant;
+import org.joda.time.ReadableInstant;
+
+/**
+ * Utility methods for converting Beam {@link Row} objects to dynamic protocol message, for use with
+ * the Storage write API.
+ */
+public class BeamRowToStorageApiProto {
+  // Number of digits after the decimal point supported by the NUMERIC data type.
+  private static final int NUMERIC_SCALE = 9;
+  // Maximum and minimum allowed values for the NUMERIC data type.
+  private static final BigDecimal MAX_NUMERIC_VALUE =
+      new BigDecimal("99999999999999999999999999999.999999999");
+  private static final BigDecimal MIN_NUMERIC_VALUE =
+      new BigDecimal("-99999999999999999999999999999.999999999");
+
+  // TODO(reuvenlax): Support BIGNUMERIC and GEOGRAPHY types.
+  static final Map<TypeName, Type> PRIMITIVE_TYPES =
+      ImmutableMap.<TypeName, Type>builder()
+          .put(TypeName.INT16, Type.TYPE_INT32)
+          .put(TypeName.BYTE, Type.TYPE_INT32)
+          .put(TypeName.INT32, Type.TYPE_INT32)
+          .put(TypeName.INT64, Type.TYPE_INT64)
+          .put(TypeName.FLOAT, Type.TYPE_FLOAT)
+          .put(TypeName.DOUBLE, Type.TYPE_DOUBLE)
+          .put(TypeName.STRING, Type.TYPE_STRING)
+          .put(TypeName.BOOLEAN, Type.TYPE_BOOL)
+          .put(TypeName.DATETIME, Type.TYPE_INT64)
+          .put(TypeName.BYTES, Type.TYPE_BYTES)
+          .put(TypeName.DECIMAL, Type.TYPE_BYTES)
+          .build();
+
+  // A map of supported logical types to the protobuf field type.
+  static final Map<String, Type> LOGICAL_TYPES =
+      ImmutableMap.<String, Type>builder()
+          .put(SqlTypes.DATE.getIdentifier(), Type.TYPE_INT32)
+          .put(SqlTypes.TIME.getIdentifier(), Type.TYPE_INT64)
+          .put(SqlTypes.DATETIME.getIdentifier(), Type.TYPE_INT64)
+          .put(SqlTypes.TIMESTAMP.getIdentifier(), Type.TYPE_INT64)
+          .put(EnumerationType.IDENTIFIER, Type.TYPE_STRING)
+          .build();
+
+  static final Map<TypeName, Function<Object, Object>> PRIMITIVE_ENCODERS =
+      ImmutableMap.<TypeName, Function<Object, Object>>builder()
+          .put(TypeName.INT16, o -> Integer.valueOf((Short) o))
+          .put(TypeName.BYTE, o -> Integer.valueOf((Byte) o))
+          .put(TypeName.INT32, Functions.identity())
+          .put(TypeName.INT64, Functions.identity())
+          .put(TypeName.FLOAT, Function.identity())
+          .put(TypeName.DOUBLE, Function.identity())
+          .put(TypeName.STRING, Function.identity())
+          .put(TypeName.BOOLEAN, Function.identity())
+          .put(TypeName.DATETIME, o -> ((ReadableInstant) o).getMillis() * 1000)
+          .put(TypeName.BYTES, o -> ByteString.copyFrom((byte[]) o))
+          .put(
+              TypeName.DECIMAL,
+              o ->
+                  serializeBigDecimal(
+                      (BigDecimal) o,
+                      NUMERIC_SCALE,
+                      MAX_NUMERIC_VALUE,
+                      MIN_NUMERIC_VALUE,
+                      "Numeric"))
+          .build();
+
+  // A map of supported logical types to their encoding functions.
+  static final Map<String, BiFunction<LogicalType<?, ?>, Object, Object>> LOGICAL_TYPE_ENCODERS =
+      ImmutableMap.<String, BiFunction<LogicalType<?, ?>, Object, Object>>builder()
+          .put(
+              SqlTypes.DATE.getIdentifier(),
+              (logicalType, value) -> (int) ((LocalDate) value).toEpochDay())
+          .put(
+              SqlTypes.TIME.getIdentifier(),
+              (logicalType, value) -> CivilTimeEncoder.encodePacked64TimeMicros((LocalTime) value))
+          .put(
+              SqlTypes.DATETIME.getIdentifier(),
+              (logicalType, value) ->
+                  CivilTimeEncoder.encodePacked64DatetimeSeconds((LocalDateTime) value))
+          .put(
+              SqlTypes.TIMESTAMP.getIdentifier(),
+              (logicalType, value) -> ((Instant) value).getMillis() * 1000)
+          .put(
+              EnumerationType.IDENTIFIER,
+              (logicalType, value) ->
+                  ((EnumerationType) logicalType).toString((EnumerationType.Value) value))
+          .build();
+
+  /**
+   * Given a Beam Schema, returns a protocol-buffer Descriptor that can be used to write data using
+   * the BigQuery Storage API.
+   */
+  public static Descriptor getDescriptorFromSchema(Schema schema)
+      throws DescriptorValidationException {
+    DescriptorProto descriptorProto = descriptorSchemaFromBeamSchema(schema);
+    FileDescriptorProto fileDescriptorProto =
+        FileDescriptorProto.newBuilder().addMessageType(descriptorProto).build();
+    FileDescriptor fileDescriptor =
+        FileDescriptor.buildFrom(fileDescriptorProto, new FileDescriptor[0]);
+
+    return Iterables.getOnlyElement(fileDescriptor.getMessageTypes());
+  }
+
+  /**
+   * Given a Beam {@link Row} object, returns a protocol-buffer message that can be used to write
+   * data using the BigQuery Storage streaming API.
+   */
+  public static DynamicMessage messageFromBeamRow(Descriptor descriptor, Row row) {
+    Schema beamSchema = row.getSchema();
+    DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
+    for (int i = 0; i < row.getFieldCount(); ++i) {
+      Field beamField = beamSchema.getField(i);
+      FieldDescriptor fieldDescriptor =
+          Preconditions.checkNotNull(descriptor.findFieldByName(beamField.getName().toLowerCase()));
+      @Nullable Object value = messageValueFromRowValue(fieldDescriptor, beamField, i, row);
+      if (value != null) {
+        builder.setField(fieldDescriptor, value);
+      }
+    }
+    return builder.build();
+  }
+
+  @VisibleForTesting
+  static DescriptorProto descriptorSchemaFromBeamSchema(Schema schema) {
+    DescriptorProto.Builder descriptorBuilder = DescriptorProto.newBuilder();
+    // Create a unique name for the descriptor ('-' characters cannot be used).
+    descriptorBuilder.setName("D" + UUID.randomUUID().toString().replace("-", "_"));
+    int i = 1;
+    List<DescriptorProto> nestedTypes = Lists.newArrayList();
+    for (Field field : schema.getFields()) {

Review comment:
       Is it possible for the schema to have zero fields? If so, what does that mean?
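
       For reference, the numbering loop quoted above can be modelled in plain Java to show what happens in the zero-field case. This is only a stand-in sketch: FieldNumberingSketch and numberFields are hypothetical names, and strings stand in for the protobuf field descriptors. With no fields the loop body never runs, so the result is an empty message type, which protobuf itself permits. Whether that is meaningful for the sink is the open question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

// Hypothetical stand-in for the loop in descriptorSchemaFromBeamSchema:
// field names are lowercased and numbered consecutively starting at 1.
final class FieldNumberingSketch {
  static List<String> numberFields(List<String> fieldNames) {
    List<String> numbered = new ArrayList<>();
    int i = 1;
    for (String name : fieldNames) {
      // Locale.ROOT is a choice made for this sketch to keep lowercasing deterministic.
      numbered.add((i++) + ":" + name.toLowerCase(Locale.ROOT));
    }
    return numbered;
  }

  public static void main(String[] args) {
    // Two fields get numbers 1 and 2.
    System.out.println(numberFields(Arrays.asList("stringValue", "int64Value")));
    // A zero-field schema yields no field entries at all.
    System.out.println(numberFields(Collections.<String>emptyList()));
  }
}
```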

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
##########
@@ -140,7 +110,7 @@ public static Builder builder() {
   public abstract static class SchemaConversionOptions implements Serializable {
 
     /**
-     * Controls whether to use the map or row FieldType for a TableSchema field that appears to
+     * /** Controls whether to use the map or row FieldType for a TableSchema field that appears to

Review comment:
       Remove the extra /** ?

##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.protobuf.DescriptorProtos.DescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.DynamicMessage;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.BaseEncoding;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+/** Unit tests for {@link org.apache.beam.sdk.io.gcp.bigquery.TableRowToStorageApiProto}. */
+public class TableRowToStorageApiProtoTest {
+  private static final TableSchema BASE_TABLE_SCHEMA =
+      new TableSchema()
+          .setFields(
+              ImmutableList.<TableFieldSchema>builder()
+                  .add(new TableFieldSchema().setType("STRING").setName("stringValue"))

Review comment:
       enum?

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.DescriptorProtos.DescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
+import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.DescriptorValidationException;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.FileDescriptor;
+import com.google.protobuf.DynamicMessage;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.LogicalType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Bytes;
+import org.joda.time.Instant;
+import org.joda.time.ReadableInstant;
+
+/**
+ * Utility methods for converting Beam {@link Row} objects to dynamic protocol message, for use with
+ * the Storage write API.
+ */
+public class BeamRowToStorageApiProto {
+  // Number of digits after the decimal point supported by the NUMERIC data type.
+  private static final int NUMERIC_SCALE = 9;
+  // Maximum and minimum allowed values for the NUMERIC data type.
+  private static final BigDecimal MAX_NUMERIC_VALUE =
+      new BigDecimal("99999999999999999999999999999.999999999");
+  private static final BigDecimal MIN_NUMERIC_VALUE =
+      new BigDecimal("-99999999999999999999999999999.999999999");
+
+  // TODO(reuvenlax): Support BIGNUMERIC and GEOGRAPHY types.
+  static final Map<TypeName, Type> PRIMITIVE_TYPES =
+      ImmutableMap.<TypeName, Type>builder()
+          .put(TypeName.INT16, Type.TYPE_INT32)
+          .put(TypeName.BYTE, Type.TYPE_INT32)
+          .put(TypeName.INT32, Type.TYPE_INT32)
+          .put(TypeName.INT64, Type.TYPE_INT64)
+          .put(TypeName.FLOAT, Type.TYPE_FLOAT)
+          .put(TypeName.DOUBLE, Type.TYPE_DOUBLE)
+          .put(TypeName.STRING, Type.TYPE_STRING)
+          .put(TypeName.BOOLEAN, Type.TYPE_BOOL)
+          .put(TypeName.DATETIME, Type.TYPE_INT64)
+          .put(TypeName.BYTES, Type.TYPE_BYTES)
+          .put(TypeName.DECIMAL, Type.TYPE_BYTES)
+          .build();
+
+  // A map of supported logical types to the protobuf field type.
+  static final Map<String, Type> LOGICAL_TYPES =
+      ImmutableMap.<String, Type>builder()
+          .put(SqlTypes.DATE.getIdentifier(), Type.TYPE_INT32)
+          .put(SqlTypes.TIME.getIdentifier(), Type.TYPE_INT64)
+          .put(SqlTypes.DATETIME.getIdentifier(), Type.TYPE_INT64)
+          .put(SqlTypes.TIMESTAMP.getIdentifier(), Type.TYPE_INT64)
+          .put(EnumerationType.IDENTIFIER, Type.TYPE_STRING)
+          .build();
+
+  static final Map<TypeName, Function<Object, Object>> PRIMITIVE_ENCODERS =
+      ImmutableMap.<TypeName, Function<Object, Object>>builder()
+          .put(TypeName.INT16, o -> Integer.valueOf((Short) o))
+          .put(TypeName.BYTE, o -> Integer.valueOf((Byte) o))
+          .put(TypeName.INT32, Functions.identity())
+          .put(TypeName.INT64, Functions.identity())
+          .put(TypeName.FLOAT, Function.identity())
+          .put(TypeName.DOUBLE, Function.identity())
+          .put(TypeName.STRING, Function.identity())
+          .put(TypeName.BOOLEAN, Function.identity())
+          .put(TypeName.DATETIME, o -> ((ReadableInstant) o).getMillis() * 1000)
+          .put(TypeName.BYTES, o -> ByteString.copyFrom((byte[]) o))
+          .put(
+              TypeName.DECIMAL,
+              o ->
+                  serializeBigDecimal(
+                      (BigDecimal) o,
+                      NUMERIC_SCALE,
+                      MAX_NUMERIC_VALUE,
+                      MIN_NUMERIC_VALUE,
+                      "Numeric"))
+          .build();
+
+  // A map of supported logical types to their encoding functions.
+  static final Map<String, BiFunction<LogicalType<?, ?>, Object, Object>> LOGICAL_TYPE_ENCODERS =
+      ImmutableMap.<String, BiFunction<LogicalType<?, ?>, Object, Object>>builder()
+          .put(
+              SqlTypes.DATE.getIdentifier(),
+              (logicalType, value) -> (int) ((LocalDate) value).toEpochDay())
+          .put(
+              SqlTypes.TIME.getIdentifier(),
+              (logicalType, value) -> CivilTimeEncoder.encodePacked64TimeMicros((LocalTime) value))
+          .put(
+              SqlTypes.DATETIME.getIdentifier(),
+              (logicalType, value) ->
+                  CivilTimeEncoder.encodePacked64DatetimeSeconds((LocalDateTime) value))
+          .put(
+              SqlTypes.TIMESTAMP.getIdentifier(),
+              (logicalType, value) -> ((Instant) value).getMillis() * 1000)
+          .put(
+              EnumerationType.IDENTIFIER,
+              (logicalType, value) ->
+                  ((EnumerationType) logicalType).toString((EnumerationType.Value) value))
+          .build();
+
+  /**
+   * Given a Beam Schema, returns a protocol-buffer Descriptor that can be used to write data using
+   * the BigQuery Storage API.
+   */
+  public static Descriptor getDescriptorFromSchema(Schema schema)
+      throws DescriptorValidationException {
+    DescriptorProto descriptorProto = descriptorSchemaFromBeamSchema(schema);
+    FileDescriptorProto fileDescriptorProto =
+        FileDescriptorProto.newBuilder().addMessageType(descriptorProto).build();
+    FileDescriptor fileDescriptor =
+        FileDescriptor.buildFrom(fileDescriptorProto, new FileDescriptor[0]);
+
+    return Iterables.getOnlyElement(fileDescriptor.getMessageTypes());
+  }
+
+  /**
+   * Given a Beam {@link Row} object, returns a protocol-buffer message that can be used to write
+   * data using the BigQuery Storage streaming API.
+   */
+  public static DynamicMessage messageFromBeamRow(Descriptor descriptor, Row row) {
+    Schema beamSchema = row.getSchema();
+    DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
+    for (int i = 0; i < row.getFieldCount(); ++i) {
+      Field beamField = beamSchema.getField(i);
+      FieldDescriptor fieldDescriptor =
+          Preconditions.checkNotNull(descriptor.findFieldByName(beamField.getName().toLowerCase()));
+      @Nullable Object value = messageValueFromRowValue(fieldDescriptor, beamField, i, row);
+      if (value != null) {
+        builder.setField(fieldDescriptor, value);
+      }
+    }
+    return builder.build();
+  }
+
+  @VisibleForTesting
+  static DescriptorProto descriptorSchemaFromBeamSchema(Schema schema) {
+    DescriptorProto.Builder descriptorBuilder = DescriptorProto.newBuilder();
+    // Create a unique name for the descriptor ('-' characters cannot be used).
+    descriptorBuilder.setName("D" + UUID.randomUUID().toString().replace("-", "_"));
+    int i = 1;
+    List<DescriptorProto> nestedTypes = Lists.newArrayList();
+    for (Field field : schema.getFields()) {
+      FieldDescriptorProto.Builder fieldDescriptorProtoBuilder =
+          fieldDescriptorFromBeamField(field, i++, nestedTypes);
+      descriptorBuilder.addField(fieldDescriptorProtoBuilder);
+    }
+    nestedTypes.forEach(descriptorBuilder::addNestedType);
+    return descriptorBuilder.build();
+  }
+
+  private static FieldDescriptorProto.Builder fieldDescriptorFromBeamField(
+      Field field, int fieldNumber, List<DescriptorProto> nestedTypes) {
+    FieldDescriptorProto.Builder fieldDescriptorBuilder = FieldDescriptorProto.newBuilder();
+    fieldDescriptorBuilder = fieldDescriptorBuilder.setName(field.getName().toLowerCase());
+    fieldDescriptorBuilder = fieldDescriptorBuilder.setNumber(fieldNumber);
+
+    switch (field.getType().getTypeName()) {
+      case ROW:
+        @Nullable Schema rowSchema = field.getType().getRowSchema();
+        if (rowSchema == null) {
+          throw new RuntimeException("Unexpected null schema!");
+        }
+        DescriptorProto nested = descriptorSchemaFromBeamSchema(rowSchema);
+        nestedTypes.add(nested);
+        fieldDescriptorBuilder =
+            fieldDescriptorBuilder.setType(Type.TYPE_MESSAGE).setTypeName(nested.getName());
+        break;
+      case ARRAY:
+      case ITERABLE:
+        @Nullable FieldType elementType = field.getType().getCollectionElementType();
+        if (elementType == null) {
+          throw new RuntimeException("Unexpected null element type!");
+        }
+        Preconditions.checkState(
+            !Preconditions.checkNotNull(elementType.getTypeName()).isCollectionType(),
+            "Nested arrays not supported by BigQuery.");
+        return fieldDescriptorFromBeamField(
+                Field.of(field.getName(), elementType), fieldNumber, nestedTypes)
+            .setLabel(Label.LABEL_REPEATED);
+      case LOGICAL_TYPE:
+        @Nullable LogicalType<?, ?> logicalType = field.getType().getLogicalType();
+        if (logicalType == null) {
+          throw new RuntimeException("Unexpected null logical type " + field.getType());
+        }
+        @Nullable Type type = LOGICAL_TYPES.get(logicalType.getIdentifier());
+        if (type == null) {
+          throw new RuntimeException("Unsupported logical type " + field.getType());
+        }
+        fieldDescriptorBuilder = fieldDescriptorBuilder.setType(type);
+        break;
+      case MAP:
+        throw new RuntimeException("Map types not supported by BigQuery.");
+      default:
+        @Nullable Type primitiveType = PRIMITIVE_TYPES.get(field.getType().getTypeName());
+        if (primitiveType == null) {
+          throw new RuntimeException("Unsupported type " + field.getType());
+        }
+        fieldDescriptorBuilder = fieldDescriptorBuilder.setType(primitiveType);
+    }
+    if (field.getType().getNullable()) {
+      fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_OPTIONAL);
+    } else {
+      fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_REQUIRED);
+    }
+    return fieldDescriptorBuilder;
+  }
+
+  @Nullable
+  private static Object messageValueFromRowValue(
+      FieldDescriptor fieldDescriptor, Field beamField, int index, Row row) {
+    @Nullable Object value = row.getValue(index);
+    if (value == null) {
+      if (fieldDescriptor.isOptional()) {
+        return null;
+      } else {
+        throw new IllegalArgumentException(
+            "Received null value for non-nullable field " + fieldDescriptor.getName());
+      }
+    }
+    return toProtoValue(fieldDescriptor, beamField.getType(), value);
+  }
+
+  private static Object toProtoValue(
+      FieldDescriptor fieldDescriptor, FieldType beamFieldType, Object value) {
+    switch (beamFieldType.getTypeName()) {
+      case ROW:
+        return messageFromBeamRow(fieldDescriptor.getMessageType(), (Row) value);
+      case ARRAY:
+        List<Object> list = (List<Object>) value;
+        @Nullable FieldType arrayElementType = beamFieldType.getCollectionElementType();
+        if (arrayElementType == null) {
+          throw new RuntimeException("Unexpected null element type!");
+        }
+        return list.stream()
+            .map(v -> toProtoValue(fieldDescriptor, arrayElementType, v))
+            .collect(Collectors.toList());
+      case ITERABLE:
+        Iterable<Object> iterable = (Iterable<Object>) value;
+        @Nullable FieldType iterableElementType = beamFieldType.getCollectionElementType();
+        if (iterableElementType == null) {
+          throw new RuntimeException("Unexpected null element type!");
+        }
+        return StreamSupport.stream(iterable.spliterator(), false)
+            .map(v -> toProtoValue(fieldDescriptor, iterableElementType, v))
+            .collect(Collectors.toList());
+      case MAP:
+        throw new RuntimeException("Map types not supported by BigQuery.");
+      default:
+        return scalarToProtoValue(beamFieldType, value);
+    }
+  }
+
+  @VisibleForTesting
+  static Object scalarToProtoValue(FieldType beamFieldType, Object value) {
+    if (beamFieldType.getTypeName() == TypeName.LOGICAL_TYPE) {
+      @Nullable LogicalType<?, ?> logicalType = beamFieldType.getLogicalType();
+      if (logicalType == null) {
+        throw new RuntimeException("Unexpectedly null logical type " + beamFieldType);
+      }
+      @Nullable
+      BiFunction<LogicalType<?, ?>, Object, Object> logicalTypeEncoder =
+          LOGICAL_TYPE_ENCODERS.get(logicalType.getIdentifier());
+      if (logicalTypeEncoder == null) {
+        throw new RuntimeException("Unsupported logical type " + logicalType.getIdentifier());
+      }
+      return logicalTypeEncoder.apply(logicalType, value);
+    } else {
+      @Nullable
+      Function<Object, Object> encoder = PRIMITIVE_ENCODERS.get(beamFieldType.getTypeName());
+      if (encoder == null) {
+        throw new RuntimeException("Unexpected beam type " + beamFieldType);
+      }
+      return encoder.apply(value);
+    }
+  }
+
+  private static ByteString serializeBigDecimal(
+      BigDecimal v, int scale, BigDecimal maxValue, BigDecimal minValue, String typeName) {

Review comment:
       Are the scale, min, and max values always fixed? If so, why not use the constants directly?
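
       To make the trade-off concrete, here is a self-contained sketch of both shapes: a parameterized helper plus a thin wrapper that pins the NUMERIC constants. The body of serializeBigDecimal is not shown in the quoted diff, so the byte layout used here (unscaled value at a fixed scale, two's complement, little-endian) is an assumption, as are the class name NumericBytesSketch and the wrapper serializeNumeric. The TODO about BIGNUMERIC in the quoted code suggests a second caller with different limits may be planned, which would be one reason to keep the parameters.

```java
import java.math.BigDecimal;

// Sketch only: the encoding below is an assumption, not the code under review.
final class NumericBytesSketch {
  static final int NUMERIC_SCALE = 9;
  static final BigDecimal MAX_NUMERIC_VALUE =
      new BigDecimal("99999999999999999999999999999.999999999");
  static final BigDecimal MIN_NUMERIC_VALUE = MAX_NUMERIC_VALUE.negate();

  // Parameterized form: scale and bounds are supplied by the caller.
  static byte[] serializeBigDecimal(
      BigDecimal v, int scale, BigDecimal maxValue, BigDecimal minValue, String typeName) {
    if (v.scale() > scale) {
      throw new IllegalArgumentException(
          typeName + " scale cannot exceed " + scale + ": " + v.toPlainString());
    }
    if (v.compareTo(maxValue) > 0 || v.compareTo(minValue) < 0) {
      throw new IllegalArgumentException(typeName + " overflow: " + v.toPlainString());
    }
    // toByteArray() is big-endian two's complement; reverse it to little-endian.
    byte[] bytes = v.setScale(scale).unscaledValue().toByteArray();
    for (int i = 0, j = bytes.length - 1; i < j; i++, j--) {
      byte tmp = bytes[i];
      bytes[i] = bytes[j];
      bytes[j] = tmp;
    }
    return bytes;
  }

  // Hypothetical wrapper that uses the NUMERIC constants directly.
  static byte[] serializeNumeric(BigDecimal v) {
    return serializeBigDecimal(
        v, NUMERIC_SCALE, MAX_NUMERIC_VALUE, MIN_NUMERIC_VALUE, "Numeric");
  }

  public static void main(String[] args) {
    StringBuilder hex = new StringBuilder();
    for (byte b : serializeNumeric(new BigDecimal("1"))) {
      hex.append(String.format("%02x", b & 0xff));
    }
    // 1 at scale 9 is 1_000_000_000 = 0x3B9ACA00, so this prints 00ca9a3b.
    System.out.println(hex);
  }
}
```

       If NUMERIC stays the only caller, inlining the constants as suggested would be simpler. The wrapper keeps call sites short either way.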

##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.protobuf.DescriptorProtos.DescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.DynamicMessage;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.BaseEncoding;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+/** Unit tests for {@link org.apache.beam.sdk.io.gcp.bigquery.TableRowToStorageApiProto}. */
+public class TableRowToStorageApiProtoTest {
+  private static final TableSchema BASE_TABLE_SCHEMA =
+      new TableSchema()
+          .setFields(
+              ImmutableList.<TableFieldSchema>builder()
+                  .add(new TableFieldSchema().setType("STRING").setName("stringValue"))
+                  .add(new TableFieldSchema().setType("BYTES").setName("bytesValue"))
+                  .add(new TableFieldSchema().setType("INT64").setName("int64Value"))
+                  .add(new TableFieldSchema().setType("INTEGER").setName("intValue"))
+                  .add(new TableFieldSchema().setType("FLOAT64").setName("float64Value"))
+                  .add(new TableFieldSchema().setType("FLOAT").setName("floatValue"))
+                  .add(new TableFieldSchema().setType("BOOL").setName("boolValue"))
+                  .add(new TableFieldSchema().setType("BOOLEAN").setName("booleanValue"))
+                  .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampValue"))
+                  .add(new TableFieldSchema().setType("TIME").setName("timeValue"))
+                  .add(new TableFieldSchema().setType("DATETIME").setName("datetimeValue"))
+                  .add(new TableFieldSchema().setType("DATE").setName("dateValue"))
+                  .add(new TableFieldSchema().setType("NUMERIC").setName("numericValue"))
+                  .build());
+
+  private static final DescriptorProto BASE_TABLE_SCHEMA_PROTO =
+      DescriptorProto.newBuilder()
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("stringvalue")
+                  .setNumber(1)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("bytesvalue")
+                  .setNumber(2)
+                  .setType(Type.TYPE_BYTES)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("int64value")
+                  .setNumber(3)
+                  .setType(Type.TYPE_INT64)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("intvalue")
+                  .setNumber(4)
+                  .setType(Type.TYPE_INT64)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("float64value")
+                  .setNumber(5)
+                  .setType(Type.TYPE_DOUBLE)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("floatvalue")
+                  .setNumber(6)
+                  .setType(Type.TYPE_DOUBLE)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("boolvalue")
+                  .setNumber(7)
+                  .setType(Type.TYPE_BOOL)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("booleanvalue")
+                  .setNumber(8)
+                  .setType(Type.TYPE_BOOL)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("timestampvalue")
+                  .setNumber(9)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("timevalue")
+                  .setNumber(10)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("datetimevalue")
+                  .setNumber(11)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("datevalue")
+                  .setNumber(12)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("numericvalue")
+                  .setNumber(13)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .build();
+
+  private static final TableSchema NESTED_TABLE_SCHEMA =

Review comment:
       Could we also test repeated nested and nested repeated fields, and more than one level of nesting?
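
       One way to picture the shapes asked about here is a repeated struct that contains a second-level struct, which in turn holds a repeated leaf. The sketch below is self-contained and deliberately does not use `TableFieldSchema`; the `Field` class, the `flatten` helper, and every field name in it are hypothetical stand-ins. It only illustrates the cases a test could exercise (repeated nested, nested repeated, two levels of nesting), with names lowercased the way the expected descriptors in this test file are.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

// Hypothetical stand-in for a BigQuery field schema; this is not TableFieldSchema.
class NestedSchemaSketch {
  static final class Field {
    final String name;
    final String type; // e.g. "STRING" or "STRUCT"
    final boolean repeated;
    final List<Field> children;

    Field(String name, String type, boolean repeated, Field... children) {
      this.name = name;
      this.type = type;
      this.repeated = repeated;
      this.children = Arrays.asList(children);
    }
  }

  // Walks the schema depth-first and emits one line per field:
  // "<lowercased dotted path> <type> <label>".
  static List<String> flatten(String prefix, List<Field> fields) {
    List<String> out = new ArrayList<>();
    for (Field f : fields) {
      String path = prefix + f.name.toLowerCase(Locale.ROOT);
      String label = f.repeated ? "REPEATED" : "OPTIONAL";
      out.add(path + " " + f.type + " " + label);
      out.addAll(flatten(path + ".", f.children));
    }
    return out;
  }

  // A repeated struct ("outer") holding a struct ("inner") that holds a repeated leaf ("tags").
  static List<Field> exampleSchema() {
    Field leaf = new Field("leafValue", "STRING", false);
    Field repeatedLeaf = new Field("tags", "STRING", true);
    Field inner = new Field("inner", "STRUCT", false, leaf, repeatedLeaf);
    Field outer = new Field("outer", "STRUCT", true, inner);
    return Collections.singletonList(outer);
  }

  public static void main(String[] args) {
    flatten("", exampleSchema()).forEach(System.out::println);
  }
}
```

       A real test would build the same shape with `TableFieldSchema` and check the nested descriptors at each level.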

##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.protobuf.DescriptorProtos.DescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.DynamicMessage;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.BaseEncoding;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+/** Unit tests for {@link org.apache.beam.sdk.io.gcp.bigquery.TableRowToStorageApiProto}. */
+public class TableRowToStorageApiProtoTest {
+  private static final TableSchema BASE_TABLE_SCHEMA =
+      new TableSchema()
+          .setFields(
+              ImmutableList.<TableFieldSchema>builder()
+                  .add(new TableFieldSchema().setType("STRING").setName("stringValue"))
+                  .add(new TableFieldSchema().setType("BYTES").setName("bytesValue"))
+                  .add(new TableFieldSchema().setType("INT64").setName("int64Value"))
+                  .add(new TableFieldSchema().setType("INTEGER").setName("intValue"))
+                  .add(new TableFieldSchema().setType("FLOAT64").setName("float64Value"))
+                  .add(new TableFieldSchema().setType("FLOAT").setName("floatValue"))
+                  .add(new TableFieldSchema().setType("BOOL").setName("boolValue"))
+                  .add(new TableFieldSchema().setType("BOOLEAN").setName("booleanValue"))
+                  .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampValue"))
+                  .add(new TableFieldSchema().setType("TIME").setName("timeValue"))
+                  .add(new TableFieldSchema().setType("DATETIME").setName("datetimeValue"))
+                  .add(new TableFieldSchema().setType("DATE").setName("dateValue"))
+                  .add(new TableFieldSchema().setType("NUMERIC").setName("numericValue"))
+                  .build());
+
+  private static final DescriptorProto BASE_TABLE_SCHEMA_PROTO =
+      DescriptorProto.newBuilder()
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("stringvalue")
+                  .setNumber(1)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("bytesvalue")
+                  .setNumber(2)
+                  .setType(Type.TYPE_BYTES)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("int64value")
+                  .setNumber(3)
+                  .setType(Type.TYPE_INT64)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("intvalue")
+                  .setNumber(4)
+                  .setType(Type.TYPE_INT64)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("float64value")
+                  .setNumber(5)
+                  .setType(Type.TYPE_DOUBLE)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("floatvalue")
+                  .setNumber(6)
+                  .setType(Type.TYPE_DOUBLE)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("boolvalue")
+                  .setNumber(7)
+                  .setType(Type.TYPE_BOOL)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("booleanvalue")
+                  .setNumber(8)
+                  .setType(Type.TYPE_BOOL)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("timestampvalue")
+                  .setNumber(9)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("timevalue")
+                  .setNumber(10)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("datetimevalue")
+                  .setNumber(11)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("datevalue")
+                  .setNumber(12)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("numericvalue")
+                  .setNumber(13)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .build();
+
+  private static final TableSchema NESTED_TABLE_SCHEMA =
+      new TableSchema()
+          .setFields(
+              ImmutableList.<TableFieldSchema>builder()
+                  .add(
+                      new TableFieldSchema()
+                          .setType("STRUCT")
+                          .setName("nestedValue1")
+                          .setFields(BASE_TABLE_SCHEMA.getFields()))
+                  .add(
+                      new TableFieldSchema()
+                          .setType("RECORD")
+                          .setName("nestedValue2")
+                          .setFields(BASE_TABLE_SCHEMA.getFields()))
+                  .build());
+
+  // For now, test that no exceptions are thrown.
+  @Test
+  public void testDescriptorFromTableSchema() {
+    DescriptorProto descriptor =
+        TableRowToStorageApiProto.descriptorSchemaFromTableSchema(BASE_TABLE_SCHEMA);
+    Map<String, Type> types =
+        descriptor.getFieldList().stream()
+            .collect(
+                Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
+    Map<String, Type> expectedTypes =
+        BASE_TABLE_SCHEMA_PROTO.getFieldList().stream()
+            .collect(
+                Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
+    assertEquals(expectedTypes, types);
+  }
+
+  @Test
+  public void testNestedFromTableSchema() {
+    DescriptorProto descriptor =
+        TableRowToStorageApiProto.descriptorSchemaFromTableSchema(NESTED_TABLE_SCHEMA);
+    Map<String, Type> expectedBaseTypes =
+        BASE_TABLE_SCHEMA_PROTO.getFieldList().stream()
+            .collect(
+                Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
+
+    Map<String, Type> types =
+        descriptor.getFieldList().stream()
+            .collect(
+                Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
+    Map<String, String> typeNames =
+        descriptor.getFieldList().stream()
+            .collect(
+                Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getTypeName));
+    assertEquals(2, types.size());
+
+    Map<String, DescriptorProto> nestedTypes =
+        descriptor.getNestedTypeList().stream()
+            .collect(Collectors.toMap(DescriptorProto::getName, Functions.identity()));
+    assertEquals(2, nestedTypes.size());
+    assertEquals(Type.TYPE_MESSAGE, types.get("nestedvalue1"));
+    String nestedTypeName1 = typeNames.get("nestedvalue1");
+    Map<String, Type> nestedTypes1 =
+        nestedTypes.get(nestedTypeName1).getFieldList().stream()
+            .collect(
+                Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
+    assertEquals(expectedBaseTypes, nestedTypes1);
+
+    assertEquals(Type.TYPE_MESSAGE, types.get("nestedvalue2"));
+    String nestedTypeName2 = typeNames.get("nestedvalue2");
+    Map<String, Type> nestedTypes2 =
+        nestedTypes.get(nestedTypeName2).getFieldList().stream()
+            .collect(
+                Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
+    assertEquals(expectedBaseTypes, nestedTypes2);
+  }
+
+  @Test
+  public void testRepeatedDescriptorFromTableSchema() {
+    TableRowToStorageApiProto.descriptorSchemaFromTableSchema(BASE_TABLE_SCHEMA);
+  }
+
+  private static final TableRow BASE_TABLE_ROW =
+      new TableRow()
+          .set("stringValue", "string")
+          .set(
+              "bytesValue", BaseEncoding.base64().encode("string".getBytes(StandardCharsets.UTF_8)))
+          .set("int64Value", "42")
+          .set("intValue", "43")
+          .set("float64Value", "2.8168")
+          .set("floatValue", "2.817")
+          .set("boolValue", "true")
+          .set("booleanValue", "true")
+          .set("timestampValue", "43")
+          .set("timeValue", "00:52:07[.123]|[.123456] UTC")
+          .set("datetimeValue", "2019-08-16 00:52:07[.123]|[.123456] UTC")
+          .set("dateValue", "2019-08-16")
+          .set("numericValue", "23.4");
+
+  private void assertBaseRecord(DynamicMessage msg) {

Review comment:
       Is this just comparing to the code itself? Could we have static expectations?
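
       Static expectations here would mean asserting against hand-written literals, not against values the test obtains by re-running the same conversion logic. A minimal, self-contained sketch of that idea follows. `parseValue` is a hypothetical stand-in for the converter under test, and the inputs are copied from `BASE_TABLE_ROW`; nothing here is the PR's actual code.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

class StaticExpectationsSketch {
  // Hypothetical stand-in for the conversion under test.
  static Object parseValue(String bqType, String raw) {
    switch (bqType) {
      case "INT64":
      case "INTEGER":
        return Long.valueOf(raw);
      case "FLOAT64":
      case "FLOAT":
        return Double.valueOf(raw);
      case "BOOL":
      case "BOOLEAN":
        return Boolean.valueOf(raw);
      case "BYTES":
        return Base64.getDecoder().decode(raw);
      default:
        return raw;
    }
  }

  // Fails loudly when the converted value differs from the hand-written literal.
  static void check(Object actual, Object expected) {
    if (!expected.equals(actual)) {
      throw new AssertionError(actual + " != " + expected);
    }
  }

  public static void main(String[] args) {
    // Right-hand sides are literals written by hand, not recomputed by the converter.
    check(parseValue("INT64", "42"), 42L);
    check(parseValue("INTEGER", "43"), 43L);
    check(parseValue("FLOAT64", "2.8168"), 2.8168d);
    check(parseValue("FLOAT", "2.817"), 2.817d);
    check(parseValue("BOOL", "true"), true);
    // "c3RyaW5n" is the base64 encoding of the UTF-8 bytes of "string".
    byte[] bytes = (byte[]) parseValue("BYTES", "c3RyaW5n");
    check(new String(bytes, StandardCharsets.UTF_8), "string");
    System.out.println("all static expectations hold");
  }
}
```

       With literals on the expected side, a bug in the converter cannot also leak into the expectation.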







[GitHub] [beam] yirutang commented on a change in pull request #14136: [BEAM-11648] Add conversion utilities for BigQuery Storage API sink

Posted by GitBox <gi...@apache.org>.
yirutang commented on a change in pull request #14136:
URL: https://github.com/apache/beam/pull/14136#discussion_r587700550



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.DescriptorProtos.DescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
+import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.DescriptorValidationException;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.FileDescriptor;
+import com.google.protobuf.DynamicMessage;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.LogicalType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Bytes;
+import org.joda.time.Instant;
+import org.joda.time.ReadableInstant;
+
+/**
+ * Utility methods for converting Beam {@link Row} objects to dynamic protocol message, for use with
+ * the Storage write API.
+ */
+public class BeamRowToStorageApiProto {
+  // Number of digits after the decimal point supported by the NUMERIC data type.
+  private static final int NUMERIC_SCALE = 9;
+  // Maximum and minimum allowed values for the NUMERIC data type.
+  private static final BigDecimal MAX_NUMERIC_VALUE =
+      new BigDecimal("99999999999999999999999999999.999999999");
+  private static final BigDecimal MIN_NUMERIC_VALUE =
+      new BigDecimal("-99999999999999999999999999999.999999999");
+
+  // TODO(reuvenlax): Support BIGNUMERIC and GEOGRAPHY types.
+  static final Map<TypeName, Type> PRIMITIVE_TYPES =
+      ImmutableMap.<TypeName, Type>builder()
+          .put(TypeName.INT16, Type.TYPE_INT32)
+          .put(TypeName.BYTE, Type.TYPE_INT32)
+          .put(TypeName.INT32, Type.TYPE_INT32)
+          .put(TypeName.INT64, Type.TYPE_INT64)
+          .put(TypeName.FLOAT, Type.TYPE_FLOAT)
+          .put(TypeName.DOUBLE, Type.TYPE_DOUBLE)
+          .put(TypeName.STRING, Type.TYPE_STRING)
+          .put(TypeName.BOOLEAN, Type.TYPE_BOOL)
+          .put(TypeName.DATETIME, Type.TYPE_INT64)

Review comment:
       Is this a Primitive type?

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.DescriptorProtos.DescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
+import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.DescriptorValidationException;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.FileDescriptor;
+import com.google.protobuf.DynamicMessage;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.LogicalType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Bytes;
+import org.joda.time.Instant;
+import org.joda.time.ReadableInstant;
+
+/**
+ * Utility methods for converting Beam {@link Row} objects to dynamic protocol message, for use with
+ * the Storage write API.
+ */
+public class BeamRowToStorageApiProto {
+  // Number of digits after the decimal point supported by the NUMERIC data type.
+  private static final int NUMERIC_SCALE = 9;
+  // Maximum and minimum allowed values for the NUMERIC data type.
+  private static final BigDecimal MAX_NUMERIC_VALUE =
+      new BigDecimal("99999999999999999999999999999.999999999");
+  private static final BigDecimal MIN_NUMERIC_VALUE =
+      new BigDecimal("-99999999999999999999999999999.999999999");
+
+  // TODO(reuvenlax): Support BIGNUMERIC and GEOGRAPHY types.
+  static final Map<TypeName, Type> PRIMITIVE_TYPES =
+      ImmutableMap.<TypeName, Type>builder()
+          .put(TypeName.INT16, Type.TYPE_INT32)
+          .put(TypeName.BYTE, Type.TYPE_INT32)
+          .put(TypeName.INT32, Type.TYPE_INT32)
+          .put(TypeName.INT64, Type.TYPE_INT64)
+          .put(TypeName.FLOAT, Type.TYPE_FLOAT)
+          .put(TypeName.DOUBLE, Type.TYPE_DOUBLE)
+          .put(TypeName.STRING, Type.TYPE_STRING)
+          .put(TypeName.BOOLEAN, Type.TYPE_BOOL)
+          .put(TypeName.DATETIME, Type.TYPE_INT64)
+          .put(TypeName.BYTES, Type.TYPE_BYTES)
+          .put(TypeName.DECIMAL, Type.TYPE_BYTES)
+          .build();
+
+  // A map of supported logical types to the protobuf field type.
+  static final Map<String, Type> LOGICAL_TYPES =
+      ImmutableMap.<String, Type>builder()
+          .put(SqlTypes.DATE.getIdentifier(), Type.TYPE_INT32)
+          .put(SqlTypes.TIME.getIdentifier(), Type.TYPE_INT64)
+          .put(SqlTypes.DATETIME.getIdentifier(), Type.TYPE_INT64)
+          .put(SqlTypes.TIMESTAMP.getIdentifier(), Type.TYPE_INT64)
+          .put(EnumerationType.IDENTIFIER, Type.TYPE_STRING)

Review comment:
       FYI, we also support enum to int64 conversion mapping.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.DescriptorProtos.DescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
+import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.DescriptorValidationException;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.FileDescriptor;
+import com.google.protobuf.DynamicMessage;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.LogicalType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Bytes;
+import org.joda.time.Instant;
+import org.joda.time.ReadableInstant;
+
+/**
+ * Utility methods for converting Beam {@link Row} objects to dynamic protocol message, for use with
+ * the Storage write API.
+ */
+public class BeamRowToStorageApiProto {
+  // Number of digits after the decimal point supported by the NUMERIC data type.
+  private static final int NUMERIC_SCALE = 9;
+  // Maximum and minimum allowed values for the NUMERIC data type.
+  private static final BigDecimal MAX_NUMERIC_VALUE =
+      new BigDecimal("99999999999999999999999999999.999999999");
+  private static final BigDecimal MIN_NUMERIC_VALUE =
+      new BigDecimal("-99999999999999999999999999999.999999999");
+
+  // TODO(reuvenlax): Support BIGNUMERIC and GEOGRAPHY types.
+  static final Map<TypeName, Type> PRIMITIVE_TYPES =
+      ImmutableMap.<TypeName, Type>builder()
+          .put(TypeName.INT16, Type.TYPE_INT32)
+          .put(TypeName.BYTE, Type.TYPE_INT32)
+          .put(TypeName.INT32, Type.TYPE_INT32)
+          .put(TypeName.INT64, Type.TYPE_INT64)
+          .put(TypeName.FLOAT, Type.TYPE_FLOAT)
+          .put(TypeName.DOUBLE, Type.TYPE_DOUBLE)
+          .put(TypeName.STRING, Type.TYPE_STRING)
+          .put(TypeName.BOOLEAN, Type.TYPE_BOOL)
+          .put(TypeName.DATETIME, Type.TYPE_INT64)
+          .put(TypeName.BYTES, Type.TYPE_BYTES)
+          .put(TypeName.DECIMAL, Type.TYPE_BYTES)
+          .build();
+
+  // A map of supported logical types to the protobuf field type.
+  static final Map<String, Type> LOGICAL_TYPES =
+      ImmutableMap.<String, Type>builder()
+          .put(SqlTypes.DATE.getIdentifier(), Type.TYPE_INT32)
+          .put(SqlTypes.TIME.getIdentifier(), Type.TYPE_INT64)
+          .put(SqlTypes.DATETIME.getIdentifier(), Type.TYPE_INT64)
+          .put(SqlTypes.TIMESTAMP.getIdentifier(), Type.TYPE_INT64)
+          .put(EnumerationType.IDENTIFIER, Type.TYPE_STRING)
+          .build();
+
+  static final Map<TypeName, Function<Object, Object>> PRIMITIVE_ENCODERS =
+      ImmutableMap.<TypeName, Function<Object, Object>>builder()
+          .put(TypeName.INT16, o -> Integer.valueOf((Short) o))
+          .put(TypeName.BYTE, o -> Integer.valueOf((Byte) o))
+          .put(TypeName.INT32, Functions.identity())
+          .put(TypeName.INT64, Functions.identity())
+          .put(TypeName.FLOAT, Function.identity())
+          .put(TypeName.DOUBLE, Function.identity())
+          .put(TypeName.STRING, Function.identity())
+          .put(TypeName.BOOLEAN, Function.identity())
+          .put(TypeName.DATETIME, o -> ((ReadableInstant) o).getMillis() * 1000)

Review comment:
       DATETIME needs encoding; it shouldn't be written as raw micros.
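
       As an illustration of what an encoded civil datetime can look like, here is a self-contained sketch of a packed 64-bit layout with bit fields for year, month, day, hour, minute, second, and microseconds. Note the assumptions: the layout follows the packed civil-time scheme as I understand it from ZetaSQL, whether that is the exact encoding meant in this comment is not confirmed by the thread, and the class and method names are hypothetical. Only years 1 through 9999 are considered.

```java
import java.time.LocalDateTime;

class PackedDatetimeSketch {
  // Assumed layout, most to least significant:
  // year | month (4 bits) | day (5) | hour (5) | minute (6) | second (6) | micros (20).
  static final int MICRO_BITS = 20;
  static final int MINUTE_SHIFT = 6;
  static final int HOUR_SHIFT = 12;
  static final int DAY_SHIFT = 17;
  static final int MONTH_SHIFT = 22;
  static final int YEAR_SHIFT = 26;

  // Packs the civil fields into one long; sub-microsecond precision is truncated.
  static long encode(LocalDateTime dt) {
    long seconds =
        ((long) dt.getYear() << YEAR_SHIFT)
            | ((long) dt.getMonthValue() << MONTH_SHIFT)
            | ((long) dt.getDayOfMonth() << DAY_SHIFT)
            | ((long) dt.getHour() << HOUR_SHIFT)
            | ((long) dt.getMinute() << MINUTE_SHIFT)
            | dt.getSecond();
    return (seconds << MICRO_BITS) | (dt.getNano() / 1_000L);
  }

  // Inverse of encode, used here only to show that the packing is lossless at micros precision.
  static LocalDateTime decode(long packed) {
    int micros = (int) (packed & ((1L << MICRO_BITS) - 1));
    long seconds = packed >>> MICRO_BITS;
    int second = (int) (seconds & 0x3F);
    int minute = (int) ((seconds >>> MINUTE_SHIFT) & 0x3F);
    int hour = (int) ((seconds >>> HOUR_SHIFT) & 0x1F);
    int day = (int) ((seconds >>> DAY_SHIFT) & 0x1F);
    int month = (int) ((seconds >>> MONTH_SHIFT) & 0xF);
    int year = (int) (seconds >>> YEAR_SHIFT);
    return LocalDateTime.of(year, month, day, hour, minute, second, micros * 1_000);
  }

  public static void main(String[] args) {
    LocalDateTime dt = LocalDateTime.of(2019, 8, 16, 0, 52, 7, 123_456_000);
    long packed = encode(dt);
    System.out.println(packed + " -> " + decode(packed));
  }
}
```

       Unlike epoch micros, a value packed this way is not a count of elapsed time, so the two representations cannot be swapped for each other.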

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.DescriptorProtos.DescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
+import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.DescriptorValidationException;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.FileDescriptor;
+import com.google.protobuf.DynamicMessage;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.LogicalType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Bytes;
+import org.joda.time.Instant;
+import org.joda.time.ReadableInstant;
+
+/**
+ * Utility methods for converting Beam {@link Row} objects to dynamic protocol message, for use with
+ * the Storage write API.
+ */
+public class BeamRowToStorageApiProto {
+  // Number of digits after the decimal point supported by the NUMERIC data type.
+  private static final int NUMERIC_SCALE = 9;
+  // Maximum and minimum allowed values for the NUMERIC data type.
+  private static final BigDecimal MAX_NUMERIC_VALUE =
+      new BigDecimal("99999999999999999999999999999.999999999");
+  private static final BigDecimal MIN_NUMERIC_VALUE =
+      new BigDecimal("-99999999999999999999999999999.999999999");
+
+  // TODO(reuvenlax): Support BIGNUMERIC and GEOGRAPHY types.
+  static final Map<TypeName, Type> PRIMITIVE_TYPES =
+      ImmutableMap.<TypeName, Type>builder()
+          .put(TypeName.INT16, Type.TYPE_INT32)
+          .put(TypeName.BYTE, Type.TYPE_INT32)
+          .put(TypeName.INT32, Type.TYPE_INT32)
+          .put(TypeName.INT64, Type.TYPE_INT64)
+          .put(TypeName.FLOAT, Type.TYPE_FLOAT)
+          .put(TypeName.DOUBLE, Type.TYPE_DOUBLE)
+          .put(TypeName.STRING, Type.TYPE_STRING)
+          .put(TypeName.BOOLEAN, Type.TYPE_BOOL)
+          .put(TypeName.DATETIME, Type.TYPE_INT64)
+          .put(TypeName.BYTES, Type.TYPE_BYTES)
+          .put(TypeName.DECIMAL, Type.TYPE_BYTES)

Review comment:
       Is DECIMAL a primitive type?
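
For context on why DECIMAL can sit next to BYTES in this map: as far as I understand the Storage Write API, a NUMERIC value travels as a scaled integer (scale 9) in two's complement, little-endian byte order, so a bytes field is enough to carry it. The sketch below is a minimal, JDK-only illustration under that assumption. The class and method names (`NumericBytesSketch`, `encodeNumeric`, `decodeNumeric`) are hypothetical and are not taken from the PR.

```java
import java.math.BigDecimal;
import java.math.BigInteger;

public class NumericBytesSketch {
  // Assumed NUMERIC scale, mirroring NUMERIC_SCALE in the diff.
  private static final int NUMERIC_SCALE = 9;

  /** Encodes a decimal as a scaled integer in little-endian two's complement (assumed wire form). */
  public static byte[] encodeNumeric(BigDecimal value) {
    if (value.scale() > NUMERIC_SCALE) {
      throw new IllegalArgumentException(
          "Scale cannot exceed " + NUMERIC_SCALE + ": " + value.toPlainString());
    }
    // BigInteger.toByteArray() is big-endian two's complement, so the bytes are reversed.
    byte[] bigEndian = value.setScale(NUMERIC_SCALE).unscaledValue().toByteArray();
    return reverse(bigEndian);
  }

  /** Inverse of encodeNumeric: restores big-endian order and reapplies the scale. */
  public static BigDecimal decodeNumeric(byte[] littleEndian) {
    return new BigDecimal(new BigInteger(reverse(littleEndian)), NUMERIC_SCALE);
  }

  // Returns a reversed copy and leaves the input untouched.
  private static byte[] reverse(byte[] in) {
    byte[] out = new byte[in.length];
    for (int i = 0; i < in.length; i++) {
      out[i] = in[in.length - 1 - i];
    }
    return out;
  }

  public static void main(String[] args) {
    BigDecimal original = new BigDecimal("23.4");
    BigDecimal roundTripped = decodeNumeric(encodeNumeric(original));
    // compareTo ignores the trailing zeros that the fixed scale introduces.
    System.out.println(original.compareTo(roundTripped) == 0);
  }
}
```

A range check against the MIN/MAX constants would normally precede the encoding step; it is left out here to keep the sketch short.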

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.DescriptorProtos.DescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
+import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.DescriptorValidationException;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.FileDescriptor;
+import com.google.protobuf.DynamicMessage;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.LogicalType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Bytes;
+import org.joda.time.Instant;
+import org.joda.time.ReadableInstant;
+
+/**
+ * Utility methods for converting Beam {@link Row} objects to dynamic protocol message, for use with
+ * the Storage write API.
+ */
+public class BeamRowToStorageApiProto {
+  // Number of digits after the decimal point supported by the NUMERIC data type.
+  private static final int NUMERIC_SCALE = 9;
+  // Maximum and minimum allowed values for the NUMERIC data type.
+  private static final BigDecimal MAX_NUMERIC_VALUE =
+      new BigDecimal("99999999999999999999999999999.999999999");

Review comment:
       Nit: I wonder whether these could be written like this:
   Min: -9.9999999999999999999999999999999999999E+28
   Max: 9.9999999999999999999999999999999999999E+28
   
   Easier to verify.
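
One way to check that a scientific-notation literal denotes exactly the same bound as the plain literal in the diff is to compare the two as `BigDecimal` values. The sketch below builds both strings from explicit digit counts so nothing has to be counted by eye. It assumes 29 integer digits and 9 fractional digits for the plain form (as in the diff) and 38 significant digits with exponent 28 for the scientific form; the class and method names are hypothetical.

```java
import java.math.BigDecimal;

public class NumericBoundsSketch {
  /** Returns a string of n '9' characters, so digit counts are explicit. */
  public static String nines(int n) {
    StringBuilder sb = new StringBuilder(n);
    for (int i = 0; i < n; i++) {
      sb.append('9');
    }
    return sb.toString();
  }

  /** Plain form: 29 integer digits and 9 fractional digits. */
  public static BigDecimal plainMax() {
    return new BigDecimal(nines(29) + "." + nines(9));
  }

  /** Scientific form: 38 significant digits with an assumed exponent of 28. */
  public static BigDecimal scientificMax() {
    return new BigDecimal("9." + nines(37) + "E+28");
  }

  public static void main(String[] args) {
    // equals() compares both value and scale, so it is stricter than compareTo().
    System.out.println(plainMax().equals(scientificMax()));
    System.out.println(plainMax().negate().equals(scientificMax().negate()));
  }
}
```

If the exponent or the digit count were off by one, the comparison would fail, which makes this a cheap guard for a unit test.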







[GitHub] [beam] reuvenlax commented on a change in pull request #14136: [BEAM-11648] Add conversion utilities for BigQuery Storage API sink

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on a change in pull request #14136:
URL: https://github.com/apache/beam/pull/14136#discussion_r587912296



##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.protobuf.DescriptorProtos.DescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.DynamicMessage;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.BaseEncoding;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+/** Unit tests for {@link org.apache.beam.sdk.io.gcp.bigquery.TableRowToStorageApiProto}. */
+public class TableRowToStorageApiProtoTest {
+  private static final TableSchema BASE_TABLE_SCHEMA =
+      new TableSchema()
+          .setFields(
+              ImmutableList.<TableFieldSchema>builder()
+                  .add(new TableFieldSchema().setType("STRING").setName("stringValue"))
+                  .add(new TableFieldSchema().setType("BYTES").setName("bytesValue"))
+                  .add(new TableFieldSchema().setType("INT64").setName("int64Value"))
+                  .add(new TableFieldSchema().setType("INTEGER").setName("intValue"))
+                  .add(new TableFieldSchema().setType("FLOAT64").setName("float64Value"))
+                  .add(new TableFieldSchema().setType("FLOAT").setName("floatValue"))
+                  .add(new TableFieldSchema().setType("BOOL").setName("boolValue"))
+                  .add(new TableFieldSchema().setType("BOOLEAN").setName("booleanValue"))
+                  .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampValue"))
+                  .add(new TableFieldSchema().setType("TIME").setName("timeValue"))
+                  .add(new TableFieldSchema().setType("DATETIME").setName("datetimeValue"))
+                  .add(new TableFieldSchema().setType("DATE").setName("dateValue"))
+                  .add(new TableFieldSchema().setType("NUMERIC").setName("numericValue"))
+                  .build());
+
+  private static final DescriptorProto BASE_TABLE_SCHEMA_PROTO =
+      DescriptorProto.newBuilder()
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("stringvalue")
+                  .setNumber(1)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("bytesvalue")
+                  .setNumber(2)
+                  .setType(Type.TYPE_BYTES)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("int64value")
+                  .setNumber(3)
+                  .setType(Type.TYPE_INT64)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("intvalue")
+                  .setNumber(4)
+                  .setType(Type.TYPE_INT64)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("float64value")
+                  .setNumber(5)
+                  .setType(Type.TYPE_DOUBLE)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("floatvalue")
+                  .setNumber(6)
+                  .setType(Type.TYPE_DOUBLE)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("boolvalue")
+                  .setNumber(7)
+                  .setType(Type.TYPE_BOOL)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("booleanvalue")
+                  .setNumber(8)
+                  .setType(Type.TYPE_BOOL)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("timestampvalue")
+                  .setNumber(9)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("timevalue")
+                  .setNumber(10)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("datetimevalue")
+                  .setNumber(11)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("datevalue")
+                  .setNumber(12)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("numericvalue")
+                  .setNumber(13)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .build();
+
+  private static final TableSchema NESTED_TABLE_SCHEMA =
+      new TableSchema()
+          .setFields(
+              ImmutableList.<TableFieldSchema>builder()
+                  .add(
+                      new TableFieldSchema()
+                          .setType("STRUCT")
+                          .setName("nestedValue1")
+                          .setFields(BASE_TABLE_SCHEMA.getFields()))
+                  .add(
+                      new TableFieldSchema()
+                          .setType("RECORD")
+                          .setName("nestedValue2")
+                          .setFields(BASE_TABLE_SCHEMA.getFields()))
+                  .build());
+
+  // For now, test that no exceptions are thrown.
+  @Test
+  public void testDescriptorFromTableSchema() {
+    DescriptorProto descriptor =
+        TableRowToStorageApiProto.descriptorSchemaFromTableSchema(BASE_TABLE_SCHEMA);
+    Map<String, Type> types =
+        descriptor.getFieldList().stream()
+            .collect(
+                Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
+    Map<String, Type> expectedTypes =
+        BASE_TABLE_SCHEMA_PROTO.getFieldList().stream()
+            .collect(
+                Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
+    assertEquals(expectedTypes, types);
+  }
+
+  @Test
+  public void testNestedFromTableSchema() {
+    DescriptorProto descriptor =
+        TableRowToStorageApiProto.descriptorSchemaFromTableSchema(NESTED_TABLE_SCHEMA);
+    Map<String, Type> expectedBaseTypes =
+        BASE_TABLE_SCHEMA_PROTO.getFieldList().stream()
+            .collect(
+                Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
+
+    Map<String, Type> types =
+        descriptor.getFieldList().stream()
+            .collect(
+                Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
+    Map<String, String> typeNames =
+        descriptor.getFieldList().stream()
+            .collect(
+                Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getTypeName));
+    assertEquals(2, types.size());
+
+    Map<String, DescriptorProto> nestedTypes =
+        descriptor.getNestedTypeList().stream()
+            .collect(Collectors.toMap(DescriptorProto::getName, Functions.identity()));
+    assertEquals(2, nestedTypes.size());
+    assertEquals(Type.TYPE_MESSAGE, types.get("nestedvalue1"));
+    String nestedTypeName1 = typeNames.get("nestedvalue1");
+    Map<String, Type> nestedTypes1 =
+        nestedTypes.get(nestedTypeName1).getFieldList().stream()
+            .collect(
+                Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
+    assertEquals(expectedBaseTypes, nestedTypes1);
+
+    assertEquals(Type.TYPE_MESSAGE, types.get("nestedvalue2"));
+    String nestedTypeName2 = typeNames.get("nestedvalue2");
+    Map<String, Type> nestedTypes2 =
+        nestedTypes.get(nestedTypeName2).getFieldList().stream()
+            .collect(
+                Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
+    assertEquals(expectedBaseTypes, nestedTypes2);
+  }
+
+  @Test
+  public void testRepeatedDescriptorFromTableSchema() {
+    TableRowToStorageApiProto.descriptorSchemaFromTableSchema(BASE_TABLE_SCHEMA);
+  }
+
+  private static final TableRow BASE_TABLE_ROW =
+      new TableRow()
+          .set("stringValue", "string")
+          .set(
+              "bytesValue", BaseEncoding.base64().encode("string".getBytes(StandardCharsets.UTF_8)))
+          .set("int64Value", "42")
+          .set("intValue", "43")
+          .set("float64Value", "2.8168")
+          .set("floatValue", "2.817")
+          .set("boolValue", "true")
+          .set("booleanValue", "true")
+          .set("timestampValue", "43")
+          .set("timeValue", "00:52:07[.123]|[.123456] UTC")
+          .set("datetimeValue", "2019-08-16 00:52:07[.123]|[.123456] UTC")
+          .set("dateValue", "2019-08-16")
+          .set("numericValue", "23.4");
+
+  private void assertBaseRecord(DynamicMessage msg) {

Review comment:
       Done. Fixed both tests to use static expectations.







[GitHub] [beam] reuvenlax merged pull request #14136: [BEAM-11648] Add conversion utilities for BigQuery Storage API sink

Posted by GitBox <gi...@apache.org>.
reuvenlax merged pull request #14136:
URL: https://github.com/apache/beam/pull/14136


   





[GitHub] [beam] yirutang commented on a change in pull request #14136: [BEAM-11648] Add conversion utilities for BigQuery Storage API sink

Posted by GitBox <gi...@apache.org>.
yirutang commented on a change in pull request #14136:
URL: https://github.com/apache/beam/pull/14136#discussion_r587918403



##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
##########
@@ -0,0 +1,304 @@
+/** Unit tests for {@link org.apache.beam.sdk.io.gcp.bigquery.TableRowToStorageApiProto}. */
+public class TableRowToStorageApiProtoTest {
+  private static final TableSchema BASE_TABLE_SCHEMA =
+      new TableSchema()
+          .setFields(
+              ImmutableList.<TableFieldSchema>builder()
+                  .add(new TableFieldSchema().setType("STRING").setName("stringValue"))

Review comment:
       I see. 

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static java.util.stream.Collectors.toList;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.DescriptorProtos.DescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
+import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.DescriptorValidationException;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.FileDescriptor;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
+import java.util.AbstractMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.BaseEncoding;
+
+/**
+ * Utility methods for converting JSON {@link TableRow} objects to dynamic protocol message, for use
+ * with the Storage write API.
+ */
+public class TableRowToStorageApiProto {

Review comment:
       High level: why do we need both BeamRow->proto and TableRow->proto? Would BeamRow->proto be enough?

##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
##########
@@ -0,0 +1,304 @@
+
+  private static final TableSchema NESTED_TABLE_SCHEMA =

Review comment:
       It is worth verifying the case where an array's element type is nested, as you did in the other test.

##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
##########
@@ -0,0 +1,304 @@
+/** Unit tests for {@link org.apache.beam.sdk.io.gcp.bigquery.TableRowToStorageApiProto}. */
+public class TableRowToStorageApiProtoTest {
+  private static final TableSchema BASE_TABLE_SCHEMA =
+      new TableSchema()
+          .setFields(
+              ImmutableList.<TableFieldSchema>builder()
+                  .add(new TableFieldSchema().setType("STRING").setName("stringValue"))
+                  .add(new TableFieldSchema().setType("BYTES").setName("bytesValue"))
+                  .add(new TableFieldSchema().setType("INT64").setName("int64Value"))
+                  .add(new TableFieldSchema().setType("INTEGER").setName("intValue"))
+                  .add(new TableFieldSchema().setType("FLOAT64").setName("float64Value"))
+                  .add(new TableFieldSchema().setType("FLOAT").setName("floatValue"))
+                  .add(new TableFieldSchema().setType("BOOL").setName("boolValue"))
+                  .add(new TableFieldSchema().setType("BOOLEAN").setName("booleanValue"))
+                  .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampValue"))
+                  .add(new TableFieldSchema().setType("TIME").setName("timeValue"))
+                  .add(new TableFieldSchema().setType("DATETIME").setName("datetimeValue"))
+                  .add(new TableFieldSchema().setType("DATE").setName("dateValue"))
+                  .add(new TableFieldSchema().setType("NUMERIC").setName("numericValue"))
+                  .build());
+
+  private static final DescriptorProto BASE_TABLE_SCHEMA_PROTO =
+      DescriptorProto.newBuilder()
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("stringvalue")
+                  .setNumber(1)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("bytesvalue")
+                  .setNumber(2)
+                  .setType(Type.TYPE_BYTES)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("int64value")
+                  .setNumber(3)
+                  .setType(Type.TYPE_INT64)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("intvalue")
+                  .setNumber(4)
+                  .setType(Type.TYPE_INT64)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("float64value")
+                  .setNumber(5)
+                  .setType(Type.TYPE_DOUBLE)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("floatvalue")
+                  .setNumber(6)
+                  .setType(Type.TYPE_DOUBLE)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("boolvalue")
+                  .setNumber(7)
+                  .setType(Type.TYPE_BOOL)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("booleanvalue")
+                  .setNumber(8)
+                  .setType(Type.TYPE_BOOL)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("timestampvalue")
+                  .setNumber(9)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("timevalue")
+                  .setNumber(10)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("datetimevalue")
+                  .setNumber(11)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("datevalue")
+                  .setNumber(12)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("numericvalue")
+                  .setNumber(13)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .build();
+
+  private static final TableSchema NESTED_TABLE_SCHEMA =
+      new TableSchema()
+          .setFields(
+              ImmutableList.<TableFieldSchema>builder()
+                  .add(
+                      new TableFieldSchema()
+                          .setType("STRUCT")
+                          .setName("nestedValue1")
+                          .setFields(BASE_TABLE_SCHEMA.getFields()))
+                  .add(
+                      new TableFieldSchema()
+                          .setType("RECORD")
+                          .setName("nestedValue2")
+                          .setFields(BASE_TABLE_SCHEMA.getFields()))
+                  .build());
+
+  // For now, test that no exceptions are thrown.
+  @Test
+  public void testDescriptorFromTableSchema() {
+    DescriptorProto descriptor =
+        TableRowToStorageApiProto.descriptorSchemaFromTableSchema(BASE_TABLE_SCHEMA);
+    Map<String, Type> types =
+        descriptor.getFieldList().stream()
+            .collect(
+                Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
+    Map<String, Type> expectedTypes =
+        BASE_TABLE_SCHEMA_PROTO.getFieldList().stream()
+            .collect(
+                Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
+    assertEquals(expectedTypes, types);
+  }
+
+  @Test
+  public void testNestedFromTableSchema() {
+    DescriptorProto descriptor =
+        TableRowToStorageApiProto.descriptorSchemaFromTableSchema(NESTED_TABLE_SCHEMA);
+    Map<String, Type> expectedBaseTypes =
+        BASE_TABLE_SCHEMA_PROTO.getFieldList().stream()
+            .collect(
+                Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
+
+    Map<String, Type> types =
+        descriptor.getFieldList().stream()
+            .collect(
+                Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
+    Map<String, String> typeNames =
+        descriptor.getFieldList().stream()
+            .collect(
+                Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getTypeName));
+    assertEquals(2, types.size());
+
+    Map<String, DescriptorProto> nestedTypes =
+        descriptor.getNestedTypeList().stream()
+            .collect(Collectors.toMap(DescriptorProto::getName, Functions.identity()));
+    assertEquals(2, nestedTypes.size());
+    assertEquals(Type.TYPE_MESSAGE, types.get("nestedvalue1"));
+    String nestedTypeName1 = typeNames.get("nestedvalue1");
+    Map<String, Type> nestedTypes1 =
+        nestedTypes.get(nestedTypeName1).getFieldList().stream()
+            .collect(
+                Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
+    assertEquals(expectedBaseTypes, nestedTypes1);
+
+    assertEquals(Type.TYPE_MESSAGE, types.get("nestedvalue2"));
+    String nestedTypeName2 = typeNames.get("nestedvalue2");
+    Map<String, Type> nestedTypes2 =
+        nestedTypes.get(nestedTypeName2).getFieldList().stream()
+            .collect(
+                Collectors.toMap(FieldDescriptorProto::getName, FieldDescriptorProto::getType));
+    assertEquals(expectedBaseTypes, nestedTypes2);
+  }
+
+  @Test
+  public void testRepeatedDescriptorFromTableSchema() {
+    TableRowToStorageApiProto.descriptorSchemaFromTableSchema(BASE_TABLE_SCHEMA);
+  }
+
+  private static final TableRow BASE_TABLE_ROW =
+      new TableRow()
+          .set("stringValue", "string")
+          .set(
+              "bytesValue", BaseEncoding.base64().encode("string".getBytes(StandardCharsets.UTF_8)))
+          .set("int64Value", "42")
+          .set("intValue", "43")
+          .set("float64Value", "2.8168")
+          .set("floatValue", "2.817")
+          .set("boolValue", "true")
+          .set("booleanValue", "true")
+          .set("timestampValue", "43")
+          .set("timeValue", "00:52:07[.123]|[.123456] UTC")
+          .set("datetimeValue", "2019-08-16 00:52:07[.123]|[.123456] UTC")
+          .set("dateValue", "2019-08-16")
+          .set("numericValue", "23.4");
+
+  private void assertBaseRecord(DynamicMessage msg) {

Review comment:
       maybe you haven't pushed the change?

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.DescriptorProtos.DescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
+import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.DescriptorValidationException;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.FileDescriptor;
+import com.google.protobuf.DynamicMessage;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.LogicalType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Bytes;
+import org.joda.time.Instant;
+import org.joda.time.ReadableInstant;
+
+/**
+ * Utility methods for converting Beam {@link Row} objects to dynamic protocol message, for use with
+ * the Storage write API.
+ */
+public class BeamRowToStorageApiProto {
+  // Number of digits after the decimal point supported by the NUMERIC data type.
+  private static final int NUMERIC_SCALE = 9;
+  // Maximum and minimum allowed values for the NUMERIC data type.
+  private static final BigDecimal MAX_NUMERIC_VALUE =
+      new BigDecimal("99999999999999999999999999999.999999999");
+  private static final BigDecimal MIN_NUMERIC_VALUE =
+      new BigDecimal("-99999999999999999999999999999.999999999");
+
+  // TODO(reuvenlax): Support BIGNUMERIC and GEOGRAPHY types.
+  static final Map<TypeName, Type> PRIMITIVE_TYPES =
+      ImmutableMap.<TypeName, Type>builder()
+          .put(TypeName.INT16, Type.TYPE_INT32)
+          .put(TypeName.BYTE, Type.TYPE_INT32)
+          .put(TypeName.INT32, Type.TYPE_INT32)
+          .put(TypeName.INT64, Type.TYPE_INT64)
+          .put(TypeName.FLOAT, Type.TYPE_FLOAT)
+          .put(TypeName.DOUBLE, Type.TYPE_DOUBLE)
+          .put(TypeName.STRING, Type.TYPE_STRING)
+          .put(TypeName.BOOLEAN, Type.TYPE_BOOL)
+          .put(TypeName.DATETIME, Type.TYPE_INT64)
+          .put(TypeName.BYTES, Type.TYPE_BYTES)
+          .put(TypeName.DECIMAL, Type.TYPE_BYTES)
+          .build();
+
+  // A map of supported logical types to the protobuf field type.
+  static final Map<String, Type> LOGICAL_TYPES =
+      ImmutableMap.<String, Type>builder()
+          .put(SqlTypes.DATE.getIdentifier(), Type.TYPE_INT32)
+          .put(SqlTypes.TIME.getIdentifier(), Type.TYPE_INT64)
+          .put(SqlTypes.DATETIME.getIdentifier(), Type.TYPE_INT64)
+          .put(SqlTypes.TIMESTAMP.getIdentifier(), Type.TYPE_INT64)
+          .put(EnumerationType.IDENTIFIER, Type.TYPE_STRING)
+          .build();
+
+  static final Map<TypeName, Function<Object, Object>> PRIMITIVE_ENCODERS =
+      ImmutableMap.<TypeName, Function<Object, Object>>builder()
+          .put(TypeName.INT16, o -> Integer.valueOf((Short) o))
+          .put(TypeName.BYTE, o -> Integer.valueOf((Byte) o))
+          .put(TypeName.INT32, Functions.identity())
+          .put(TypeName.INT64, Functions.identity())
+          .put(TypeName.FLOAT, Function.identity())
+          .put(TypeName.DOUBLE, Function.identity())
+          .put(TypeName.STRING, Function.identity())
+          .put(TypeName.BOOLEAN, Function.identity())
+          .put(TypeName.DATETIME, o -> ((ReadableInstant) o).getMillis() * 1000)

Review comment:
       nvm, I just realized these are DATETIMEs from different types.
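
       For anyone else reading this thread, a toy illustration of the distinction, using only java.time (the PR itself uses Joda's ReadableInstant for the primitive case and CivilTimeEncoder for the logical type). `instantToMicros` mirrors the `getMillis() * 1000` expression in the diff. `packDatetimeSeconds` is a hypothetical stand-in: its bit layout (year, month, day, hour, minute, second packed from high to low bits) is my assumption about what a packed civil datetime looks like, not a copy of CivilTimeEncoder.

```java
import java.time.Instant;
import java.time.LocalDateTime;

public class DatetimeFlavorsSketch {
  // Instant-style DATETIME: an absolute point in time, sent as epoch microseconds.
  static long instantToMicros(Instant instant) {
    return instant.toEpochMilli() * 1000;
  }

  // Civil DATETIME: a wall-clock date and time with no zone, packed field by field.
  // Assumed layout: year << 26 | month << 22 | day << 17 | hour << 12 | minute << 6 | second.
  static long packDatetimeSeconds(LocalDateTime dt) {
    return ((long) dt.getYear() << 26)
        | ((long) dt.getMonthValue() << 22)
        | ((long) dt.getDayOfMonth() << 17)
        | ((long) dt.getHour() << 12)
        | ((long) dt.getMinute() << 6)
        | dt.getSecond();
  }

  public static void main(String[] args) {
    System.out.println(instantToMicros(Instant.ofEpochMilli(43)));
    System.out.println(packDatetimeSeconds(LocalDateTime.of(2019, 8, 16, 0, 52, 7)));
  }
}
```

       Both end up as an INT64 on the wire, which is why the two entries look alike at first glance even though the values mean different things.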







[GitHub] [beam] reuvenlax commented on a change in pull request #14136: [BEAM-11648] Add conversion utilities for BigQuery Storage API sink

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on a change in pull request #14136:
URL: https://github.com/apache/beam/pull/14136#discussion_r587911900



##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.protobuf.DescriptorProtos.DescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.DynamicMessage;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.BaseEncoding;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+/** Unit tests for {@link org.apache.beam.sdk.io.gcp.bigquery.TableRowToStorageApiProto}. */
+public class TableRowToStorageApiProtoTest {
+  private static final TableSchema BASE_TABLE_SCHEMA =
+      new TableSchema()
+          .setFields(
+              ImmutableList.<TableFieldSchema>builder()
+                  .add(new TableFieldSchema().setType("STRING").setName("stringValue"))
+                  .add(new TableFieldSchema().setType("BYTES").setName("bytesValue"))
+                  .add(new TableFieldSchema().setType("INT64").setName("int64Value"))
+                  .add(new TableFieldSchema().setType("INTEGER").setName("intValue"))
+                  .add(new TableFieldSchema().setType("FLOAT64").setName("float64Value"))
+                  .add(new TableFieldSchema().setType("FLOAT").setName("floatValue"))
+                  .add(new TableFieldSchema().setType("BOOL").setName("boolValue"))
+                  .add(new TableFieldSchema().setType("BOOLEAN").setName("booleanValue"))
+                  .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampValue"))
+                  .add(new TableFieldSchema().setType("TIME").setName("timeValue"))
+                  .add(new TableFieldSchema().setType("DATETIME").setName("datetimeValue"))
+                  .add(new TableFieldSchema().setType("DATE").setName("dateValue"))
+                  .add(new TableFieldSchema().setType("NUMERIC").setName("numericValue"))
+                  .build());
+
+  private static final DescriptorProto BASE_TABLE_SCHEMA_PROTO =
+      DescriptorProto.newBuilder()
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("stringvalue")
+                  .setNumber(1)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("bytesvalue")
+                  .setNumber(2)
+                  .setType(Type.TYPE_BYTES)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("int64value")
+                  .setNumber(3)
+                  .setType(Type.TYPE_INT64)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("intvalue")
+                  .setNumber(4)
+                  .setType(Type.TYPE_INT64)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("float64value")
+                  .setNumber(5)
+                  .setType(Type.TYPE_DOUBLE)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("floatvalue")
+                  .setNumber(6)
+                  .setType(Type.TYPE_DOUBLE)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("boolvalue")
+                  .setNumber(7)
+                  .setType(Type.TYPE_BOOL)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("booleanvalue")
+                  .setNumber(8)
+                  .setType(Type.TYPE_BOOL)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("timestampvalue")
+                  .setNumber(9)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("timevalue")
+                  .setNumber(10)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("datetimevalue")
+                  .setNumber(11)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("datevalue")
+                  .setNumber(12)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("numericvalue")
+                  .setNumber(13)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .build();
+
+  private static final TableSchema NESTED_TABLE_SCHEMA =

Review comment:
       I'm not sure we gain any extra test coverage by doing that, since two layers of nesting execute the same code as one layer.
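
       To illustrate with a toy sketch (hypothetical `Node` and `visit`, not the converter from this PR): when nesting is handled by a single recursive call, a second level re-enters the same branches that the first level already exercised.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class NestingCoverageSketch {
  // Hypothetical schema node: a leaf when it has no children, a struct otherwise.
  static final class Node {
    final List<Node> children;

    Node(Node... children) {
      this.children = Arrays.asList(children);
    }
  }

  // Walks the tree and records which branches of the converter-like logic ran.
  static void visit(Node node, Set<String> branchesHit) {
    if (node.children.isEmpty()) {
      branchesHit.add("leaf");
    } else {
      branchesHit.add("struct");
      for (Node child : node.children) {
        visit(child, branchesHit); // same method, whatever the depth
      }
    }
  }

  static Set<String> branches(Node root) {
    Set<String> hit = new TreeSet<>();
    visit(root, hit);
    return hit;
  }

  public static void main(String[] args) {
    Node oneLevel = new Node(new Node());
    Node twoLevels = new Node(new Node(new Node()));
    // Both inputs record the same set of branches.
    System.out.println(branches(oneLevel));
    System.out.println(branches(twoLevels));
  }
}
```

       If repeated fields take a separate branch, that would be a different case from nesting depth and could still be worth its own test.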







[GitHub] [beam] reuvenlax commented on a change in pull request #14136: [BEAM-11648] Add conversion utilities for BigQuery Storage API sink

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on a change in pull request #14136:
URL: https://github.com/apache/beam/pull/14136#discussion_r587895188



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.DescriptorProtos.DescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
+import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.DescriptorValidationException;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.FileDescriptor;
+import com.google.protobuf.DynamicMessage;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.LogicalType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Bytes;
+import org.joda.time.Instant;
+import org.joda.time.ReadableInstant;
+
+/**
+ * Utility methods for converting Beam {@link Row} objects to dynamic protocol message, for use with
+ * the Storage write API.
+ */
+public class BeamRowToStorageApiProto {
+  // Number of digits after the decimal point supported by the NUMERIC data type.
+  private static final int NUMERIC_SCALE = 9;
+  // Maximum and minimum allowed values for the NUMERIC data type.
+  private static final BigDecimal MAX_NUMERIC_VALUE =
+      new BigDecimal("99999999999999999999999999999.999999999");
+  private static final BigDecimal MIN_NUMERIC_VALUE =
+      new BigDecimal("-99999999999999999999999999999.999999999");
+
+  // TODO(reuvenlax): Support BIGNUMERIC and GEOGRAPHY types.
+  static final Map<TypeName, Type> PRIMITIVE_TYPES =
+      ImmutableMap.<TypeName, Type>builder()
+          .put(TypeName.INT16, Type.TYPE_INT32)
+          .put(TypeName.BYTE, Type.TYPE_INT32)
+          .put(TypeName.INT32, Type.TYPE_INT32)
+          .put(TypeName.INT64, Type.TYPE_INT64)
+          .put(TypeName.FLOAT, Type.TYPE_FLOAT)
+          .put(TypeName.DOUBLE, Type.TYPE_DOUBLE)
+          .put(TypeName.STRING, Type.TYPE_STRING)
+          .put(TypeName.BOOLEAN, Type.TYPE_BOOL)
+          .put(TypeName.DATETIME, Type.TYPE_INT64)
+          .put(TypeName.BYTES, Type.TYPE_BYTES)
+          .put(TypeName.DECIMAL, Type.TYPE_BYTES)
+          .build();
+
+  // A map of supported logical types to the protobuf field type.
+  static final Map<String, Type> LOGICAL_TYPES =
+      ImmutableMap.<String, Type>builder()
+          .put(SqlTypes.DATE.getIdentifier(), Type.TYPE_INT32)
+          .put(SqlTypes.TIME.getIdentifier(), Type.TYPE_INT64)
+          .put(SqlTypes.DATETIME.getIdentifier(), Type.TYPE_INT64)
+          .put(SqlTypes.TIMESTAMP.getIdentifier(), Type.TYPE_INT64)
+          .put(EnumerationType.IDENTIFIER, Type.TYPE_STRING)
+          .build();
+
+  static final Map<TypeName, Function<Object, Object>> PRIMITIVE_ENCODERS =
+      ImmutableMap.<TypeName, Function<Object, Object>>builder()
+          .put(TypeName.INT16, o -> Integer.valueOf((Short) o))
+          .put(TypeName.BYTE, o -> Integer.valueOf((Byte) o))
+          .put(TypeName.INT32, Functions.identity())
+          .put(TypeName.INT64, Functions.identity())
+          .put(TypeName.FLOAT, Function.identity())
+          .put(TypeName.DOUBLE, Function.identity())
+          .put(TypeName.STRING, Function.identity())
+          .put(TypeName.BOOLEAN, Function.identity())
+          .put(TypeName.DATETIME, o -> ((ReadableInstant) o).getMillis() * 1000)
+          .put(TypeName.BYTES, o -> ByteString.copyFrom((byte[]) o))
+          .put(
+              TypeName.DECIMAL,
+              o ->
+                  serializeBigDecimal(
+                      (BigDecimal) o,
+                      NUMERIC_SCALE,
+                      MAX_NUMERIC_VALUE,
+                      MIN_NUMERIC_VALUE,
+                      "Numeric"))
+          .build();
+
+  // A map of supported logical types to their encoding functions.
+  static final Map<String, BiFunction<LogicalType<?, ?>, Object, Object>> LOGICAL_TYPE_ENCODERS =
+      ImmutableMap.<String, BiFunction<LogicalType<?, ?>, Object, Object>>builder()
+          .put(
+              SqlTypes.DATE.getIdentifier(),
+              (logicalType, value) -> (int) ((LocalDate) value).toEpochDay())
+          .put(
+              SqlTypes.TIME.getIdentifier(),
+              (logicalType, value) -> CivilTimeEncoder.encodePacked64TimeMicros((LocalTime) value))
+          .put(
+              SqlTypes.DATETIME.getIdentifier(),
+              (logicalType, value) ->
+                  CivilTimeEncoder.encodePacked64DatetimeSeconds((LocalDateTime) value))
+          .put(
+              SqlTypes.TIMESTAMP.getIdentifier(),
+              (logicalType, value) -> ((Instant) value).getMillis() * 1000)
+          .put(
+              EnumerationType.IDENTIFIER,
+              (logicalType, value) ->
+                  ((EnumerationType) logicalType).toString((EnumerationType.Value) value))
+          .build();
+
+  /**
+   * Given a Beam Schema, returns a protocol-buffer Descriptor that can be used to write data using
+   * the BigQuery Storage API.
+   */
+  public static Descriptor getDescriptorFromSchema(Schema schema)
+      throws DescriptorValidationException {
+    DescriptorProto descriptorProto = descriptorSchemaFromBeamSchema(schema);
+    FileDescriptorProto fileDescriptorProto =
+        FileDescriptorProto.newBuilder().addMessageType(descriptorProto).build();
+    FileDescriptor fileDescriptor =
+        FileDescriptor.buildFrom(fileDescriptorProto, new FileDescriptor[0]);
+
+    return Iterables.getOnlyElement(fileDescriptor.getMessageTypes());
+  }
+
+  /**
+   * Given a Beam {@link Row} object, returns a protocol-buffer message that can be used to write
+   * data using the BigQuery Storage streaming API.
+   */
+  public static DynamicMessage messageFromBeamRow(Descriptor descriptor, Row row) {
+    Schema beamSchema = row.getSchema();
+    DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
+    for (int i = 0; i < row.getFieldCount(); ++i) {
+      Field beamField = beamSchema.getField(i);
+      FieldDescriptor fieldDescriptor =
+          Preconditions.checkNotNull(descriptor.findFieldByName(beamField.getName().toLowerCase()));
+      @Nullable Object value = messageValueFromRowValue(fieldDescriptor, beamField, i, row);
+      if (value != null) {
+        builder.setField(fieldDescriptor, value);
+      }
+    }
+    return builder.build();
+  }
+
+  @VisibleForTesting
+  static DescriptorProto descriptorSchemaFromBeamSchema(Schema schema) {
+    DescriptorProto.Builder descriptorBuilder = DescriptorProto.newBuilder();
+    // Create a unique name for the descriptor ('-' characters cannot be used).
+    descriptorBuilder.setName("D" + UUID.randomUUID().toString().replace("-", "_"));
+    int i = 1;
+    List<DescriptorProto> nestedTypes = Lists.newArrayList();
+    for (Field field : schema.getFields()) {

Review comment:
       Theoretically possible, though it would be weird. I added a check.







[GitHub] [beam] reuvenlax commented on a change in pull request #14136: [BEAM-11648] Add conversion utilities for BigQuery Storage API sink

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on a change in pull request #14136:
URL: https://github.com/apache/beam/pull/14136#discussion_r587892801



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.DescriptorProtos.DescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
+import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.DescriptorValidationException;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.FileDescriptor;
+import com.google.protobuf.DynamicMessage;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.LogicalType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Bytes;
+import org.joda.time.Instant;
+import org.joda.time.ReadableInstant;
+
+/**
+ * Utility methods for converting Beam {@link Row} objects to dynamic protocol message, for use with
+ * the Storage write API.
+ */
+public class BeamRowToStorageApiProto {
+  // Number of digits after the decimal point supported by the NUMERIC data type.
+  private static final int NUMERIC_SCALE = 9;
+  // Maximum and minimum allowed values for the NUMERIC data type.
+  private static final BigDecimal MAX_NUMERIC_VALUE =
+      new BigDecimal("99999999999999999999999999999.999999999");
+  private static final BigDecimal MIN_NUMERIC_VALUE =
+      new BigDecimal("-99999999999999999999999999999.999999999");
+
+  // TODO(reuvenlax): Support BIGNUMERIC and GEOGRAPHY types.
+  static final Map<TypeName, Type> PRIMITIVE_TYPES =
+      ImmutableMap.<TypeName, Type>builder()
+          .put(TypeName.INT16, Type.TYPE_INT32)
+          .put(TypeName.BYTE, Type.TYPE_INT32)
+          .put(TypeName.INT32, Type.TYPE_INT32)
+          .put(TypeName.INT64, Type.TYPE_INT64)
+          .put(TypeName.FLOAT, Type.TYPE_FLOAT)
+          .put(TypeName.DOUBLE, Type.TYPE_DOUBLE)
+          .put(TypeName.STRING, Type.TYPE_STRING)
+          .put(TypeName.BOOLEAN, Type.TYPE_BOOL)
+          .put(TypeName.DATETIME, Type.TYPE_INT64)

Review comment:
       Yes - in Beam schemas it is
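
For reference, the hunk above maps `TypeName.DATETIME` to `TYPE_INT64`, and the encoder quoted elsewhere in this diff computes `getMillis() * 1000`. Below is a minimal sketch of that millisecond-to-microsecond conversion. It uses `java.time.Instant` instead of the Joda type so that it is self-contained; the class and method names are hypothetical and not part of the PR.

```java
import java.time.Instant;

class TimestampMicrosSketch {
  // Hypothetical helper: epoch microseconds for an instant, at millisecond precision.
  // multiplyExact throws ArithmeticException instead of silently overflowing.
  static long toEpochMicros(Instant instant) {
    return Math.multiplyExact(instant.toEpochMilli(), 1000L);
  }

  public static void main(String[] args) {
    System.out.println(toEpochMicros(Instant.ofEpochMilli(1_500L)));
  }
}
```

The overflow check is an addition of this sketch; the quoted encoder multiplies directly.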

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.DescriptorProtos.DescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
+import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.DescriptorValidationException;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.FileDescriptor;
+import com.google.protobuf.DynamicMessage;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.LogicalType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Bytes;
+import org.joda.time.Instant;
+import org.joda.time.ReadableInstant;
+
+/**
+ * Utility methods for converting Beam {@link Row} objects to dynamic protocol message, for use with
+ * the Storage write API.
+ */
+public class BeamRowToStorageApiProto {
+  // Number of digits after the decimal point supported by the NUMERIC data type.
+  private static final int NUMERIC_SCALE = 9;
+  // Maximum and minimum allowed values for the NUMERIC data type.
+  private static final BigDecimal MAX_NUMERIC_VALUE =
+      new BigDecimal("99999999999999999999999999999.999999999");
+  private static final BigDecimal MIN_NUMERIC_VALUE =
+      new BigDecimal("-99999999999999999999999999999.999999999");
+
+  // TODO(reuvenlax): Support BIGNUMERIC and GEOGRAPHY types.
+  static final Map<TypeName, Type> PRIMITIVE_TYPES =
+      ImmutableMap.<TypeName, Type>builder()
+          .put(TypeName.INT16, Type.TYPE_INT32)
+          .put(TypeName.BYTE, Type.TYPE_INT32)
+          .put(TypeName.INT32, Type.TYPE_INT32)
+          .put(TypeName.INT64, Type.TYPE_INT64)
+          .put(TypeName.FLOAT, Type.TYPE_FLOAT)
+          .put(TypeName.DOUBLE, Type.TYPE_DOUBLE)
+          .put(TypeName.STRING, Type.TYPE_STRING)
+          .put(TypeName.BOOLEAN, Type.TYPE_BOOL)
+          .put(TypeName.DATETIME, Type.TYPE_INT64)
+          .put(TypeName.BYTES, Type.TYPE_BYTES)
+          .put(TypeName.DECIMAL, Type.TYPE_BYTES)

Review comment:
       Yes - in Beam schemas it is
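
For reference, the hunk above maps `TypeName.DECIMAL` to `TYPE_BYTES`, and the encoder quoted elsewhere in this diff calls `serializeBigDecimal` with `NUMERIC_SCALE` and the min/max bounds. The body of `serializeBigDecimal` is not shown in this thread, so the following is only a sketch of what such an encoder might look like. It assumes the value is range-checked, rescaled to 9 decimal places, and written as little-endian two's-complement bytes; the byte order and all names here are assumptions.

```java
import java.math.BigDecimal;
import java.util.Arrays;

class NumericBytesSketch {
  static final int SCALE = 9;
  static final BigDecimal MAX = new BigDecimal("99999999999999999999999999999.999999999");
  static final BigDecimal MIN = MAX.negate();

  // Hypothetical encoder: validates scale and range, then emits the unscaled
  // integer as two's-complement bytes, least significant byte first (assumed order).
  static byte[] encodeNumeric(BigDecimal v) {
    if (v.scale() > SCALE) {
      throw new IllegalArgumentException("Scale cannot exceed " + SCALE + ": " + v.toPlainString());
    }
    if (v.compareTo(MAX) > 0 || v.compareTo(MIN) < 0) {
      throw new IllegalArgumentException("Value out of range: " + v.toPlainString());
    }
    // toByteArray() is big-endian; reverse it in place.
    byte[] bytes = v.setScale(SCALE).unscaledValue().toByteArray();
    for (int i = 0, j = bytes.length - 1; i < j; i++, j--) {
      byte tmp = bytes[i];
      bytes[i] = bytes[j];
      bytes[j] = tmp;
    }
    return bytes;
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(encodeNumeric(new BigDecimal("1"))));
  }
}
```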







[GitHub] [beam] reuvenlax commented on a change in pull request #14136: [BEAM-11648] Add conversion utilities for BigQuery Storage API sink

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on a change in pull request #14136:
URL: https://github.com/apache/beam/pull/14136#discussion_r587892666



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.DescriptorProtos.DescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
+import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.DescriptorValidationException;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.FileDescriptor;
+import com.google.protobuf.DynamicMessage;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.LogicalType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Bytes;
+import org.joda.time.Instant;
+import org.joda.time.ReadableInstant;
+
+/**
+ * Utility methods for converting Beam {@link Row} objects to dynamic protocol message, for use with
+ * the Storage write API.
+ */
+public class BeamRowToStorageApiProto {
+  // Number of digits after the decimal point supported by the NUMERIC data type.
+  private static final int NUMERIC_SCALE = 9;
+  // Maximum and minimum allowed values for the NUMERIC data type.
+  private static final BigDecimal MAX_NUMERIC_VALUE =
+      new BigDecimal("99999999999999999999999999999.999999999");
+  private static final BigDecimal MIN_NUMERIC_VALUE =
+      new BigDecimal("-99999999999999999999999999999.999999999");
+
+  // TODO(reuvenlax): Support BIGNUMERIC and GEOGRAPHY types.
+  static final Map<TypeName, Type> PRIMITIVE_TYPES =
+      ImmutableMap.<TypeName, Type>builder()
+          .put(TypeName.INT16, Type.TYPE_INT32)
+          .put(TypeName.BYTE, Type.TYPE_INT32)
+          .put(TypeName.INT32, Type.TYPE_INT32)
+          .put(TypeName.INT64, Type.TYPE_INT64)
+          .put(TypeName.FLOAT, Type.TYPE_FLOAT)
+          .put(TypeName.DOUBLE, Type.TYPE_DOUBLE)
+          .put(TypeName.STRING, Type.TYPE_STRING)
+          .put(TypeName.BOOLEAN, Type.TYPE_BOOL)
+          .put(TypeName.DATETIME, Type.TYPE_INT64)
+          .put(TypeName.BYTES, Type.TYPE_BYTES)
+          .put(TypeName.DECIMAL, Type.TYPE_BYTES)
+          .build();
+
+  // A map of supported logical types to the protobuf field type.
+  static final Map<String, Type> LOGICAL_TYPES =
+      ImmutableMap.<String, Type>builder()
+          .put(SqlTypes.DATE.getIdentifier(), Type.TYPE_INT32)
+          .put(SqlTypes.TIME.getIdentifier(), Type.TYPE_INT64)
+          .put(SqlTypes.DATETIME.getIdentifier(), Type.TYPE_INT64)
+          .put(SqlTypes.TIMESTAMP.getIdentifier(), Type.TYPE_INT64)
+          .put(EnumerationType.IDENTIFIER, Type.TYPE_STRING)

Review comment:
       I assume it will depend on the table schema though, right? I think existing users are using string fields for enums, so I tried to match that.
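
To illustrate the enum-as-string choice discussed here: the PR's encoder calls `EnumerationType.toString(...)` so the value is written to a `TYPE_STRING` field. The sketch below shows the same idea with a plain lookup table instead of Beam's `EnumerationType`; the class, the names array, and `encodeEnum` are hypothetical stand-ins.

```java
class EnumAsStringSketch {
  // Hypothetical stand-in for an enumeration logical type: index -> symbolic name.
  static final String[] COLOR_NAMES = {"RED", "GREEN", "BLUE"};

  // Encodes an enum's integer value as the string that would be stored in a STRING column.
  static String encodeEnum(int value) {
    if (value < 0 || value >= COLOR_NAMES.length) {
      throw new IllegalArgumentException("Unknown enum value " + value);
    }
    return COLOR_NAMES[value];
  }

  public static void main(String[] args) {
    System.out.println(encodeEnum(1));
  }
}
```

Writing the name rather than the integer keeps the column readable in queries, at the cost of larger values than an integer encoding.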







[GitHub] [beam] reuvenlax commented on a change in pull request #14136: [BEAM-11648] Add conversion utilities for BigQuery Storage API sink

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on a change in pull request #14136:
URL: https://github.com/apache/beam/pull/14136#discussion_r587910909



##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.protobuf.DescriptorProtos.DescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.DynamicMessage;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.BaseEncoding;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+/** Unit tests for {@link org.apache.beam.sdk.io.gcp.bigquery.TableRowToStorageApiProto}. */
+public class TableRowToStorageApiProtoTest {
+  private static final TableSchema BASE_TABLE_SCHEMA =
+      new TableSchema()
+          .setFields(
+              ImmutableList.<TableFieldSchema>builder()
+                  .add(new TableFieldSchema().setType("STRING").setName("stringValue"))

Review comment:
       I don't think GoogleSQL has an enum type, does it? Maybe you're thinking of the BeamRow class, and in that test I do test enums.







[GitHub] [beam] reuvenlax commented on a change in pull request #14136: [BEAM-11648] Add conversion utilities for BigQuery Storage API sink

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on a change in pull request #14136:
URL: https://github.com/apache/beam/pull/14136#discussion_r587896071



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.DescriptorProtos.DescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
+import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.DescriptorValidationException;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.FileDescriptor;
+import com.google.protobuf.DynamicMessage;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.LogicalType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Bytes;
+import org.joda.time.Instant;
+import org.joda.time.ReadableInstant;
+
+/**
+ * Utility methods for converting Beam {@link Row} objects to dynamic protocol message, for use with
+ * the Storage write API.
+ */
+public class BeamRowToStorageApiProto {
+  // Number of digits after the decimal point supported by the NUMERIC data type.
+  private static final int NUMERIC_SCALE = 9;
+  // Maximum and minimum allowed values for the NUMERIC data type.
+  private static final BigDecimal MAX_NUMERIC_VALUE =
+      new BigDecimal("99999999999999999999999999999.999999999");
+  private static final BigDecimal MIN_NUMERIC_VALUE =
+      new BigDecimal("-99999999999999999999999999999.999999999");
+
+  // TODO(reuvenlax): Support BIGNUMERIC and GEOGRAPHY types.
+  static final Map<TypeName, Type> PRIMITIVE_TYPES =
+      ImmutableMap.<TypeName, Type>builder()
+          .put(TypeName.INT16, Type.TYPE_INT32)
+          .put(TypeName.BYTE, Type.TYPE_INT32)
+          .put(TypeName.INT32, Type.TYPE_INT32)
+          .put(TypeName.INT64, Type.TYPE_INT64)
+          .put(TypeName.FLOAT, Type.TYPE_FLOAT)
+          .put(TypeName.DOUBLE, Type.TYPE_DOUBLE)
+          .put(TypeName.STRING, Type.TYPE_STRING)
+          .put(TypeName.BOOLEAN, Type.TYPE_BOOL)
+          .put(TypeName.DATETIME, Type.TYPE_INT64)
+          .put(TypeName.BYTES, Type.TYPE_BYTES)
+          .put(TypeName.DECIMAL, Type.TYPE_BYTES)
+          .build();
+
+  // A map of supported logical types to the protobuf field type.
+  static final Map<String, Type> LOGICAL_TYPES =
+      ImmutableMap.<String, Type>builder()
+          .put(SqlTypes.DATE.getIdentifier(), Type.TYPE_INT32)
+          .put(SqlTypes.TIME.getIdentifier(), Type.TYPE_INT64)
+          .put(SqlTypes.DATETIME.getIdentifier(), Type.TYPE_INT64)
+          .put(SqlTypes.TIMESTAMP.getIdentifier(), Type.TYPE_INT64)
+          .put(EnumerationType.IDENTIFIER, Type.TYPE_STRING)
+          .build();
+
+  static final Map<TypeName, Function<Object, Object>> PRIMITIVE_ENCODERS =
+      ImmutableMap.<TypeName, Function<Object, Object>>builder()
+          .put(TypeName.INT16, o -> Integer.valueOf((Short) o))
+          .put(TypeName.BYTE, o -> Integer.valueOf((Byte) o))
+          .put(TypeName.INT32, Functions.identity())
+          .put(TypeName.INT64, Functions.identity())
+          .put(TypeName.FLOAT, Function.identity())
+          .put(TypeName.DOUBLE, Function.identity())
+          .put(TypeName.STRING, Function.identity())
+          .put(TypeName.BOOLEAN, Function.identity())
+          .put(TypeName.DATETIME, o -> ((ReadableInstant) o).getMillis() * 1000)
+          .put(TypeName.BYTES, o -> ByteString.copyFrom((byte[]) o))
+          .put(
+              TypeName.DECIMAL,
+              o ->
+                  serializeBigDecimal(
+                      (BigDecimal) o,
+                      NUMERIC_SCALE,
+                      MAX_NUMERIC_VALUE,
+                      MIN_NUMERIC_VALUE,
+                      "Numeric"))
+          .build();
+
+  // A map of supported logical types to their encoding functions.
+  static final Map<String, BiFunction<LogicalType<?, ?>, Object, Object>> LOGICAL_TYPE_ENCODERS =
+      ImmutableMap.<String, BiFunction<LogicalType<?, ?>, Object, Object>>builder()
+          .put(
+              SqlTypes.DATE.getIdentifier(),
+              (logicalType, value) -> (int) ((LocalDate) value).toEpochDay())
+          .put(
+              SqlTypes.TIME.getIdentifier(),
+              (logicalType, value) -> CivilTimeEncoder.encodePacked64TimeMicros((LocalTime) value))
+          .put(
+              SqlTypes.DATETIME.getIdentifier(),
+              (logicalType, value) ->
+                  CivilTimeEncoder.encodePacked64DatetimeSeconds((LocalDateTime) value))
+          .put(
+              SqlTypes.TIMESTAMP.getIdentifier(),
+              (logicalType, value) -> ((Instant) value).getMillis() * 1000)
+          .put(
+              EnumerationType.IDENTIFIER,
+              (logicalType, value) ->
+                  ((EnumerationType) logicalType).toString((EnumerationType.Value) value))
+          .build();
+
+  /**
+   * Given a Beam Schema, returns a protocol-buffer Descriptor that can be used to write data using
+   * the BigQuery Storage API.
+   */
+  public static Descriptor getDescriptorFromSchema(Schema schema)
+      throws DescriptorValidationException {
+    DescriptorProto descriptorProto = descriptorSchemaFromBeamSchema(schema);
+    FileDescriptorProto fileDescriptorProto =
+        FileDescriptorProto.newBuilder().addMessageType(descriptorProto).build();
+    FileDescriptor fileDescriptor =
+        FileDescriptor.buildFrom(fileDescriptorProto, new FileDescriptor[0]);
+
+    return Iterables.getOnlyElement(fileDescriptor.getMessageTypes());
+  }
+
+  /**
+   * Given a Beam {@link Row} object, returns a protocol-buffer message that can be used to write
+   * data using the BigQuery Storage streaming API.
+   */
+  public static DynamicMessage messageFromBeamRow(Descriptor descriptor, Row row) {
+    Schema beamSchema = row.getSchema();
+    DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
+    for (int i = 0; i < row.getFieldCount(); ++i) {
+      Field beamField = beamSchema.getField(i);
+      FieldDescriptor fieldDescriptor =
+          Preconditions.checkNotNull(descriptor.findFieldByName(beamField.getName().toLowerCase()));
+      @Nullable Object value = messageValueFromRowValue(fieldDescriptor, beamField, i, row);
+      if (value != null) {
+        builder.setField(fieldDescriptor, value);
+      }
+    }
+    return builder.build();
+  }
+
+  @VisibleForTesting
+  static DescriptorProto descriptorSchemaFromBeamSchema(Schema schema) {
+    DescriptorProto.Builder descriptorBuilder = DescriptorProto.newBuilder();
+    // Create a unique name for the descriptor ('-' characters cannot be used).
+    descriptorBuilder.setName("D" + UUID.randomUUID().toString().replace("-", "_"));
+    int i = 1;
+    List<DescriptorProto> nestedTypes = Lists.newArrayList();
+    for (Field field : schema.getFields()) {
+      FieldDescriptorProto.Builder fieldDescriptorProtoBuilder =
+          fieldDescriptorFromBeamField(field, i++, nestedTypes);
+      descriptorBuilder.addField(fieldDescriptorProtoBuilder);
+    }
+    nestedTypes.forEach(descriptorBuilder::addNestedType);
+    return descriptorBuilder.build();
+  }
+
+  private static FieldDescriptorProto.Builder fieldDescriptorFromBeamField(
+      Field field, int fieldNumber, List<DescriptorProto> nestedTypes) {
+    FieldDescriptorProto.Builder fieldDescriptorBuilder = FieldDescriptorProto.newBuilder();
+    fieldDescriptorBuilder = fieldDescriptorBuilder.setName(field.getName().toLowerCase());
+    fieldDescriptorBuilder = fieldDescriptorBuilder.setNumber(fieldNumber);
+
+    switch (field.getType().getTypeName()) {
+      case ROW:
+        @Nullable Schema rowSchema = field.getType().getRowSchema();
+        if (rowSchema == null) {
+          throw new RuntimeException("Unexpected null schema!");
+        }
+        DescriptorProto nested = descriptorSchemaFromBeamSchema(rowSchema);
+        nestedTypes.add(nested);
+        fieldDescriptorBuilder =
+            fieldDescriptorBuilder.setType(Type.TYPE_MESSAGE).setTypeName(nested.getName());
+        break;
+      case ARRAY:
+      case ITERABLE:
+        @Nullable FieldType elementType = field.getType().getCollectionElementType();
+        if (elementType == null) {
+          throw new RuntimeException("Unexpected null element type!");
+        }
+        Preconditions.checkState(
+            !Preconditions.checkNotNull(elementType.getTypeName()).isCollectionType(),
+            "Nested arrays not supported by BigQuery.");
+        return fieldDescriptorFromBeamField(
+                Field.of(field.getName(), elementType), fieldNumber, nestedTypes)
+            .setLabel(Label.LABEL_REPEATED);
+      case LOGICAL_TYPE:
+        @Nullable LogicalType<?, ?> logicalType = field.getType().getLogicalType();
+        if (logicalType == null) {
+          throw new RuntimeException("Unexpected null logical type " + field.getType());
+        }
+        @Nullable Type type = LOGICAL_TYPES.get(logicalType.getIdentifier());
+        if (type == null) {
+          throw new RuntimeException("Unsupported logical type " + field.getType());
+        }
+        fieldDescriptorBuilder = fieldDescriptorBuilder.setType(type);
+        break;
+      case MAP:
+        throw new RuntimeException("Map types not supported by BigQuery.");
+      default:
+        @Nullable Type primitiveType = PRIMITIVE_TYPES.get(field.getType().getTypeName());
+        if (primitiveType == null) {
+          throw new RuntimeException("Unsupported type " + field.getType());
+        }
+        fieldDescriptorBuilder = fieldDescriptorBuilder.setType(primitiveType);
+    }
+    if (field.getType().getNullable()) {
+      fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_OPTIONAL);
+    } else {
+      fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_REQUIRED);
+    }
+    return fieldDescriptorBuilder;
+  }
+
+  @Nullable
+  private static Object messageValueFromRowValue(
+      FieldDescriptor fieldDescriptor, Field beamField, int index, Row row) {
+    @Nullable Object value = row.getValue(index);
+    if (value == null) {
+      if (fieldDescriptor.isOptional()) {
+        return null;
+      } else {
+        throw new IllegalArgumentException(
+            "Received null value for non-nullable field " + fieldDescriptor.getName());
+      }
+    }
+    return toProtoValue(fieldDescriptor, beamField.getType(), value);
+  }
+
+  private static Object toProtoValue(
+      FieldDescriptor fieldDescriptor, FieldType beamFieldType, Object value) {
+    switch (beamFieldType.getTypeName()) {
+      case ROW:
+        return messageFromBeamRow(fieldDescriptor.getMessageType(), (Row) value);
+      case ARRAY:
+        List<Object> list = (List<Object>) value;
+        @Nullable FieldType arrayElementType = beamFieldType.getCollectionElementType();
+        if (arrayElementType == null) {
+          throw new RuntimeException("Unexpected null element type!");
+        }
+        return list.stream()
+            .map(v -> toProtoValue(fieldDescriptor, arrayElementType, v))
+            .collect(Collectors.toList());
+      case ITERABLE:
+        Iterable<Object> iterable = (Iterable<Object>) value;
+        @Nullable FieldType iterableElementType = beamFieldType.getCollectionElementType();
+        if (iterableElementType == null) {
+          throw new RuntimeException("Unexpected null element type!");
+        }
+        return StreamSupport.stream(iterable.spliterator(), false)
+            .map(v -> toProtoValue(fieldDescriptor, iterableElementType, v))
+            .collect(Collectors.toList());
+      case MAP:
+        throw new RuntimeException("Map types not supported by BigQuery.");
+      default:
+        return scalarToProtoValue(beamFieldType, value);
+    }
+  }
+
+  @VisibleForTesting
+  static Object scalarToProtoValue(FieldType beamFieldType, Object value) {
+    if (beamFieldType.getTypeName() == TypeName.LOGICAL_TYPE) {
+      @Nullable LogicalType<?, ?> logicalType = beamFieldType.getLogicalType();
+      if (logicalType == null) {
+        throw new RuntimeException("Unexpectedly null logical type " + beamFieldType);
+      }
+      @Nullable
+      BiFunction<LogicalType<?, ?>, Object, Object> logicalTypeEncoder =
+          LOGICAL_TYPE_ENCODERS.get(logicalType.getIdentifier());
+      if (logicalTypeEncoder == null) {
+        throw new RuntimeException("Unsupported logical type " + logicalType.getIdentifier());
+      }
+      return logicalTypeEncoder.apply(logicalType, value);
+    } else {
+      @Nullable
+      Function<Object, Object> encoder = PRIMITIVE_ENCODERS.get(beamFieldType.getTypeName());
+      if (encoder == null) {
+        throw new RuntimeException("Unexpected beam type " + beamFieldType);
+      }
+      return encoder.apply(value);
+    }
+  }
+
+  private static ByteString serializeBigDecimal(
+      BigDecimal v, int scale, BigDecimal maxValue, BigDecimal minValue, String typeName) {

Review comment:
       This was copied from the ZetaSQL code. I believe the main reason for the extra parameters is that the function can also be used for BIGNUMERIC types, though I haven't added that support in this PR.
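
       For readers following the thread, here is a minimal sketch of what a helper with this signature might do, assuming the wire encoding is the value scaled to a fixed number of decimal digits and written as a little-endian two's-complement integer. It uses only the JDK and returns byte[] rather than ByteString. The class name NumericBytesSketch, both method names, and the error messages are hypothetical and are not taken from the PR; the scale and bounds are the NUMERIC constants shown in the diff.

```java
import java.math.BigDecimal;

/** Illustrative sketch only; not the code under review. */
class NumericBytesSketch {
  // Scale and bounds for NUMERIC, copied from the constants in the diff.
  static final int NUMERIC_SCALE = 9;
  static final BigDecimal MAX_NUMERIC_VALUE =
      new BigDecimal("99999999999999999999999999999.999999999");
  static final BigDecimal MIN_NUMERIC_VALUE =
      new BigDecimal("-99999999999999999999999999999.999999999");

  /**
   * Hypothetical helper: encodes v as an integer scaled by 10^scale, in
   * little-endian two's-complement byte order (an assumed encoding).
   */
  static byte[] serialize(
      BigDecimal v, int scale, BigDecimal maxValue, BigDecimal minValue, String typeName) {
    if (v.scale() > scale) {
      throw new IllegalArgumentException(typeName + " scale cannot exceed " + scale + ": " + v);
    }
    if (v.compareTo(maxValue) > 0 || v.compareTo(minValue) < 0) {
      throw new IllegalArgumentException(typeName + " overflow: " + v);
    }
    // No rounding can occur here because v.scale() <= scale was checked above.
    byte[] bytes = v.setScale(scale).unscaledValue().toByteArray();
    // BigInteger.toByteArray() is big-endian; reverse in place for little-endian.
    for (int i = 0, j = bytes.length - 1; i < j; i++, j--) {
      byte tmp = bytes[i];
      bytes[i] = bytes[j];
      bytes[j] = tmp;
    }
    return bytes;
  }

  /** Convenience wrapper that fixes the NUMERIC scale and bounds. */
  static byte[] serializeNumeric(BigDecimal v) {
    return serialize(v, NUMERIC_SCALE, MAX_NUMERIC_VALUE, MIN_NUMERIC_VALUE, "Numeric");
  }
}
```

       Keeping the scale and bounds as parameters is what would let the same routine serve BIGNUMERIC later: a second wrapper would only need to pass different constants.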







[GitHub] [beam] yirutang commented on a change in pull request #14136: [BEAM-11648] Add conversion utilities for BigQuery Storage API sink

Posted by GitBox <gi...@apache.org>.
yirutang commented on a change in pull request #14136:
URL: https://github.com/apache/beam/pull/14136#discussion_r587915064



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.DescriptorProtos.DescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
+import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.DescriptorValidationException;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.FileDescriptor;
+import com.google.protobuf.DynamicMessage;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.LogicalType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Bytes;
+import org.joda.time.Instant;
+import org.joda.time.ReadableInstant;
+
+/**
+ * Utility methods for converting Beam {@link Row} objects to dynamic protocol message, for use with
+ * the Storage write API.
+ */
+public class BeamRowToStorageApiProto {
+  // Number of digits after the decimal point supported by the NUMERIC data type.
+  private static final int NUMERIC_SCALE = 9;
+  // Maximum and minimum allowed values for the NUMERIC data type.
+  private static final BigDecimal MAX_NUMERIC_VALUE =
+      new BigDecimal("99999999999999999999999999999.999999999");
+  private static final BigDecimal MIN_NUMERIC_VALUE =
+      new BigDecimal("-99999999999999999999999999999.999999999");
+
+  // TODO(reuvenlax): Support BIGNUMERIC and GEOGRAPHY types.
+  static final Map<TypeName, Type> PRIMITIVE_TYPES =
+      ImmutableMap.<TypeName, Type>builder()
+          .put(TypeName.INT16, Type.TYPE_INT32)
+          .put(TypeName.BYTE, Type.TYPE_INT32)
+          .put(TypeName.INT32, Type.TYPE_INT32)
+          .put(TypeName.INT64, Type.TYPE_INT64)
+          .put(TypeName.FLOAT, Type.TYPE_FLOAT)
+          .put(TypeName.DOUBLE, Type.TYPE_DOUBLE)
+          .put(TypeName.STRING, Type.TYPE_STRING)
+          .put(TypeName.BOOLEAN, Type.TYPE_BOOL)
+          .put(TypeName.DATETIME, Type.TYPE_INT64)
+          .put(TypeName.BYTES, Type.TYPE_BYTES)
+          .put(TypeName.DECIMAL, Type.TYPE_BYTES)
+          .build();
+
+  // A map of supported logical types to the protobuf field type.
+  static final Map<String, Type> LOGICAL_TYPES =
+      ImmutableMap.<String, Type>builder()
+          .put(SqlTypes.DATE.getIdentifier(), Type.TYPE_INT32)
+          .put(SqlTypes.TIME.getIdentifier(), Type.TYPE_INT64)
+          .put(SqlTypes.DATETIME.getIdentifier(), Type.TYPE_INT64)
+          .put(SqlTypes.TIMESTAMP.getIdentifier(), Type.TYPE_INT64)
+          .put(EnumerationType.IDENTIFIER, Type.TYPE_STRING)
+          .build();
+
+  static final Map<TypeName, Function<Object, Object>> PRIMITIVE_ENCODERS =
+      ImmutableMap.<TypeName, Function<Object, Object>>builder()
+          .put(TypeName.INT16, o -> Integer.valueOf((Short) o))
+          .put(TypeName.BYTE, o -> Integer.valueOf((Byte) o))
+          .put(TypeName.INT32, Functions.identity())
+          .put(TypeName.INT64, Functions.identity())
+          .put(TypeName.FLOAT, Function.identity())
+          .put(TypeName.DOUBLE, Function.identity())
+          .put(TypeName.STRING, Function.identity())
+          .put(TypeName.BOOLEAN, Function.identity())
+          .put(TypeName.DATETIME, o -> ((ReadableInstant) o).getMillis() * 1000)

Review comment:
       So DATETIME is both a logical type and a primitive type?
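
       For context, the primitive encoder on the line above converts an instant's epoch milliseconds to microseconds by multiplying by 1000. A minimal JDK-only sketch of that conversion follows. It assumes java.time.Instant as a stand-in for Joda's ReadableInstant, and it uses Math.multiplyExact, which the diff does not, so that overflow raises an error instead of wrapping. The class and method names are hypothetical.

```java
import java.time.Instant;

/** Illustrative sketch only; not the code under review. */
class DatetimeMicrosSketch {
  /** Converts an instant to microseconds since the epoch, at millisecond precision. */
  static long toEpochMicros(Instant instant) {
    // multiplyExact throws ArithmeticException on overflow rather than wrapping silently.
    return Math.multiplyExact(instant.toEpochMilli(), 1000L);
  }
}
```

       The result carries only millisecond precision, so the last three decimal digits of the microsecond value are always zero.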







[GitHub] [beam] reuvenlax commented on a change in pull request #14136: [BEAM-11648] Add conversion utilities for BigQuery Storage API sink

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on a change in pull request #14136:
URL: https://github.com/apache/beam/pull/14136#discussion_r587936766



##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.protobuf.DescriptorProtos.DescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Label;
+import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.DynamicMessage;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.BaseEncoding;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+/** Unit tests for {@link org.apache.beam.sdk.io.gcp.bigquery.TableRowToStorageApiProto}. */
+public class TableRowToStorageApiProtoTest {
+  private static final TableSchema BASE_TABLE_SCHEMA =
+      new TableSchema()
+          .setFields(
+              ImmutableList.<TableFieldSchema>builder()
+                  .add(new TableFieldSchema().setType("STRING").setName("stringValue"))
+                  .add(new TableFieldSchema().setType("BYTES").setName("bytesValue"))
+                  .add(new TableFieldSchema().setType("INT64").setName("int64Value"))
+                  .add(new TableFieldSchema().setType("INTEGER").setName("intValue"))
+                  .add(new TableFieldSchema().setType("FLOAT64").setName("float64Value"))
+                  .add(new TableFieldSchema().setType("FLOAT").setName("floatValue"))
+                  .add(new TableFieldSchema().setType("BOOL").setName("boolValue"))
+                  .add(new TableFieldSchema().setType("BOOLEAN").setName("booleanValue"))
+                  .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampValue"))
+                  .add(new TableFieldSchema().setType("TIME").setName("timeValue"))
+                  .add(new TableFieldSchema().setType("DATETIME").setName("datetimeValue"))
+                  .add(new TableFieldSchema().setType("DATE").setName("dateValue"))
+                  .add(new TableFieldSchema().setType("NUMERIC").setName("numericValue"))
+                  .build());
+
+  private static final DescriptorProto BASE_TABLE_SCHEMA_PROTO =
+      DescriptorProto.newBuilder()
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("stringvalue")
+                  .setNumber(1)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("bytesvalue")
+                  .setNumber(2)
+                  .setType(Type.TYPE_BYTES)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("int64value")
+                  .setNumber(3)
+                  .setType(Type.TYPE_INT64)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("intvalue")
+                  .setNumber(4)
+                  .setType(Type.TYPE_INT64)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("float64value")
+                  .setNumber(5)
+                  .setType(Type.TYPE_DOUBLE)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("floatvalue")
+                  .setNumber(6)
+                  .setType(Type.TYPE_DOUBLE)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("boolvalue")
+                  .setNumber(7)
+                  .setType(Type.TYPE_BOOL)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("booleanvalue")
+                  .setNumber(8)
+                  .setType(Type.TYPE_BOOL)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("timestampvalue")
+                  .setNumber(9)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("timevalue")
+                  .setNumber(10)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("datetimevalue")
+                  .setNumber(11)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("datevalue")
+                  .setNumber(12)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("numericvalue")
+                  .setNumber(13)
+                  .setType(Type.TYPE_STRING)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
+          .build();
+
+  private static final TableSchema NESTED_TABLE_SCHEMA =

Review comment:
       Ah, good point. I added a test, and it did indeed turn up a bug.



