Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/11/18 23:35:06 UTC

[GitHub] [beam] prodriguezdefino opened a new pull request, #24274: initial impl

prodriguezdefino opened a new pull request, #24274:
URL: https://github.com/apache/beam/pull/24274

   **Please** add a meaningful description for your change here
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Abacn commented on a diff in pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by GitBox <gi...@apache.org>.
Abacn commented on code in PR #24274:
URL: https://github.com/apache/beam/pull/24274#discussion_r1036452165


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java:
##########
@@ -265,6 +266,7 @@ public abstract static class Builder {
           .put(SqlTypes.DATETIME.getIdentifier(), StandardSQLTypeName.DATETIME)
           .put("SqlTimeWithLocalTzType", StandardSQLTypeName.TIME)
           .put("Enum", StandardSQLTypeName.STRING)
+          .put(MicrosInstant.IDENTIFIER, StandardSQLTypeName.TIMESTAMP)

Review Comment:
   Is there a test verifying that this logical type is supported? `MicrosInstant` is used for `java.time.Instant`, so such a test could, for example, write a `java.time.Instant` value to BigQuery.





[GitHub] [beam] prodriguezdefino commented on pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by GitBox <gi...@apache.org>.
prodriguezdefino commented on PR #24274:
URL: https://github.com/apache/beam/pull/24274#issuecomment-1324790335

   Run Java PreCommit




[GitHub] [beam] reuvenlax commented on pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on PR #24274:
URL: https://github.com/apache/beam/pull/24274#issuecomment-1332807159

   Please resolve against current codebase




[GitHub] [beam] github-actions[bot] commented on pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #24274:
URL: https://github.com/apache/beam/pull/24274#issuecomment-1361239612

   Reminder, please take a look at this pr: @kileys @Abacn 




[GitHub] [beam] prodriguezdefino commented on pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by "prodriguezdefino (via GitHub)" <gi...@apache.org>.
prodriguezdefino commented on PR #24274:
URL: https://github.com/apache/beam/pull/24274#issuecomment-1411367417

   Run Java_Kafka_IO_Direct PreCommit




[GitHub] [beam] prodriguezdefino commented on a diff in pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by GitBox <gi...@apache.org>.
prodriguezdefino commented on code in PR #24274:
URL: https://github.com/apache/beam/pull/24274#discussion_r1037462086


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java:
##########
@@ -265,6 +266,7 @@ public abstract static class Builder {
           .put(SqlTypes.DATETIME.getIdentifier(), StandardSQLTypeName.DATETIME)
           .put("SqlTimeWithLocalTzType", StandardSQLTypeName.TIME)
           .put("Enum", StandardSQLTypeName.STRING)
+          .put(MicrosInstant.IDENTIFIER, StandardSQLTypeName.TIMESTAMP)

Review Comment:
   `MicrosInstant` is also used for the translation of the Avro logical type named TimestampMicros, which is being forced in the test `BigQueryWriteIOTest.testWriteAvro()`.
   For the time-related logical types (timestamp-micros and timestamp-millis, along with date), Avro internally stores a numeric value that represents the time since the epoch (micros, millis, or days, depending on the type).
   I'm not sure whether I can force an Avro object to carry a `java.time.Instant` (probably not). This code path executes only when the underlying Avro `GenericRecord` has a time-related logical type (which is being tested) and its schema gets translated to a Beam `Row.Schema`, yielding the `MicrosInstant` (via the `AvroUtils.toFieldType` method).
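
As an illustration of the numeric encodings described above, here is a minimal sketch that decodes the raw values with `java.time` only. The class `AvroEpochValues` and its method names are hypothetical helpers written for this example; they are not part of Avro or Beam, and they assume the values arrive as a plain `long` or `int` counting from the Unix epoch.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;

/** Hypothetical helper (not Avro or Beam API): decodes raw epoch-based values. */
final class AvroEpochValues {
  private AvroEpochValues() {}

  /** timestamp-millis: a long counting milliseconds since 1970-01-01T00:00:00Z. */
  static Instant fromTimestampMillis(long millis) {
    return Instant.ofEpochMilli(millis);
  }

  /** timestamp-micros: a long counting microseconds since the epoch. */
  static Instant fromTimestampMicros(long micros) {
    // floorDiv/floorMod keep the sub-second part non-negative for pre-epoch values.
    long seconds = Math.floorDiv(micros, 1_000_000L);
    long microsOfSecond = Math.floorMod(micros, 1_000_000L);
    return Instant.ofEpochSecond(seconds, microsOfSecond * 1_000L);
  }

  /** date: an int counting days since 1970-01-01. */
  static LocalDate fromDate(int days) {
    return LocalDate.ofEpochDay(days);
  }

  /** Inverse direction: the microsecond count for a given Instant. */
  static long toTimestampMicros(Instant instant) {
    return ChronoUnit.MICROS.between(Instant.EPOCH, instant);
  }

  public static void main(String[] args) {
    System.out.println(fromTimestampMillis(1_500L));
    System.out.println(fromTimestampMicros(1_500_000L));
    System.out.println(fromDate(1));
  }
}
```

Both timestamp variants decode to the same `Instant` when they describe the same moment; only the unit of the stored number differs, which is why the unit has to be known before the value is handed to a sink.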





[GitHub] [beam] prodriguezdefino commented on pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by "prodriguezdefino (via GitHub)" <gi...@apache.org>.
prodriguezdefino commented on PR #24274:
URL: https://github.com/apache/beam/pull/24274#issuecomment-1411292806

   Run Java_Kafka_IO_Direct PreCommit




[GitHub] [beam] prodriguezdefino commented on a diff in pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by GitBox <gi...@apache.org>.
prodriguezdefino commented on code in PR #24274:
URL: https://github.com/apache/beam/pull/24274#discussion_r1040387642


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -540,7 +540,15 @@ public class BigQueryIO {
    * A formatting function that maps a TableRow to itself. This allows sending a {@code
    * PCollection<TableRow>} directly to BigQueryIO.Write.
    */
-  static final SerializableFunction<TableRow, TableRow> IDENTITY_FORMATTER = input -> input;
+  static final SerializableFunction<TableRow, TableRow> TABLE_ROW_IDENTITY_FORMATTER =
+      input -> input;
+
+  /**
+   * A formatting function that maps a GenericRecord to itself. This allows sending a {@code
+   * PCollection<GenericRecord>} directly to BigQueryIO.Write.
+   */
+  static final SerializableFunction<AvroWriteRequest<GenericRecord>, GenericRecord>
+      GENERIC_RECORD_IDENTITY_FORMATTER = input -> input.getElement();

Review Comment:
   Done.
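
For readers unfamiliar with the two formatter shapes in the hunk above, the following is a rough, self-contained sketch of the difference. `WriteRequestSketch` is a hypothetical stand-in that models only the `getElement()` accessor visible in the diff, and `java.util.function.Function` is used in place of Beam's `SerializableFunction`; neither is the actual Beam type.

```java
import java.util.function.Function;

/** Hypothetical stand-in for a write request wrapper; only getElement() is modeled. */
final class WriteRequestSketch<T> {
  private final T element;

  WriteRequestSketch(T element) {
    this.element = element;
  }

  T getElement() {
    return element;
  }
}

final class FormatterSketch {
  private FormatterSketch() {}

  /** Input and output types match, so the formatter can return its input unchanged. */
  static final Function<String, String> IDENTITY_FORMATTER = input -> input;

  /** The input is a wrapper around the element, so the formatter has to unwrap it. */
  static final Function<WriteRequestSketch<String>, String> UNWRAPPING_FORMATTER =
      input -> input.getElement();

  public static void main(String[] args) {
    System.out.println(IDENTITY_FORMATTER.apply("row"));
    System.out.println(UNWRAPPING_FORMATTER.apply(new WriteRequestSketch<>("record")));
  }
}
```

The second formatter is an identity only with respect to the wrapped element, which is presumably why the two constants carry distinct names after the rename.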



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -540,7 +540,15 @@ public class BigQueryIO {
    * A formatting function that maps a TableRow to itself. This allows sending a {@code
    * PCollection<TableRow>} directly to BigQueryIO.Write.
    */
-  static final SerializableFunction<TableRow, TableRow> IDENTITY_FORMATTER = input -> input;
+  static final SerializableFunction<TableRow, TableRow> TABLE_ROW_IDENTITY_FORMATTER =
+      input -> input;

Review Comment:
   Done.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -1925,7 +1933,19 @@ public static <T> Write<T> write() {
    * Write#withFormatFunction(SerializableFunction)}.
    */
   public static Write<TableRow> writeTableRows() {
-    return BigQueryIO.<TableRow>write().withFormatFunction(IDENTITY_FORMATTER);
+    return BigQueryIO.<TableRow>write().withFormatFunction(TABLE_ROW_IDENTITY_FORMATTER);
+  }
+
+  /**
+   * A {@link PTransform} that writes a {@link PCollection} containing {@link GenericRecord
+   * GenericRecords} to a BigQuery table.
+   *
+   * <p>It is recommended to instead use {@link #write} with {@link
+   * Write#withFormatFunction(SerializableFunction)}.

Review Comment:
   Removed, it was copied from the `writeTableRows` method. 





[GitHub] [beam] prodriguezdefino commented on a diff in pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by "prodriguezdefino (via GitHub)" <gi...@apache.org>.
prodriguezdefino commented on code in PR #24274:
URL: https://github.com/apache/beam/pull/24274#discussion_r1108874740


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java:
##########
@@ -150,10 +150,14 @@ public class AvroUtils {
     GenericData.get().addLogicalTypeConversion(new JodaTimestampConversion());
   }
 
-  // Unwrap an AVRO schema into the base type an whether it is nullable.
-  static class TypeWithNullability {
-    public final org.apache.avro.Schema type;
-    public final boolean nullable;
+  /** Unwrap an AVRO schema into the base type an whether it is nullable. */

Review Comment:
   I added the changes in the avro extension module.





[GitHub] [beam] Abacn commented on a diff in pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #24274:
URL: https://github.com/apache/beam/pull/24274#discussion_r1105064269


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java:
##########
@@ -150,10 +150,14 @@ public class AvroUtils {
     GenericData.get().addLogicalTypeConversion(new JodaTimestampConversion());
   }
 
-  // Unwrap an AVRO schema into the base type an whether it is nullable.
-  static class TypeWithNullability {
-    public final org.apache.avro.Schema type;
-    public final boolean nullable;
+  /** Unwrap an AVRO schema into the base type an whether it is nullable. */

Review Comment:
   Then at least we want to sync the content of `AvroUtils` in sdks/extension and here, in case the migration happens and the change gets dropped.







[GitHub] [beam] codecov[bot] commented on pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on PR #24274:
URL: https://github.com/apache/beam/pull/24274#issuecomment-1338682587

   # [Codecov](https://codecov.io/gh/apache/beam/pull/24274?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#24274](https://codecov.io/gh/apache/beam/pull/24274?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (35fb28a) into [master](https://codecov.io/gh/apache/beam/commit/9915ec466b217dc79ae47d0d318f73ad174da626?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (9915ec4) will **increase** coverage by `9.62%`.
   > The diff coverage is `68.75%`.
   
   > :exclamation: Current head 35fb28a differs from pull request most recent head e3ff8cb. Consider uploading reports for the commit e3ff8cb to get more accurate results
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #24274      +/-   ##
   ==========================================
   + Coverage   73.37%   82.99%   +9.62%     
   ==========================================
     Files         718      476     -242     
     Lines       97165    67399   -29766     
   ==========================================
   - Hits        71290    55938   -15352     
   + Misses      24528    11461   -13067     
   + Partials     1347        0    -1347     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | go | `?` | |
   | python | `82.99% <68.75%> (-0.01%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/24274?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...hon/apache\_beam/runners/worker/bundle\_processor.py](https://codecov.io/gh/apache/beam/pull/24274/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvYnVuZGxlX3Byb2Nlc3Nvci5weQ==) | `93.54% <ø> (-0.13%)` | :arrow_down: |
   | [sdks/python/apache\_beam/transforms/core.py](https://codecov.io/gh/apache/beam/pull/24274/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9jb3JlLnB5) | `92.90% <ø> (ø)` | |
   | [sdks/python/apache\_beam/io/azure/blobstorageio.py](https://codecov.io/gh/apache/beam/pull/24274/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vYXp1cmUvYmxvYnN0b3JhZ2Vpby5weQ==) | `27.14% <17.64%> (+0.13%)` | :arrow_up: |
   | [sdks/python/apache\_beam/io/gcp/gcsio.py](https://codecov.io/gh/apache/beam/pull/24274/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2djc2lvLnB5) | `92.28% <73.68%> (+0.15%)` | :arrow_up: |
   | [sdks/python/apache\_beam/io/aws/s3io.py](https://codecov.io/gh/apache/beam/pull/24274/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vYXdzL3MzaW8ucHk=) | `84.84% <77.77%> (+0.41%)` | :arrow_up: |
   | [sdks/python/apache\_beam/io/fileio.py](https://codecov.io/gh/apache/beam/pull/24274/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZmlsZWlvLnB5) | `96.10% <84.61%> (+0.04%)` | :arrow_up: |
   | [sdks/python/apache\_beam/dataframe/frames.py](https://codecov.io/gh/apache/beam/pull/24274/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL2ZyYW1lcy5weQ==) | `95.33% <100.00%> (-0.13%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/aws/s3filesystem.py](https://codecov.io/gh/apache/beam/pull/24274/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vYXdzL3MzZmlsZXN5c3RlbS5weQ==) | `78.84% <100.00%> (ø)` | |
   | [...thon/apache\_beam/io/azure/blobstoragefilesystem.py](https://codecov.io/gh/apache/beam/pull/24274/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vYXp1cmUvYmxvYnN0b3JhZ2VmaWxlc3lzdGVtLnB5) | `79.24% <100.00%> (ø)` | |
   | [sdks/python/apache\_beam/io/filebasedsink.py](https://codecov.io/gh/apache/beam/pull/24274/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZmlsZWJhc2Vkc2luay5weQ==) | `95.89% <100.00%> (-0.02%)` | :arrow_down: |
   | ... and [253 more](https://codecov.io/gh/apache/beam/pull/24274/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   




[GitHub] [beam] prodriguezdefino commented on pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by GitBox <gi...@apache.org>.
prodriguezdefino commented on PR #24274:
URL: https://github.com/apache/beam/pull/24274#issuecomment-1322466493

   retest this please




[GitHub] [beam] Abacn commented on pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on PR #24274:
URL: https://github.com/apache/beam/pull/24274#issuecomment-1430085502

   Hi @prodriguezdefino, thanks for your patience. This looks good to me. Would you mind addressing https://github.com/apache/beam/pull/24274#discussion_r1105064269 so that nothing breaks when #24992 also gets in? Thanks!




[GitHub] [beam] prodriguezdefino commented on a diff in pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by GitBox <gi...@apache.org>.
prodriguezdefino commented on code in PR #24274:
URL: https://github.com/apache/beam/pull/24274#discussion_r1040387046


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java:
##########
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.DynamicMessage;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.avro.Conversions;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.schemas.utils.AvroUtils.TypeWithNullability;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.joda.time.Days;
+import org.joda.time.Instant;
+import org.joda.time.ReadableInstant;
+
+/**
+ * Utility methods for converting Avro {@link GenericRecord} objects to dynamic protocol message,
+ * for use with the Storage write API.
+ */
+public class AvroGenericRecordToStorageApiProto {
+
+  static final Map<Schema.Type, TableFieldSchema.Type> PRIMITIVE_TYPES =
+      ImmutableMap.<Schema.Type, TableFieldSchema.Type>builder()
+          .put(Schema.Type.INT, TableFieldSchema.Type.INT64)
+          .put(Schema.Type.FIXED, TableFieldSchema.Type.BYTES)
+          .put(Schema.Type.LONG, TableFieldSchema.Type.INT64)
+          .put(Schema.Type.FLOAT, TableFieldSchema.Type.DOUBLE)
+          .put(Schema.Type.DOUBLE, TableFieldSchema.Type.DOUBLE)
+          .put(Schema.Type.STRING, TableFieldSchema.Type.STRING)
+          .put(Schema.Type.BOOLEAN, TableFieldSchema.Type.BOOL)
+          .put(Schema.Type.ENUM, TableFieldSchema.Type.STRING)
+          .put(Schema.Type.BYTES, TableFieldSchema.Type.BYTES)
+          .build();
+
+  // A map of supported logical types to the protobuf field type.
+  static final Map<String, TableFieldSchema.Type> LOGICAL_TYPES =
+      ImmutableMap.<String, TableFieldSchema.Type>builder()
+          .put(LogicalTypes.date().getName(), TableFieldSchema.Type.DATE)
+          .put(LogicalTypes.decimal(1).getName(), TableFieldSchema.Type.BIGNUMERIC)
+          .put(LogicalTypes.timestampMicros().getName(), TableFieldSchema.Type.TIMESTAMP)
+          .put(LogicalTypes.timestampMillis().getName(), TableFieldSchema.Type.TIMESTAMP)
+          .put(LogicalTypes.uuid().getName(), TableFieldSchema.Type.STRING)
+          .build();
+
+  static final Map<Schema.Type, Function<Object, Object>> PRIMITIVE_ENCODERS =
+      ImmutableMap.<Schema.Type, Function<Object, Object>>builder()
+          .put(Schema.Type.INT, o -> Long.valueOf((int) o))
+          .put(Schema.Type.FIXED, o -> ByteString.copyFrom(((GenericData.Fixed) o).bytes()))
+          .put(Schema.Type.LONG, Functions.identity())
+          .put(
+              Schema.Type.FLOAT,
+              o -> Double.valueOf(Float.valueOf((float) o).toString()).doubleValue())
+          .put(Schema.Type.DOUBLE, Function.identity())
+          .put(Schema.Type.STRING, Function.identity())
+          .put(Schema.Type.BOOLEAN, Function.identity())
+          .put(Schema.Type.ENUM, o -> o.toString())
+          .put(Schema.Type.BYTES, o -> ByteString.copyFrom((byte[]) o))
+          .build();
+
+  // A map of supported logical types to their encoding functions.
+  static final Map<String, BiFunction<LogicalType, Object, Object>> LOGICAL_TYPE_ENCODERS =
+      ImmutableMap.<String, BiFunction<LogicalType, Object, Object>>builder()
+          .put(LogicalTypes.date().getName(), (logicalType, value) -> convertDate(value))
+          .put(
+              LogicalTypes.decimal(1).getName(), AvroGenericRecordToStorageApiProto::convertDecimal)
+          .put(
+              LogicalTypes.timestampMicros().getName(),
+              (logicalType, value) -> convertTimestamp(value, true))
+          .put(
+              LogicalTypes.timestampMillis().getName(),
+              (logicalType, value) -> convertTimestamp(value, false))
+          .put(LogicalTypes.uuid().getName(), (logicalType, value) -> convertUUID(value))
+          .build();
+
+  static String convertUUID(Object value) {
+    if (value instanceof UUID) {
+      return ((UUID) value).toString();
+    } else {
+      Preconditions.checkArgument(value instanceof String, "Expecting a value as String type.");
+      UUID.fromString((String) value);
+      return (String) value;
+    }
+  }
+
+  static Long convertTimestamp(Object value, boolean micros) {
+    if (value instanceof ReadableInstant) {
+      return ((ReadableInstant) value).getMillis() * (micros ? 1000 : 1);
+    } else {
+      Preconditions.checkArgument(
+          value instanceof Long, "Expecting a value as Long type (millis).");
+      return (Long) value;
+    }
+  }
+
+  static Integer convertDate(Object value) {
+    if (value instanceof ReadableInstant) {
+      return Days.daysBetween(Instant.EPOCH, (ReadableInstant) value).getDays();
+    } else {
+      Preconditions.checkArgument(
+          value instanceof Integer, "Expecting a value as Integer type (days).");
+      return (Integer) value;
+    }
+  }
+
+  static ByteString convertDecimal(LogicalType logicalType, Object value) {
+    ByteBuffer byteBuffer = (ByteBuffer) value;
+    BigDecimal bigDecimal =
+        new Conversions.DecimalConversion()
+            .fromBytes(
+                byteBuffer.duplicate(),
+                Schema.create(Schema.Type.NULL), // dummy schema, not used
+                logicalType);
+    return BeamRowToStorageApiProto.serializeBigDecimalToNumeric(bigDecimal);
+  }
+
+  /**
+   * Given an Avro Schema, returns a protocol-buffer TableSchema that can be used to write data
+   * through BigQuery Storage API.
+   *
+   * @param schema An Avro Schema
+   * @return Returns the TableSchema created from the provided Schema
+   */
+  public static TableSchema protoTableSchemaFromAvroSchema(Schema schema) {
+    Preconditions.checkState(!schema.getFields().isEmpty());
+
+    TableSchema.Builder builder = TableSchema.newBuilder();
+    for (Schema.Field field : schema.getFields()) {
+      builder.addFields(fieldDescriptorFromAvroField(field));
+    }
+    return builder.build();
+  }
+
+  /**
+   * Given an Avro {@link GenericRecord} object, returns a protocol-buffer message that can be used
+   * to write data using the BigQuery Storage streaming API.
+   *
+   * @param descriptor The Descriptor for the DynamicMessage result
+   * @param record An Avro GenericRecord
+   * @return A dynamic message representation of a Proto payload to be used for StorageWrite API
+   */
+  public static DynamicMessage messageFromGenericRecord(
+      Descriptor descriptor, GenericRecord record) {
+    Schema schema = record.getSchema();
+    DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
+    for (Schema.Field field : schema.getFields()) {
+      FieldDescriptor fieldDescriptor =
+          Preconditions.checkNotNull(descriptor.findFieldByName(field.name().toLowerCase()));
+      @Nullable
+      Object value =
+          messageValueFromGenericRecordValue(fieldDescriptor, field, field.name(), record);
+      if (value != null) {
+        builder.setField(fieldDescriptor, value);
+      }
+    }
+    return builder.build();
+  }
+
+  private static TableFieldSchema fieldDescriptorFromAvroField(Schema.Field field) {
+    @Nullable Schema schema = field.schema();
+    TableFieldSchema.Builder builder =
+        TableFieldSchema.newBuilder().setName(field.name().toLowerCase());
+    Schema elementType = null;
+    switch (schema.getType()) {
+      case RECORD:
+        if (schema == null) {

Review Comment:
   Good catch, I moved the null check to before the switch statement.
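
   A minimal sketch of why the position of the check matters, using hypothetical stand-in types (`Kind`, `FieldSchema`, `describe`) rather than the Avro classes from this PR: the `switch` selector already dereferences the schema, so a null check placed inside a `case` would only be reached after a `NullPointerException` had already been thrown.

```java
final class NullCheckBeforeSwitch {
  // Hypothetical stand-ins for the Avro schema type and schema classes.
  enum Kind { RECORD, STRING }

  static final class FieldSchema {
    final Kind kind;
    final int fieldCount;

    FieldSchema(Kind kind, int fieldCount) {
      this.kind = kind;
      this.fieldCount = fieldCount;
    }
  }

  static String describe(FieldSchema schema) {
    // Check for null first: evaluating schema.kind in the switch selector
    // below would otherwise throw a NullPointerException.
    if (schema == null) {
      throw new IllegalArgumentException("Unexpected null schema!");
    }
    switch (schema.kind) {
      case RECORD:
        return "record with " + schema.fieldCount + " field(s)";
      case STRING:
        return "string";
      default:
        throw new IllegalStateException("Unsupported kind: " + schema.kind);
    }
  }
}
```

   With the check hoisted, a null input fails with the intended message instead of an NPE raised by the selector expression.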



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java:
##########
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.DynamicMessage;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.avro.Conversions;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.schemas.utils.AvroUtils.TypeWithNullability;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.joda.time.Days;
+import org.joda.time.Instant;
+import org.joda.time.ReadableInstant;
+
+/**
+ * Utility methods for converting Avro {@link GenericRecord} objects to dynamic protocol message,
+ * for use with the Storage write API.
+ */
+public class AvroGenericRecordToStorageApiProto {
+
+  static final Map<Schema.Type, TableFieldSchema.Type> PRIMITIVE_TYPES =
+      ImmutableMap.<Schema.Type, TableFieldSchema.Type>builder()
+          .put(Schema.Type.INT, TableFieldSchema.Type.INT64)
+          .put(Schema.Type.FIXED, TableFieldSchema.Type.BYTES)
+          .put(Schema.Type.LONG, TableFieldSchema.Type.INT64)
+          .put(Schema.Type.FLOAT, TableFieldSchema.Type.DOUBLE)
+          .put(Schema.Type.DOUBLE, TableFieldSchema.Type.DOUBLE)
+          .put(Schema.Type.STRING, TableFieldSchema.Type.STRING)
+          .put(Schema.Type.BOOLEAN, TableFieldSchema.Type.BOOL)
+          .put(Schema.Type.ENUM, TableFieldSchema.Type.STRING)
+          .put(Schema.Type.BYTES, TableFieldSchema.Type.BYTES)
+          .build();
+
+  // A map of supported logical types to the protobuf field type.
+  static final Map<String, TableFieldSchema.Type> LOGICAL_TYPES =
+      ImmutableMap.<String, TableFieldSchema.Type>builder()
+          .put(LogicalTypes.date().getName(), TableFieldSchema.Type.DATE)
+          .put(LogicalTypes.decimal(1).getName(), TableFieldSchema.Type.BIGNUMERIC)
+          .put(LogicalTypes.timestampMicros().getName(), TableFieldSchema.Type.TIMESTAMP)
+          .put(LogicalTypes.timestampMillis().getName(), TableFieldSchema.Type.TIMESTAMP)
+          .put(LogicalTypes.uuid().getName(), TableFieldSchema.Type.STRING)
+          .build();
+
+  static final Map<Schema.Type, Function<Object, Object>> PRIMITIVE_ENCODERS =
+      ImmutableMap.<Schema.Type, Function<Object, Object>>builder()
+          .put(Schema.Type.INT, o -> Long.valueOf((int) o))
+          .put(Schema.Type.FIXED, o -> ByteString.copyFrom(((GenericData.Fixed) o).bytes()))
+          .put(Schema.Type.LONG, Functions.identity())
+          .put(
+              Schema.Type.FLOAT,
+              o -> Double.valueOf(Float.valueOf((float) o).toString()).doubleValue())
+          .put(Schema.Type.DOUBLE, Function.identity())
+          .put(Schema.Type.STRING, Function.identity())
+          .put(Schema.Type.BOOLEAN, Function.identity())
+          .put(Schema.Type.ENUM, o -> o.toString())
+          .put(Schema.Type.BYTES, o -> ByteString.copyFrom((byte[]) o))
+          .build();
+
+  // A map of supported logical types to their encoding functions.
+  static final Map<String, BiFunction<LogicalType, Object, Object>> LOGICAL_TYPE_ENCODERS =
+      ImmutableMap.<String, BiFunction<LogicalType, Object, Object>>builder()
+          .put(LogicalTypes.date().getName(), (logicalType, value) -> convertDate(value))
+          .put(
+              LogicalTypes.decimal(1).getName(), AvroGenericRecordToStorageApiProto::convertDecimal)
+          .put(
+              LogicalTypes.timestampMicros().getName(),
+              (logicalType, value) -> convertTimestamp(value, true))
+          .put(
+              LogicalTypes.timestampMillis().getName(),
+              (logicalType, value) -> convertTimestamp(value, false))
+          .put(LogicalTypes.uuid().getName(), (logicalType, value) -> convertUUID(value))
+          .build();
+
+  static String convertUUID(Object value) {
+    if (value instanceof UUID) {
+      return ((UUID) value).toString();
+    } else {
+      Preconditions.checkArgument(value instanceof String, "Expecting a value as String type.");
+      UUID.fromString((String) value);
+      return (String) value;
+    }
+  }
+
+  static Long convertTimestamp(Object value, boolean micros) {
+    if (value instanceof ReadableInstant) {
+      return ((ReadableInstant) value).getMillis() * (micros ? 1000 : 1);
+    } else {
+      Preconditions.checkArgument(
+          value instanceof Long, "Expecting a value as Long type (millis).");
+      return (Long) value;
+    }
+  }
+
+  static Integer convertDate(Object value) {
+    if (value instanceof ReadableInstant) {
+      return Days.daysBetween(Instant.EPOCH, (ReadableInstant) value).getDays();
+    } else {
+      Preconditions.checkArgument(
+          value instanceof Integer, "Expecting a value as Integer type (days).");
+      return (Integer) value;
+    }
+  }
+
+  static ByteString convertDecimal(LogicalType logicalType, Object value) {
+    ByteBuffer byteBuffer = (ByteBuffer) value;
+    BigDecimal bigDecimal =
+        new Conversions.DecimalConversion()
+            .fromBytes(
+                byteBuffer.duplicate(),
+                Schema.create(Schema.Type.NULL), // dummy schema, not used
+                logicalType);
+    return BeamRowToStorageApiProto.serializeBigDecimalToNumeric(bigDecimal);
+  }
+
+  /**
+   * Given an Avro Schema, returns a protocol-buffer TableSchema that can be used to write data
+   * through BigQuery Storage API.
+   *
+   * @param schema An Avro Schema
+   * @return Returns the TableSchema created from the provided Schema
+   */
+  public static TableSchema protoTableSchemaFromAvroSchema(Schema schema) {
+    Preconditions.checkState(!schema.getFields().isEmpty());
+
+    TableSchema.Builder builder = TableSchema.newBuilder();
+    for (Schema.Field field : schema.getFields()) {
+      builder.addFields(fieldDescriptorFromAvroField(field));
+    }
+    return builder.build();
+  }
+
+  /**
+   * Given an Avro {@link GenericRecord} object, returns a protocol-buffer message that can be used
+   * to write data using the BigQuery Storage streaming API.
+   *
+   * @param descriptor The Descriptor for the DynamicMessage result
+   * @param record An Avro GenericRecord
+   * @return A dynamic message representation of a Proto payload to be used for StorageWrite API
+   */
+  public static DynamicMessage messageFromGenericRecord(
+      Descriptor descriptor, GenericRecord record) {
+    Schema schema = record.getSchema();
+    DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
+    for (Schema.Field field : schema.getFields()) {
+      FieldDescriptor fieldDescriptor =
+          Preconditions.checkNotNull(descriptor.findFieldByName(field.name().toLowerCase()));
+      @Nullable
+      Object value =
+          messageValueFromGenericRecordValue(fieldDescriptor, field, field.name(), record);
+      if (value != null) {
+        builder.setField(fieldDescriptor, value);
+      }
+    }
+    return builder.build();
+  }
+
+  private static TableFieldSchema fieldDescriptorFromAvroField(Schema.Field field) {
+    @Nullable Schema schema = field.schema();
+    TableFieldSchema.Builder builder =
+        TableFieldSchema.newBuilder().setName(field.name().toLowerCase());
+    Schema elementType = null;
+    switch (schema.getType()) {
+      case RECORD:
+        if (schema == null) {
+          throw new RuntimeException("Unexpected null schema!");
+        }
+        Preconditions.checkState(!schema.getFields().isEmpty());

Review Comment:
   They are allowed; the check is also in the similar `BeamRowToStorageApiProto` methods (Beam Row.Schema to Descriptors), and I wanted to put the same safeguards here.
   
   I'm not sure how useful it would be to have an empty record in this context, but let me know if you want me to remove the restriction.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] github-actions[bot] commented on pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #24274:
URL: https://github.com/apache/beam/pull/24274#issuecomment-1348408108

   Reminder, please take a look at this PR: @kileys @Abacn 




[GitHub] [beam] github-actions[bot] commented on pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #24274:
URL: https://github.com/apache/beam/pull/24274#issuecomment-1386957925

   Assigning a new set of reviewers because the PR has gone too long without review. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @kileys for label java.
   R: @johnjcasey for label io.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)




[GitHub] [beam] prodriguezdefino commented on a diff in pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by "prodriguezdefino (via GitHub)" <gi...@apache.org>.
prodriguezdefino commented on code in PR #24274:
URL: https://github.com/apache/beam/pull/24274#discussion_r1092440694


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java:
##########
@@ -150,10 +150,14 @@ public class AvroUtils {
     GenericData.get().addLogicalTypeConversion(new JodaTimestampConversion());
   }
 
-  // Unwrap an AVRO schema into the base type an whether it is nullable.
-  static class TypeWithNullability {
-    public final org.apache.avro.Schema type;
-    public final boolean nullable;
+  /** Unwrap an AVRO schema into the base type an whether it is nullable. */

Review Comment:
   Not sure if I should migrate this code to use the new location since the other references from the bq package are still using this class.





[GitHub] [beam] reuvenlax commented on a diff in pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on code in PR #24274:
URL: https://github.com/apache/beam/pull/24274#discussion_r1036495540


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -3165,6 +3156,26 @@ private <DestinationT> WriteResult continueExpandTyped(
           storageApiDynamicDestinations =
               new StorageApiDynamicDestinationsBeamRow<>(
                   dynamicDestinations, elementSchema, elementToRowFunction);
+        } else if (getAvroRowWriterFactory() != null) {

Review Comment:
   Please add a writeGenericRecords() method so users don't have to deal with the avro format function.
   
   Also - AFAICT from inspection, the AvroWriterFactory is a bit broken if side inputs are used for schemas. Do we have a test for this?





[GitHub] [beam] prodriguezdefino commented on a diff in pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by GitBox <gi...@apache.org>.
prodriguezdefino commented on code in PR #24274:
URL: https://github.com/apache/beam/pull/24274#discussion_r1036658036


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java:
##########
@@ -151,9 +151,13 @@ public class AvroUtils {
   }
 
   // Unwrap an AVRO schema into the base type an whether it is nullable.

Review Comment:
   thanks, changed this comment





[GitHub] [beam] prodriguezdefino commented on a diff in pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by "prodriguezdefino (via GitHub)" <gi...@apache.org>.
prodriguezdefino commented on code in PR #24274:
URL: https://github.com/apache/beam/pull/24274#discussion_r1092439573


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -3165,6 +3156,26 @@ private <DestinationT> WriteResult continueExpandTyped(
           storageApiDynamicDestinations =
               new StorageApiDynamicDestinationsBeamRow<>(
                   dynamicDestinations, elementSchema, elementToRowFunction);
+        } else if (getAvroRowWriterFactory() != null) {

Review Comment:
   Also, just in case: besides the added unit test, I checked this PR's code with a simple [DynamicDestination definition](https://github.com/prodriguezdefino/apache-beam-streaming-tests/blob/map-support-bq-storagewrites/canonical-streaming-pipelines/src/main/java/com/google/cloud/pso/beam/pipelines/transforms/WriteFormatToBigQuery.java#L70) and it seems to work fine for job id: `2023-01-31_12_10_08-12389116961981372457`





[GitHub] [beam] prodriguezdefino commented on pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by GitBox <gi...@apache.org>.
prodriguezdefino commented on PR #24274:
URL: https://github.com/apache/beam/pull/24274#issuecomment-1322400784

   Run Java PreCommit




[GitHub] [beam] github-actions[bot] commented on pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #24274:
URL: https://github.com/apache/beam/pull/24274#issuecomment-1321027980

   Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment `assign set of reviewers`




[GitHub] [beam] prodriguezdefino commented on a diff in pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by GitBox <gi...@apache.org>.
prodriguezdefino commented on code in PR #24274:
URL: https://github.com/apache/beam/pull/24274#discussion_r1037462086


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java:
##########
@@ -265,6 +266,7 @@ public abstract static class Builder {
           .put(SqlTypes.DATETIME.getIdentifier(), StandardSQLTypeName.DATETIME)
           .put("SqlTimeWithLocalTzType", StandardSQLTypeName.TIME)
           .put("Enum", StandardSQLTypeName.STRING)
+          .put(MicrosInstant.IDENTIFIER, StandardSQLTypeName.TIMESTAMP)

Review Comment:
   `MicrosInstant` is also used for the translation of the Avro logical type named TimestampMicros (which is being forced in the test `BigQueryWriteIOTest.testWriteAvro()`).
   For the time-related logical types (timestamp micros and millis, along with date), Avro internally stores a numeric value that represents the time since epoch (micros, millis or days, depending on the type).
   I'm not sure if I can force an Avro object to carry a `java.time.Instant` (maybe not). This code path executes only when the underlying Avro `GenericRecord` has a time-related logical type (which is being tested) and its schema gets translated to a Beam `Row.Schema`, yielding the `MicrosInstant` (`AvroUtils.toFieldType` method).
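
   To illustrate the numeric encodings described above, here is a small sketch that uses only `java.time` (not the Joda or Avro classes this PR relies on); the class and method names are hypothetical. It derives the three since-epoch values that Avro's `timestamp-millis`, `timestamp-micros` and `date` logical types store.

```java
import java.time.Instant;
import java.time.ZoneOffset;

final class EpochEncodings {
  // Milliseconds since 1970-01-01T00:00:00Z (timestamp-millis).
  static long toEpochMillis(Instant instant) {
    return instant.toEpochMilli();
  }

  // Microseconds since the epoch (timestamp-micros); digits below one
  // microsecond are truncated.
  static long toEpochMicros(Instant instant) {
    return Math.addExact(
        Math.multiplyExact(instant.getEpochSecond(), 1_000_000L),
        instant.getNano() / 1_000L);
  }

  // Whole days since the epoch, evaluated in UTC (date).
  static int toEpochDays(Instant instant) {
    return Math.toIntExact(instant.atZone(ZoneOffset.UTC).toLocalDate().toEpochDay());
  }
}
```

   The same instant therefore maps to three different numbers depending on the logical type, which is why a converter has to know the unit of a raw `Long` or `Integer` value it receives.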





[GitHub] [beam] Abacn commented on a diff in pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on code in PR #24274:
URL: https://github.com/apache/beam/pull/24274#discussion_r1106109569


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java:
##########
@@ -150,10 +150,14 @@ public class AvroUtils {
     GenericData.get().addLogicalTypeConversion(new JodaTimestampConversion());
   }
 
-  // Unwrap an AVRO schema into the base type an whether it is nullable.
-  static class TypeWithNullability {
-    public final org.apache.avro.Schema type;
-    public final boolean nullable;
+  /** Unwrap an AVRO schema into the base type an whether it is nullable. */

Review Comment:
   FYI, the open PR for moving to the sdks/extensions avro module is #24992





[GitHub] [beam] github-actions[bot] commented on pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #24274:
URL: https://github.com/apache/beam/pull/24274#issuecomment-1381772399

   Reminder, please take a look at this PR: @robertwb @pabloem 




[GitHub] [beam] github-actions[bot] commented on pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #24274:
URL: https://github.com/apache/beam/pull/24274#issuecomment-1332056886

   Reminder, please take a look at this PR: @kileys @Abacn 




[GitHub] [beam] Abacn commented on pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by GitBox <gi...@apache.org>.
Abacn commented on PR #24274:
URL: https://github.com/apache/beam/pull/24274#issuecomment-1332413748

   Thanks for the contribution! Taking a look.




[GitHub] [beam] prodriguezdefino commented on a diff in pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by GitBox <gi...@apache.org>.
prodriguezdefino commented on code in PR #24274:
URL: https://github.com/apache/beam/pull/24274#discussion_r1040387583


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -3165,6 +3156,26 @@ private <DestinationT> WriteResult continueExpandTyped(
           storageApiDynamicDestinations =
               new StorageApiDynamicDestinationsBeamRow<>(
                   dynamicDestinations, elementSchema, elementToRowFunction);
+        } else if (getAvroRowWriterFactory() != null) {

Review Comment:
   Added a test to check this for the StorageWrite API.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java:
##########
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.DynamicMessage;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.avro.Conversions;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.schemas.utils.AvroUtils.TypeWithNullability;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.joda.time.Days;
+import org.joda.time.Instant;
+import org.joda.time.ReadableInstant;
+
+/**
+ * Utility methods for converting Avro {@link GenericRecord} objects to dynamic protocol message,
+ * for use with the Storage write API.
+ */
+public class AvroGenericRecordToStorageApiProto {
+
+  static final Map<Schema.Type, TableFieldSchema.Type> PRIMITIVE_TYPES =
+      ImmutableMap.<Schema.Type, TableFieldSchema.Type>builder()
+          .put(Schema.Type.INT, TableFieldSchema.Type.INT64)
+          .put(Schema.Type.FIXED, TableFieldSchema.Type.BYTES)
+          .put(Schema.Type.LONG, TableFieldSchema.Type.INT64)
+          .put(Schema.Type.FLOAT, TableFieldSchema.Type.DOUBLE)
+          .put(Schema.Type.DOUBLE, TableFieldSchema.Type.DOUBLE)
+          .put(Schema.Type.STRING, TableFieldSchema.Type.STRING)
+          .put(Schema.Type.BOOLEAN, TableFieldSchema.Type.BOOL)
+          .put(Schema.Type.ENUM, TableFieldSchema.Type.STRING)
+          .put(Schema.Type.BYTES, TableFieldSchema.Type.BYTES)
+          .build();
+
+  // A map of supported logical types to the protobuf field type.
+  static final Map<String, TableFieldSchema.Type> LOGICAL_TYPES =
+      ImmutableMap.<String, TableFieldSchema.Type>builder()
+          .put(LogicalTypes.date().getName(), TableFieldSchema.Type.DATE)
+          .put(LogicalTypes.decimal(1).getName(), TableFieldSchema.Type.BIGNUMERIC)
+          .put(LogicalTypes.timestampMicros().getName(), TableFieldSchema.Type.TIMESTAMP)
+          .put(LogicalTypes.timestampMillis().getName(), TableFieldSchema.Type.TIMESTAMP)
+          .put(LogicalTypes.uuid().getName(), TableFieldSchema.Type.STRING)
+          .build();
+
+  static final Map<Schema.Type, Function<Object, Object>> PRIMITIVE_ENCODERS =
+      ImmutableMap.<Schema.Type, Function<Object, Object>>builder()
+          .put(Schema.Type.INT, o -> Long.valueOf((int) o))
+          .put(Schema.Type.FIXED, o -> ByteString.copyFrom(((GenericData.Fixed) o).bytes()))
+          .put(Schema.Type.LONG, Functions.identity())
+          .put(
+              Schema.Type.FLOAT,
+              o -> Double.valueOf(Float.valueOf((float) o).toString()).doubleValue())
+          .put(Schema.Type.DOUBLE, Function.identity())
+          .put(Schema.Type.STRING, Function.identity())
+          .put(Schema.Type.BOOLEAN, Function.identity())
+          .put(Schema.Type.ENUM, o -> o.toString())
+          .put(Schema.Type.BYTES, o -> ByteString.copyFrom((byte[]) o))
+          .build();
+
+  // A map of supported logical types to their encoding functions.
+  static final Map<String, BiFunction<LogicalType, Object, Object>> LOGICAL_TYPE_ENCODERS =
+      ImmutableMap.<String, BiFunction<LogicalType, Object, Object>>builder()
+          .put(LogicalTypes.date().getName(), (logicalType, value) -> convertDate(value))
+          .put(
+              LogicalTypes.decimal(1).getName(), AvroGenericRecordToStorageApiProto::convertDecimal)
+          .put(
+              LogicalTypes.timestampMicros().getName(),
+              (logicalType, value) -> convertTimestamp(value, true))
+          .put(
+              LogicalTypes.timestampMillis().getName(),
+              (logicalType, value) -> convertTimestamp(value, false))
+          .put(LogicalTypes.uuid().getName(), (logicalType, value) -> convertUUID(value))
+          .build();
+
+  static String convertUUID(Object value) {
+    if (value instanceof UUID) {
+      return ((UUID) value).toString();
+    } else {
+      Preconditions.checkArgument(value instanceof String, "Expecting a value as String type.");
+      UUID.fromString((String) value);
+      return (String) value;
+    }
+  }
+
+  static Long convertTimestamp(Object value, boolean micros) {
+    if (value instanceof ReadableInstant) {
+      return ((ReadableInstant) value).getMillis() * (micros ? 1000 : 1);
+    } else {
+      Preconditions.checkArgument(
+          value instanceof Long, "Expecting a value as Long type (millis).");
+      return (Long) value;
+    }
+  }
+
+  static Integer convertDate(Object value) {
+    if (value instanceof ReadableInstant) {
+      return Days.daysBetween(Instant.EPOCH, (ReadableInstant) value).getDays();
+    } else {
+      Preconditions.checkArgument(
+          value instanceof Integer, "Expecting a value as Integer type (days).");
+      return (Integer) value;
+    }
+  }
+
+  static ByteString convertDecimal(LogicalType logicalType, Object value) {
+    ByteBuffer byteBuffer = (ByteBuffer) value;
+    BigDecimal bigDecimal =
+        new Conversions.DecimalConversion()
+            .fromBytes(
+                byteBuffer.duplicate(),
+                Schema.create(Schema.Type.NULL), // dummy schema, not used
+                logicalType);
+    return BeamRowToStorageApiProto.serializeBigDecimalToNumeric(bigDecimal);
+  }
+
+  /**
+   * Given an Avro Schema, returns a protocol-buffer TableSchema that can be used to write data
+   * through BigQuery Storage API.
+   *
+   * @param schema An Avro Schema
+   * @return Returns the TableSchema created from the provided Schema
+   */
+  public static TableSchema protoTableSchemaFromAvroSchema(Schema schema) {
+    Preconditions.checkState(!schema.getFields().isEmpty());
+
+    TableSchema.Builder builder = TableSchema.newBuilder();
+    for (Schema.Field field : schema.getFields()) {
+      builder.addFields(fieldDescriptorFromAvroField(field));
+    }
+    return builder.build();
+  }
+
+  /**
+   * Given an Avro {@link GenericRecord} object, returns a protocol-buffer message that can be used
+   * to write data using the BigQuery Storage streaming API.
+   *
+   * @param descriptor The Descriptor for the DynamicMessage result
+   * @param record An Avro GenericRecord
+   * @return A dynamic message representation of a Proto payload to be used for StorageWrite API
+   */
+  public static DynamicMessage messageFromGenericRecord(
+      Descriptor descriptor, GenericRecord record) {
+    Schema schema = record.getSchema();
+    DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
+    for (Schema.Field field : schema.getFields()) {
+      FieldDescriptor fieldDescriptor =
+          Preconditions.checkNotNull(descriptor.findFieldByName(field.name().toLowerCase()));
+      @Nullable
+      Object value =
+          messageValueFromGenericRecordValue(fieldDescriptor, field, field.name(), record);
+      if (value != null) {
+        builder.setField(fieldDescriptor, value);
+      }
+    }
+    return builder.build();
+  }
+
+  private static TableFieldSchema fieldDescriptorFromAvroField(Schema.Field field) {
+    @Nullable Schema schema = field.schema();
+    TableFieldSchema.Builder builder =
+        TableFieldSchema.newBuilder().setName(field.name().toLowerCase());
+    Schema elementType = null;
+    switch (schema.getType()) {
+      case RECORD:
+        if (schema == null) {
+          throw new RuntimeException("Unexpected null schema!");
+        }
+        Preconditions.checkState(!schema.getFields().isEmpty());
+        builder = builder.setType(TableFieldSchema.Type.STRUCT);
+        for (Schema.Field recordField : schema.getFields()) {
+          builder = builder.addFields(fieldDescriptorFromAvroField(recordField));
+        }
+        break;
+      case ARRAY:
+        elementType = TypeWithNullability.create(schema.getElementType()).getType();
+        if (elementType == null) {
+          throw new RuntimeException("Unexpected null element type!");
+        }
+        Preconditions.checkState(
+            elementType.getType() != Schema.Type.ARRAY, "Nested arrays not supported by BigQuery.");
+
+        TableFieldSchema elementFieldSchema =
+            fieldDescriptorFromAvroField(
+                new Schema.Field(field.name(), elementType, field.doc(), field.defaultVal()));
+        builder = builder.setType(elementFieldSchema.getType());
+        builder.addAllFields(elementFieldSchema.getFieldsList());
+        builder = builder.setMode(TableFieldSchema.Mode.REPEATED);
+        break;
+      case MAP:
+        Schema keyType = Schema.create(Schema.Type.STRING);
+        Schema valueType = TypeWithNullability.create(schema.getValueType()).getType();
+        if (valueType == null) {
+          throw new RuntimeException("Unexpected null element type!");
+        }
+        TableFieldSchema keyFieldSchema =
+            fieldDescriptorFromAvroField(
+                new Schema.Field("key", keyType, "key of the map entry", Schema.Field.NULL_VALUE));
+        TableFieldSchema valueFieldSchema =
+            fieldDescriptorFromAvroField(
+                new Schema.Field(
+                    "value", valueType, "value of the map entry", Schema.Field.NULL_VALUE));
+        builder =
+            builder
+                .setType(TableFieldSchema.Type.STRUCT)
+                .addFields(keyFieldSchema)
+                .addFields(valueFieldSchema)
+                .setMode(TableFieldSchema.Mode.REPEATED);
+        break;
+      case UNION:
+        elementType = TypeWithNullability.create(schema).getType();
+        if (elementType == null) {
+          throw new RuntimeException("Unexpected null element type!");
+        }
+        // check to see if more than one non-null type is defined in the union
+        Preconditions.checkState(
+            elementType.getType() != Schema.Type.UNION,
+            "Multiple non-null union types are not supported.");
+        TableFieldSchema unionFieldSchema =
+            fieldDescriptorFromAvroField(
+                new Schema.Field(field.name(), elementType, field.doc(), field.defaultVal()));
+        builder =
+            builder
+                .setType(unionFieldSchema.getType())
+                .addAllFields(unionFieldSchema.getFieldsList());
+        break;
+      default:
+        elementType = TypeWithNullability.create(schema).getType();
+        @Nullable
+        TableFieldSchema.Type primitiveType =
+            Optional.ofNullable(LogicalTypes.fromSchema(elementType))
+                .map(logicalType -> LOGICAL_TYPES.get(logicalType.getName()))
+                .orElse(PRIMITIVE_TYPES.get(elementType.getType()));
+        if (primitiveType == null) {
+          throw new RuntimeException("Unsupported type " + elementType.getType());
+        }
+        // a scalar will be required by default, if defined as part of union then
+        // caller will set nullability requirements
+        builder = builder.setType(primitiveType);
+    }
+    if (builder.getMode() != TableFieldSchema.Mode.REPEATED) {
+      if (TypeWithNullability.create(schema).isNullable()) {
+        builder = builder.setMode(TableFieldSchema.Mode.NULLABLE);
+      } else {
+        builder = builder.setMode(TableFieldSchema.Mode.REQUIRED);
+      }
+    }
+    if (field.doc() != null) {
+      builder = builder.setDescription(field.doc());
+    }
+    return builder.build();
+  }
+
+  @Nullable
+  private static Object messageValueFromGenericRecordValue(
+      FieldDescriptor fieldDescriptor, Schema.Field avroField, String name, GenericRecord record) {
+    @Nullable Object value = record.get(name);
+    if (value == null) {
+      if (fieldDescriptor.isOptional()) {
+        return null;
+      } else {
+        throw new IllegalArgumentException(
+            "Received null value for non-nullable field " + fieldDescriptor.getName());
+      }
+    }
+    return toProtoValue(fieldDescriptor, avroField.schema(), value);
+  }
+
+  private static Object toProtoValue(
+      FieldDescriptor fieldDescriptor, Schema avroSchema, Object value) {
+    switch (avroSchema.getType()) {
+      case RECORD:
+        return messageFromGenericRecord(fieldDescriptor.getMessageType(), (GenericRecord) value);
+      case ARRAY:
+        List<Object> list = (List<Object>) value;

Review Comment:
   Changed it to Iterable.
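
   To illustrate the point of this change, here is a minimal, self-contained sketch of element conversion that only assumes the value is an `Iterable`. It is not the PR's code: the class `IterableValueSketch` and the method `convertElements` are hypothetical names used for illustration only.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical helper for illustration only; it is not code from the PR.
final class IterableValueSketch {
  private IterableValueSketch() {}

  // Converts every element of an array-typed value. Only Iterable is assumed,
  // so any collection implementation is accepted, not just java.util.List.
  static List<Object> convertElements(Object value, Function<Object, Object> elementConverter) {
    if (!(value instanceof Iterable)) {
      throw new IllegalArgumentException("Expected an Iterable but got: " + value);
    }
    List<Object> converted = new ArrayList<>();
    for (Object element : (Iterable<?>) value) {
      converted.add(elementConverter.apply(element));
    }
    return converted;
  }

  public static void main(String[] args) {
    // Same widening as the INT encoder in the quoted diff: Integer -> Long.
    Function<Object, Object> intToLong = o -> Long.valueOf((int) o);
    Object asList = Arrays.asList(1, 2, 3);
    Object asDeque = new ArrayDeque<Integer>(Arrays.asList(4, 5)); // Iterable, but not a List
    System.out.println(convertElements(asList, intToLong));
    System.out.println(convertElements(asDeque, intToLong));
  }
}
```

   With a `(List<Object>)` cast, the second call would fail with a `ClassCastException`; iterating over `Iterable<?>` avoids depending on the concrete collection type.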



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] Abacn commented on a diff in pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by GitBox <gi...@apache.org>.
Abacn commented on code in PR #24274:
URL: https://github.com/apache/beam/pull/24274#discussion_r1069516293


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java:
##########
@@ -150,10 +150,14 @@ public class AvroUtils {
     GenericData.get().addLogicalTypeConversion(new JodaTimestampConversion());
   }
 
-  // Unwrap an AVRO schema into the base type an whether it is nullable.
-  static class TypeWithNullability {
-    public final org.apache.avro.Schema type;
-    public final boolean nullable;
+  /** Unwrap an AVRO schema into the base type and whether it is nullable. */

Review Comment:
   `AvroUtils` has been migrated to sdks/java/extensions/avro.





[GitHub] [beam] github-actions[bot] commented on pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #24274:
URL: https://github.com/apache/beam/pull/24274#issuecomment-1427017664

   Reminder, please take a look at this pr: @kennknowles @pabloem 




[GitHub] [beam] Abacn commented on pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by GitBox <gi...@apache.org>.
Abacn commented on PR #24274:
URL: https://github.com/apache/beam/pull/24274#issuecomment-1348551236

   And @reuvenlax who is actually reviewing this PR.




[GitHub] [beam] Abacn commented on a diff in pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by GitBox <gi...@apache.org>.
Abacn commented on code in PR #24274:
URL: https://github.com/apache/beam/pull/24274#discussion_r1038351730


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java:
##########
@@ -265,6 +266,7 @@ public abstract static class Builder {
           .put(SqlTypes.DATETIME.getIdentifier(), StandardSQLTypeName.DATETIME)
           .put("SqlTimeWithLocalTzType", StandardSQLTypeName.TIME)
           .put("Enum", StandardSQLTypeName.STRING)
+          .put(MicrosInstant.IDENTIFIER, StandardSQLTypeName.TIMESTAMP)

Review Comment:
   yeah sounds good to me





[GitHub] [beam] github-actions[bot] commented on pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #24274:
URL: https://github.com/apache/beam/pull/24274#issuecomment-1322518366

   Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @kileys for label java.
   R: @Abacn for label io.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review comments).




[GitHub] [beam] prodriguezdefino commented on pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by GitBox <gi...@apache.org>.
prodriguezdefino commented on PR #24274:
URL: https://github.com/apache/beam/pull/24274#issuecomment-1334152091

   Run Java PreCommit




[GitHub] [beam] Abacn commented on a diff in pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by GitBox <gi...@apache.org>.
Abacn commented on code in PR #24274:
URL: https://github.com/apache/beam/pull/24274#discussion_r1038186826


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java:
##########
@@ -265,6 +266,7 @@ public abstract static class Builder {
           .put(SqlTypes.DATETIME.getIdentifier(), StandardSQLTypeName.DATETIME)
           .put("SqlTimeWithLocalTzType", StandardSQLTypeName.TIME)
           .put("Enum", StandardSQLTypeName.STRING)
+          .put(MicrosInstant.IDENTIFIER, StandardSQLTypeName.TIMESTAMP)

Review Comment:
   FYI, support for Micros_instant in AvroUtils is about to be rolled back due to a breaking change (#24489). For now, support for the MicrosInstant logical type (and `java.time.Instant`) is incomplete throughout the code base. Sorry for the inconvenience.
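
   For context, a minimal sketch of why microsecond timestamps need a type like `java.time.Instant`: a millisecond-based instant can only be scaled up to micros, so the sub-millisecond digits are lost. The class `InstantMicrosSketch` and its method names are hypothetical and not part of Beam; only JDK calls are used.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

// Hypothetical helper for illustration only; it is not code from the PR.
final class InstantMicrosSketch {
  private InstantMicrosSketch() {}

  // Microseconds since the epoch, keeping sub-millisecond precision
  // (nanoseconds below one microsecond are truncated).
  static long toEpochMicros(Instant instant) {
    return ChronoUnit.MICROS.between(Instant.EPOCH, instant);
  }

  // Scales a millisecond timestamp to microseconds. The last three digits
  // are always zero because the input never carried that precision.
  static long millisToMicros(long epochMillis) {
    return Math.multiplyExact(epochMillis, 1_000L);
  }

  public static void main(String[] args) {
    Instant instant = Instant.ofEpochSecond(1L, 234_567_891L);
    System.out.println(toEpochMicros(instant));
    System.out.println(millisToMicros(instant.toEpochMilli()));
  }
}
```

   The second path mirrors the `getMillis() * 1000` scaling used for `ReadableInstant` values in the quoted `convertTimestamp`, assuming the value was already truncated to milliseconds upstream.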





[GitHub] [beam] prodriguezdefino commented on pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by "prodriguezdefino (via GitHub)" <gi...@apache.org>.
prodriguezdefino commented on PR #24274:
URL: https://github.com/apache/beam/pull/24274#issuecomment-1411367208

   Tested this change with two very similar pipelines, which:
    * read from Pub/Sub,
    * transform the payload into Avro,
    * then diverge: one writes the GenericRecords directly to BQ, while the other first converts them to Row using a Beam schema (the path currently possible with the code in `master`; the alternative would be to write TableRow),
    * both process ~250 MB/s.
   
   As expected, the difference in resource utilization is significant. 
   
   Using Beam Rows as the BigQueryIO input format: 
   <img width="483" alt="Screenshot 2023-01-31 at 6 22 08 PM" src="https://user-images.githubusercontent.com/3438103/215930472-152134d4-130a-4307-bcdc-469d1d0ee482.png">
   
   Using GenericRecords as the BigQueryIO input format: 
   <img width="460" alt="Screenshot 2023-01-31 at 6 22 24 PM" src="https://user-images.githubusercontent.com/3438103/215930566-af48f252-52c7-487c-94d5-5126ec4d7b0e.png">
   
   The difference in vCPU utilization at similar runtime is 176 vCPU/hr vs 117 vCPU/hr, a reduction of roughly one third when using GenericRecords. 
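
   The relative difference between the two quoted figures (176 and 117 vCPU/hr) can be expressed in two ways, depending on which figure is used as the baseline. The sketch below computes both; it assumes 176 belongs to the Row pipeline and 117 to the GenericRecord pipeline, and the class and method names are made up for illustration.

```java
// Hypothetical helper for illustration only; it is not code from the PR.
final class VcpuComparisonSketch {
  private VcpuComparisonSketch() {}

  // How much smaller `improved` is, as a percentage of `baseline`.
  static double reductionPercent(double baseline, double improved) {
    return (baseline - improved) / baseline * 100.0;
  }

  // How much larger `baseline` is, as a percentage of `improved`.
  static double overheadPercent(double baseline, double improved) {
    return (baseline - improved) / improved * 100.0;
  }

  public static void main(String[] args) {
    double rowPipeline = 176.0; // vCPU/hr, assumed to be the Beam Row pipeline
    double genericRecordPipeline = 117.0; // vCPU/hr, assumed to be the GenericRecord pipeline
    System.out.printf("reduction: %.1f%%%n", reductionPercent(rowPipeline, genericRecordPipeline));
    System.out.printf("overhead: %.1f%%%n", overheadPercent(rowPipeline, genericRecordPipeline));
  }
}
```

   Under that assumption, the GenericRecord pipeline uses about a third less vCPU, or equivalently the Row pipeline uses about half again as much.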




[GitHub] [beam] prodriguezdefino commented on a diff in pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by "prodriguezdefino (via GitHub)" <gi...@apache.org>.
prodriguezdefino commented on code in PR #24274:
URL: https://github.com/apache/beam/pull/24274#discussion_r1092440694


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java:
##########
@@ -150,10 +150,14 @@ public class AvroUtils {
     GenericData.get().addLogicalTypeConversion(new JodaTimestampConversion());
   }
 
-  // Unwrap an AVRO schema into the base type an whether it is nullable.
-  static class TypeWithNullability {
-    public final org.apache.avro.Schema type;
-    public final boolean nullable;
+  /** Unwrap an AVRO schema into the base type and whether it is nullable. */

Review Comment:
   Not sure if I should migrate this to use the new location since the other references from the bq package are still there. 





[GitHub] [beam] prodriguezdefino commented on pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by "prodriguezdefino (via GitHub)" <gi...@apache.org>.
prodriguezdefino commented on PR #24274:
URL: https://github.com/apache/beam/pull/24274#issuecomment-1433894632

   Run Java_Kafka_IO_Direct PreCommit




[GitHub] [beam] prodriguezdefino commented on pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by GitBox <gi...@apache.org>.
prodriguezdefino commented on PR #24274:
URL: https://github.com/apache/beam/pull/24274#issuecomment-1322427598

   pull_request




[GitHub] [beam] prodriguezdefino commented on pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by GitBox <gi...@apache.org>.
prodriguezdefino commented on PR #24274:
URL: https://github.com/apache/beam/pull/24274#issuecomment-1324797230

   Fixes #24329 




[GitHub] [beam] prodriguezdefino commented on pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by GitBox <gi...@apache.org>.
prodriguezdefino commented on PR #24274:
URL: https://github.com/apache/beam/pull/24274#issuecomment-1334104545

   Run Spotless PreCommit




[GitHub] [beam] Abacn commented on a diff in pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by GitBox <gi...@apache.org>.
Abacn commented on code in PR #24274:
URL: https://github.com/apache/beam/pull/24274#discussion_r1036449733


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java:
##########
@@ -151,9 +151,13 @@ public class AvroUtils {
   }
 
   // Unwrap an AVRO schema into the base type an whether it is nullable.

Review Comment:
   Consider replacing this inline comment with a brief Javadoc for the public class.
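
   As an illustration of the suggestion, the sketch below shows a class-level Javadoc on a simplified stand-in for a nullable-union unwrapper. It does not use Avro: a union is modeled as a list of branch type names, and `NullableUnionSketch` with all of its members is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
 * Unwraps a union, given as its branch type names, into the non-null branches and whether the
 * union is nullable.
 *
 * <p>Hypothetical, simplified stand-in for illustration; it is not the class from the PR.
 */
final class NullableUnionSketch {
  final List<String> nonNullBranches;
  final boolean nullable;

  private NullableUnionSketch(List<String> nonNullBranches, boolean nullable) {
    this.nonNullBranches = Collections.unmodifiableList(nonNullBranches);
    this.nullable = nullable;
  }

  /** Splits the branches into the "null" marker and everything else. */
  static NullableUnionSketch create(List<String> branches) {
    List<String> nonNull = new ArrayList<>();
    boolean sawNull = false;
    for (String branch : branches) {
      if ("null".equals(branch)) {
        sawNull = true;
      } else {
        nonNull.add(branch);
      }
    }
    return new NullableUnionSketch(nonNull, sawNull);
  }

  /** Returns true when exactly one non-null branch remains, so the union has one base type. */
  boolean hasSingleBaseType() {
    return nonNullBranches.size() == 1;
  }

  public static void main(String[] args) {
    NullableUnionSketch optionalString = create(Arrays.asList("null", "string"));
    System.out.println(optionalString.nonNullBranches + " nullable=" + optionalString.nullable);
  }
}
```

   Unlike a `//` comment, the `/** ... */` block is picked up by the Javadoc tool and by IDE tooltips, which matters once the class is visible outside its package.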





[GitHub] [beam] prodriguezdefino commented on a diff in pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by GitBox <gi...@apache.org>.
prodriguezdefino commented on code in PR #24274:
URL: https://github.com/apache/beam/pull/24274#discussion_r1037464625


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Message;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Storage API DynamicDestinations used when the input is an Avro GenericRecord. */
+class StorageApiDynamicDestinationsGenericRecord<T, DestinationT extends @NonNull Object>
+    extends StorageApiDynamicDestinations<T, DestinationT> {
+
+  private final SerializableFunction<AvroWriteRequest<T>, GenericRecord> toGenericRecord;
+  private final SerializableFunction<@Nullable TableSchema, Schema> schemaFactory;
+
+  StorageApiDynamicDestinationsGenericRecord(
+      DynamicDestinations<T, DestinationT> inner,
+      SerializableFunction<@Nullable TableSchema, Schema> schemaFactory,
+      SerializableFunction<AvroWriteRequest<T>, GenericRecord> toGenericRecord) {
+    super(inner);
+    this.toGenericRecord = toGenericRecord;
+    this.schemaFactory = schemaFactory;
+  }
+
+  @Override
+  public MessageConverter<T> getMessageConverter(
+      DestinationT destination, DatasetService datasetService) throws Exception {
+    return new MessageConverter<T>() {

Review Comment:
   Yeah, I pushed too quickly last night. It compiles now, and I addressed some of the comments here as well.





[GitHub] [beam] prodriguezdefino commented on a diff in pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by GitBox <gi...@apache.org>.
prodriguezdefino commented on code in PR #24274:
URL: https://github.com/apache/beam/pull/24274#discussion_r1037463827


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -3165,6 +3156,26 @@ private <DestinationT> WriteResult continueExpandTyped(
           storageApiDynamicDestinations =
               new StorageApiDynamicDestinationsBeamRow<>(
                   dynamicDestinations, elementSchema, elementToRowFunction);
+        } else if (getAvroRowWriterFactory() != null) {

Review Comment:
   > Please add a writeGenericRecords() method so users don't have to deal with the avro format function.
   
   Done.
   
   >AFAICT from inspection, the AvroWriterFactory is a bit broken if side inputs are used for schemas. Do we have a test for this?
   
   You mean when the TableSchema is provided as a PCollectionView? If so I can add a test for that.





[GitHub] [beam] github-actions[bot] commented on pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #24274:
URL: https://github.com/apache/beam/pull/24274#issuecomment-1365130035

   Assigning a new set of reviewers because the PR has gone too long without review. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @lukecwik for label java.
   R: @chamikaramj for label io.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)




[GitHub] [beam] github-actions[bot] commented on pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #24274:
URL: https://github.com/apache/beam/pull/24274#issuecomment-1373554429

   Assigning a new set of reviewers because the PR has gone too long without review. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @robertwb for label java.
   R: @pabloem for label io.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)




[GitHub] [beam] prodriguezdefino commented on pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by "prodriguezdefino (via GitHub)" <gi...@apache.org>.
prodriguezdefino commented on PR #24274:
URL: https://github.com/apache/beam/pull/24274#issuecomment-1433894387

   Run Java PreCommit




[GitHub] [beam] github-actions[bot] commented on pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #24274:
URL: https://github.com/apache/beam/pull/24274#issuecomment-1369701156

   Reminder, please take a look at this pr: @lukecwik @chamikaramj 




[GitHub] [beam] Abacn commented on pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn commented on PR #24274:
URL: https://github.com/apache/beam/pull/24274#issuecomment-1433965583

   Tests passed. Some GitHub Actions jobs are hanging due to self-hosted runner issues. Merging for now.




[GitHub] [beam] Abacn merged pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by "Abacn (via GitHub)" <gi...@apache.org>.
Abacn merged PR #24274:
URL: https://github.com/apache/beam/pull/24274




[GitHub] [beam] prodriguezdefino commented on a diff in pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by GitBox <gi...@apache.org>.
prodriguezdefino commented on code in PR #24274:
URL: https://github.com/apache/beam/pull/24274#discussion_r1038347518


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java:
##########
@@ -265,6 +266,7 @@ public abstract static class Builder {
           .put(SqlTypes.DATETIME.getIdentifier(), StandardSQLTypeName.DATETIME)
           .put("SqlTimeWithLocalTzType", StandardSQLTypeName.TIME)
           .put("Enum", StandardSQLTypeName.STRING)
+          .put(MicrosInstant.IDENTIFIER, StandardSQLTypeName.TIMESTAMP)

Review Comment:
   Oh, so the only supported time-related logical types will be millis and date for now, is that right?
   I can remove the timestamp in micros for the time being if that works for this PR. Sounds good?
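
   For reference, the two Avro timestamp logical types differ only in their unit: `timestamp-millis` counts milliseconds since the epoch and `timestamp-micros` counts microseconds. A minimal sketch of normalizing both to microseconds follows. It assumes the target TIMESTAMP column is encoded as epoch microseconds, which is not stated in this thread, and the class and method names are hypothetical and not part of the PR.

```java
/** Hypothetical helper, not part of the PR: normalizes Avro timestamp longs to epoch micros. */
public class TimestampUnits {

  /** Avro timestamp-millis: a long counting milliseconds since 1970-01-01T00:00:00Z. */
  static long millisToMicros(long epochMillis) {
    // multiplyExact throws ArithmeticException instead of silently overflowing.
    return Math.multiplyExact(epochMillis, 1000L);
  }

  /** Avro timestamp-micros already counts microseconds, so it passes through unchanged. */
  static long microsToMicros(long epochMicros) {
    // Assumption: the destination encoding is epoch microseconds.
    return epochMicros;
  }

  public static void main(String[] args) {
    long millis = 1_668_814_506_000L;
    System.out.println(millisToMicros(millis));
    System.out.println(microsToMicros(millis * 1000L));
  }
}
```

   Keeping the unit conversion in one place makes it harder for a raw `Long` in millis to be written out as if it were already in micros.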





[GitHub] [beam] prodriguezdefino commented on pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by GitBox <gi...@apache.org>.
prodriguezdefino commented on PR #24274:
URL: https://github.com/apache/beam/pull/24274#issuecomment-1322624290

   Run Java PreCommit




[GitHub] [beam] reuvenlax commented on a diff in pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on code in PR #24274:
URL: https://github.com/apache/beam/pull/24274#discussion_r1038549617


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java:
##########
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
+import com.google.cloud.bigquery.storage.v1.TableSchema;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.DynamicMessage;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.avro.Conversions;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.schemas.utils.AvroUtils.TypeWithNullability;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Functions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.joda.time.Days;
+import org.joda.time.Instant;
+import org.joda.time.ReadableInstant;
+
+/**
+ * Utility methods for converting Avro {@link GenericRecord} objects to dynamic protocol message,
+ * for use with the Storage write API.
+ */
+public class AvroGenericRecordToStorageApiProto {
+
+  static final Map<Schema.Type, TableFieldSchema.Type> PRIMITIVE_TYPES =
+      ImmutableMap.<Schema.Type, TableFieldSchema.Type>builder()
+          .put(Schema.Type.INT, TableFieldSchema.Type.INT64)
+          .put(Schema.Type.FIXED, TableFieldSchema.Type.BYTES)
+          .put(Schema.Type.LONG, TableFieldSchema.Type.INT64)
+          .put(Schema.Type.FLOAT, TableFieldSchema.Type.DOUBLE)
+          .put(Schema.Type.DOUBLE, TableFieldSchema.Type.DOUBLE)
+          .put(Schema.Type.STRING, TableFieldSchema.Type.STRING)
+          .put(Schema.Type.BOOLEAN, TableFieldSchema.Type.BOOL)
+          .put(Schema.Type.ENUM, TableFieldSchema.Type.STRING)
+          .put(Schema.Type.BYTES, TableFieldSchema.Type.BYTES)
+          .build();
+
+  // A map of supported logical types to the protobuf field type.
+  static final Map<String, TableFieldSchema.Type> LOGICAL_TYPES =
+      ImmutableMap.<String, TableFieldSchema.Type>builder()
+          .put(LogicalTypes.date().getName(), TableFieldSchema.Type.DATE)
+          .put(LogicalTypes.decimal(1).getName(), TableFieldSchema.Type.BIGNUMERIC)
+          .put(LogicalTypes.timestampMicros().getName(), TableFieldSchema.Type.TIMESTAMP)
+          .put(LogicalTypes.timestampMillis().getName(), TableFieldSchema.Type.TIMESTAMP)
+          .put(LogicalTypes.uuid().getName(), TableFieldSchema.Type.STRING)
+          .build();
+
+  static final Map<Schema.Type, Function<Object, Object>> PRIMITIVE_ENCODERS =
+      ImmutableMap.<Schema.Type, Function<Object, Object>>builder()
+          .put(Schema.Type.INT, o -> Long.valueOf((int) o))
+          .put(Schema.Type.FIXED, o -> ByteString.copyFrom(((GenericData.Fixed) o).bytes()))
+          .put(Schema.Type.LONG, Functions.identity())
+          .put(
+              Schema.Type.FLOAT,
+              o -> Double.valueOf(Float.valueOf((float) o).toString()).doubleValue())
+          .put(Schema.Type.DOUBLE, Function.identity())
+          .put(Schema.Type.STRING, Function.identity())
+          .put(Schema.Type.BOOLEAN, Function.identity())
+          .put(Schema.Type.ENUM, o -> o.toString())
+          .put(Schema.Type.BYTES, o -> ByteString.copyFrom((byte[]) o))
+          .build();
+
+  // A map of supported logical types to their encoding functions.
+  static final Map<String, BiFunction<LogicalType, Object, Object>> LOGICAL_TYPE_ENCODERS =
+      ImmutableMap.<String, BiFunction<LogicalType, Object, Object>>builder()
+          .put(LogicalTypes.date().getName(), (logicalType, value) -> convertDate(value))
+          .put(
+              LogicalTypes.decimal(1).getName(), AvroGenericRecordToStorageApiProto::convertDecimal)
+          .put(
+              LogicalTypes.timestampMicros().getName(),
+              (logicalType, value) -> convertTimestamp(value, true))
+          .put(
+              LogicalTypes.timestampMillis().getName(),
+              (logicalType, value) -> convertTimestamp(value, false))
+          .put(LogicalTypes.uuid().getName(), (logicalType, value) -> convertUUID(value))
+          .build();
+
+  static String convertUUID(Object value) {
+    if (value instanceof UUID) {
+      return ((UUID) value).toString();
+    } else {
+      Preconditions.checkArgument(value instanceof String, "Expecting a value as String type.");
+      UUID.fromString((String) value);
+      return (String) value;
+    }
+  }
+
+  static Long convertTimestamp(Object value, boolean micros) {
+    if (value instanceof ReadableInstant) {
+      return ((ReadableInstant) value).getMillis() * (micros ? 1000 : 1);
+    } else {
+      Preconditions.checkArgument(
+          value instanceof Long, "Expecting a value as Long type (millis).");
+      return (Long) value;
+    }
+  }
+
+  static Integer convertDate(Object value) {
+    if (value instanceof ReadableInstant) {
+      return Days.daysBetween(Instant.EPOCH, (ReadableInstant) value).getDays();
+    } else {
+      Preconditions.checkArgument(
+          value instanceof Integer, "Expecting a value as Integer type (days).");
+      return (Integer) value;
+    }
+  }
+
+  static ByteString convertDecimal(LogicalType logicalType, Object value) {
+    ByteBuffer byteBuffer = (ByteBuffer) value;
+    BigDecimal bigDecimal =
+        new Conversions.DecimalConversion()
+            .fromBytes(
+                byteBuffer.duplicate(),
+                Schema.create(Schema.Type.NULL), // dummy schema, not used
+                logicalType);
+    return BeamRowToStorageApiProto.serializeBigDecimalToNumeric(bigDecimal);
+  }
+
+  /**
+   * Given an Avro Schema, returns a protocol-buffer TableSchema that can be used to write data
+   * through BigQuery Storage API.
+   *
+   * @param schema An Avro Schema
+   * @return Returns the TableSchema created from the provided Schema
+   */
+  public static TableSchema protoTableSchemaFromAvroSchema(Schema schema) {
+    Preconditions.checkState(!schema.getFields().isEmpty());
+
+    TableSchema.Builder builder = TableSchema.newBuilder();
+    for (Schema.Field field : schema.getFields()) {
+      builder.addFields(fieldDescriptorFromAvroField(field));
+    }
+    return builder.build();
+  }
+
+  /**
+   * Given an Avro {@link GenericRecord} object, returns a protocol-buffer message that can be used
+   * to write data using the BigQuery Storage streaming API.
+   *
+   * @param descriptor The Descriptor for the DynamicMessage result
+   * @param record An Avro GenericRecord
+   * @return A dynamic message representation of a Proto payload to be used for StorageWrite API
+   */
+  public static DynamicMessage messageFromGenericRecord(
+      Descriptor descriptor, GenericRecord record) {
+    Schema schema = record.getSchema();
+    DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
+    for (Schema.Field field : schema.getFields()) {
+      FieldDescriptor fieldDescriptor =
+          Preconditions.checkNotNull(descriptor.findFieldByName(field.name().toLowerCase()));
+      @Nullable
+      Object value =
+          messageValueFromGenericRecordValue(fieldDescriptor, field, field.name(), record);
+      if (value != null) {
+        builder.setField(fieldDescriptor, value);
+      }
+    }
+    return builder.build();
+  }
+
+  private static TableFieldSchema fieldDescriptorFromAvroField(Schema.Field field) {
+    @Nullable Schema schema = field.schema();
+    TableFieldSchema.Builder builder =
+        TableFieldSchema.newBuilder().setName(field.name().toLowerCase());
+    Schema elementType = null;
+    switch (schema.getType()) {
+      case RECORD:
+        if (schema == null) {
+          throw new RuntimeException("Unexpected null schema!");
+        }
+        Preconditions.checkState(!schema.getFields().isEmpty());

Review Comment:
   Are empty records disallowed in Avro?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java:
##########
@@ -0,0 +1,382 @@
+  private static TableFieldSchema fieldDescriptorFromAvroField(Schema.Field field) {
+    @Nullable Schema schema = field.schema();
+    TableFieldSchema.Builder builder =
+        TableFieldSchema.newBuilder().setName(field.name().toLowerCase());
+    Schema elementType = null;
+    switch (schema.getType()) {
+      case RECORD:
+        if (schema == null) {

Review Comment:
   If this were null, the above switch would already have thrown. Did you mean to check something else?
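
   The point can be illustrated with a small standalone sketch: once `schema.getType()` has been evaluated in the `switch`, a later `schema == null` check can never be true, so any validation has to happen before the first dereference. The `Node` and `Kind` types below are hypothetical stand-ins for the Avro schema classes, and the "early" variant is only one possible fix.

```java
import java.util.Objects;

public class NullCheckOrder {

  enum Kind { RECORD, STRING }

  /** Hypothetical stand-in for a schema object. */
  static final class Node {
    private final Kind kind;
    Node(Kind kind) { this.kind = kind; }
    Kind getKind() { return kind; }
  }

  /** Mirrors the reviewed shape: the null check sits after the dereference. */
  static String describeLate(Node node) {
    switch (node.getKind()) { // throws NullPointerException here when node is null
      case RECORD:
        if (node == null) { // never true: a null node has already thrown above
          throw new IllegalStateException("Unexpected null schema!");
        }
        return "record";
      default:
        return "primitive";
    }
  }

  /** Validates before the first dereference, so the intended message is reported. */
  static String describeEarly(Node node) {
    Objects.requireNonNull(node, "Unexpected null schema!");
    switch (node.getKind()) {
      case RECORD:
        return "record";
      default:
        return "primitive";
    }
  }

  public static void main(String[] args) {
    System.out.println(describeEarly(new Node(Kind.RECORD)));
  }
}
```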



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -1925,7 +1933,19 @@ public static <T> Write<T> write() {
    * Write#withFormatFunction(SerializableFunction)}.
    */
   public static Write<TableRow> writeTableRows() {
-    return BigQueryIO.<TableRow>write().withFormatFunction(IDENTITY_FORMATTER);
+    return BigQueryIO.<TableRow>write().withFormatFunction(TABLE_ROW_IDENTITY_FORMATTER);
+  }
+
+  /**
+   * A {@link PTransform} that writes a {@link PCollection} containing {@link GenericRecord
+   * GenericRecords} to a BigQuery table.
+   *
+   * <p>It is recommended to instead use {@link #write} with {@link
+   * Write#withFormatFunction(SerializableFunction)}.

Review Comment:
   Why is this recommended?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -540,7 +540,15 @@ public class BigQueryIO {
    * A formatting function that maps a TableRow to itself. This allows sending a {@code
    * PCollection<TableRow>} directly to BigQueryIO.Write.
    */
-  static final SerializableFunction<TableRow, TableRow> IDENTITY_FORMATTER = input -> input;
+  static final SerializableFunction<TableRow, TableRow> TABLE_ROW_IDENTITY_FORMATTER =
+      input -> input;
+
+  /**
+   * A formatting function that maps a GenericRecord to itself. This allows sending a {@code
+   * PCollection<GenericRecord>} directly to BigQueryIO.Write.
+   */
+  static final SerializableFunction<AvroWriteRequest<GenericRecord>, GenericRecord>
+      GENERIC_RECORD_IDENTITY_FORMATTER = input -> input.getElement();

Review Comment:
   Can you just shorten this to input::getElement?
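
   A standalone sketch of the lambda and method-reference forms. Because the constant is a static field initializer with no variable named `input` in scope, the method reference would presumably have to name the type (for example `AvroWriteRequest::getElement`). That reading is an assumption, and the `Request` class below is a hypothetical stand-in for `AvroWriteRequest`.

```java
import java.util.function.Function;

public class MethodRefSketch {

  /** Hypothetical stand-in for AvroWriteRequest<T>. */
  static final class Request<T> {
    private final T element;
    Request(T element) { this.element = element; }
    T getElement() { return element; }
  }

  // Lambda form, as written in the diff.
  static final Function<Request<String>, String> LAMBDA = input -> input.getElement();

  // Method-reference form: it names the type, not a variable.
  static final Function<Request<String>, String> METHOD_REF = Request::getElement;

  public static void main(String[] args) {
    Request<String> request = new Request<>("record");
    System.out.println(LAMBDA.apply(request).equals(METHOD_REF.apply(request)));
  }
}
```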



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -540,7 +540,15 @@ public class BigQueryIO {
    * A formatting function that maps a TableRow to itself. This allows sending a {@code
    * PCollection<TableRow>} directly to BigQueryIO.Write.
    */
-  static final SerializableFunction<TableRow, TableRow> IDENTITY_FORMATTER = input -> input;
+  static final SerializableFunction<TableRow, TableRow> TABLE_ROW_IDENTITY_FORMATTER =
+      input -> input;

Review Comment:
   = SerializableFunctions.identity()
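
   A sketch of why a shared generic identity helper can replace a per-type identity constant. `SerFn` is a hypothetical stand-in for Beam's `SerializableFunction`, and `identity()` here is a local illustration rather than the Beam implementation.

```java
import java.io.Serializable;

public class IdentitySketch {

  /** Hypothetical stand-in for a serializable single-argument function. */
  @FunctionalInterface
  interface SerFn<InT, OutT> extends Serializable {
    OutT apply(InT input);
  }

  /** One generic helper covers every element type. */
  static <T> SerFn<T, T> identity() {
    return input -> input;
  }

  public static void main(String[] args) {
    SerFn<String, String> forStrings = identity();
    SerFn<Integer, Integer> forInts = identity();
    System.out.println(forStrings.apply("row") + " " + forInts.apply(7));
  }
}
```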



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -3165,6 +3156,26 @@ private <DestinationT> WriteResult continueExpandTyped(
           storageApiDynamicDestinations =
               new StorageApiDynamicDestinationsBeamRow<>(
                   dynamicDestinations, elementSchema, elementToRowFunction);
+        } else if (getAvroRowWriterFactory() != null) {

Review Comment:
   Yeah, because setSideInputAccessorFromProcessContext needs to be called on the dynamic destinations in every DoFn that might access this, and I'm not sure that's correct for AvroWriterFactory.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java:
##########
@@ -0,0 +1,382 @@
+  private static TableFieldSchema fieldDescriptorFromAvroField(Schema.Field field) {
+    @Nullable Schema schema = field.schema();
+    TableFieldSchema.Builder builder =
+        TableFieldSchema.newBuilder().setName(field.name().toLowerCase());
+    Schema elementType = null;
+    switch (schema.getType()) {
+      case RECORD:
+        if (schema == null) {
+          throw new RuntimeException("Unexpected null schema!");
+        }
+        Preconditions.checkState(!schema.getFields().isEmpty());
+        builder = builder.setType(TableFieldSchema.Type.STRUCT);
+        for (Schema.Field recordField : schema.getFields()) {
+          builder = builder.addFields(fieldDescriptorFromAvroField(recordField));
+        }
+        break;
+      case ARRAY:
+        elementType = TypeWithNullability.create(schema.getElementType()).getType();
+        if (elementType == null) {
+          throw new RuntimeException("Unexpected null element type!");
+        }
+        Preconditions.checkState(
+            elementType.getType() != Schema.Type.ARRAY, "Nested arrays not supported by BigQuery.");
+
+        TableFieldSchema elementFieldSchema =
+            fieldDescriptorFromAvroField(
+                new Schema.Field(field.name(), elementType, field.doc(), field.defaultVal()));
+        builder = builder.setType(elementFieldSchema.getType());
+        builder.addAllFields(elementFieldSchema.getFieldsList());
+        builder = builder.setMode(TableFieldSchema.Mode.REPEATED);
+        break;
+      case MAP:
+        Schema keyType = Schema.create(Schema.Type.STRING);
+        Schema valueType = TypeWithNullability.create(schema.getValueType()).getType();
+        if (valueType == null) {
+          throw new RuntimeException("Unexpected null element type!");
+        }
+        TableFieldSchema keyFieldSchema =
+            fieldDescriptorFromAvroField(
+                new Schema.Field("key", keyType, "key of the map entry", Schema.Field.NULL_VALUE));
+        TableFieldSchema valueFieldSchema =
+            fieldDescriptorFromAvroField(
+                new Schema.Field(
+                    "value", valueType, "value of the map entry", Schema.Field.NULL_VALUE));
+        builder =
+            builder
+                .setType(TableFieldSchema.Type.STRUCT)
+                .addFields(keyFieldSchema)
+                .addFields(valueFieldSchema)
+                .setMode(TableFieldSchema.Mode.REPEATED);
+        break;
+      case UNION:
+        elementType = TypeWithNullability.create(schema).getType();
+        if (elementType == null) {
+          throw new RuntimeException("Unexpected null element type!");
+        }
+        // check to see if more than one non-null type is defined in the union
+        Preconditions.checkState(
+            elementType.getType() != Schema.Type.UNION,
+            "Multiple non-null union types are not supported.");
+        TableFieldSchema unionFieldSchema =
+            fieldDescriptorFromAvroField(
+                new Schema.Field(field.name(), elementType, field.doc(), field.defaultVal()));
+        builder =
+            builder
+                .setType(unionFieldSchema.getType())
+                .addAllFields(unionFieldSchema.getFieldsList());
+        break;
+      default:
+        elementType = TypeWithNullability.create(schema).getType();
+        @Nullable
+        TableFieldSchema.Type primitiveType =
+            Optional.ofNullable(LogicalTypes.fromSchema(elementType))
+                .map(logicalType -> LOGICAL_TYPES.get(logicalType.getName()))
+                .orElse(PRIMITIVE_TYPES.get(elementType.getType()));
+        if (primitiveType == null) {
+          throw new RuntimeException("Unsupported type " + elementType.getType());
+        }
+        // a scalar will be required by default, if defined as part of union then
+        // caller will set nullability requirements
+        builder = builder.setType(primitiveType);
+    }
+    if (builder.getMode() != TableFieldSchema.Mode.REPEATED) {
+      if (TypeWithNullability.create(schema).isNullable()) {
+        builder = builder.setMode(TableFieldSchema.Mode.NULLABLE);
+      } else {
+        builder = builder.setMode(TableFieldSchema.Mode.REQUIRED);
+      }
+    }
+    if (field.doc() != null) {
+      builder = builder.setDescription(field.doc());
+    }
+    return builder.build();
+  }
+
+  @Nullable
+  private static Object messageValueFromGenericRecordValue(
+      FieldDescriptor fieldDescriptor, Schema.Field avroField, String name, GenericRecord record) {
+    @Nullable Object value = record.get(name);
+    if (value == null) {
+      if (fieldDescriptor.isOptional()) {
+        return null;
+      } else {
+        throw new IllegalArgumentException(
+            "Received null value for non-nullable field " + fieldDescriptor.getName());
+      }
+    }
+    return toProtoValue(fieldDescriptor, avroField.schema(), value);
+  }
+
+  private static Object toProtoValue(
+      FieldDescriptor fieldDescriptor, Schema avroSchema, Object value) {
+    switch (avroSchema.getType()) {
+      case RECORD:
+        return messageFromGenericRecord(fieldDescriptor.getMessageType(), (GenericRecord) value);
+      case ARRAY:
+        List<Object> list = (List<Object>) value;

Review Comment:
   Are we sure it's always a List (and not just an Iterable)?
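
   One way to avoid depending on the concrete collection type is sketched below. This is an illustration only, not the PR's code: the class `IterableValues` and the helper `toList` are hypothetical names, and the sketch assumes the caller has already ruled out null values, as `messageValueFromGenericRecordValue` does.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class IterableValues {

  // Hypothetical helper (not part of the PR): accepts any Iterable instead of
  // casting the Avro array value straight to List.
  static List<Object> toList(Object value) {
    if (value instanceof List) {
      // Fast path: the value is already a List, so reuse it without copying.
      @SuppressWarnings("unchecked")
      List<Object> list = (List<Object>) value;
      return list;
    }
    if (value instanceof Iterable) {
      // Fallback: copy the elements of a non-List Iterable in iteration order.
      List<Object> copy = new ArrayList<>();
      for (Object element : (Iterable<?>) value) {
        copy.add(element);
      }
      return copy;
    }
    throw new IllegalArgumentException(
        "Expected an Iterable but got "
            + (value == null ? "null" : value.getClass().getName()));
  }

  public static void main(String[] args) {
    // A List is returned as-is.
    System.out.println(toList(Arrays.asList(1, 2, 3)));
    // An ArrayDeque is an Iterable but not a List, so it is copied.
    System.out.println(toList(new ArrayDeque<>(Arrays.asList("a", "b"))));
  }
}
```

   The List fast path keeps the common case allocation-free, while the Iterable branch covers any other collection a record might hand back.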



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] prodriguezdefino commented on a diff in pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by GitBox <gi...@apache.org>.
prodriguezdefino commented on code in PR #24274:
URL: https://github.com/apache/beam/pull/24274#discussion_r1040387139


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java:
##########
@@ -265,6 +266,7 @@ public abstract static class Builder {
           .put(SqlTypes.DATETIME.getIdentifier(), StandardSQLTypeName.DATETIME)
           .put("SqlTimeWithLocalTzType", StandardSQLTypeName.TIME)
           .put("Enum", StandardSQLTypeName.STRING)
+          .put(MicrosInstant.IDENTIFIER, StandardSQLTypeName.TIMESTAMP)

Review Comment:
   OK, I rebased on master and removed the MicrosInstant mapping, and the tests still pass. I will keep the micros support, since those values are now mapped to Instant and work fine.
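
   For readers unfamiliar with microsecond timestamps, the round trip between epoch microseconds and an instant can be sketched as follows. This uses `java.time.Instant` purely for illustration; which Instant type the mapping in this PR resolves to is not stated in this comment, and the class and method names below are hypothetical.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

public class MicrosToInstant {

  // Hypothetical helper: epoch microseconds to java.time.Instant.
  static Instant fromEpochMicros(long micros) {
    return Instant.EPOCH.plus(micros, ChronoUnit.MICROS);
  }

  // Hypothetical helper: java.time.Instant to epoch microseconds.
  // Any sub-microsecond nanoseconds in the Instant are truncated.
  static long toEpochMicros(Instant instant) {
    return ChronoUnit.MICROS.between(Instant.EPOCH, instant);
  }

  public static void main(String[] args) {
    Instant instant = fromEpochMicros(1_500_000L);
    System.out.println(instant + " -> " + toEpochMicros(instant));
  }
}
```

   A millisecond-precision type such as Joda's ReadableInstant cannot hold the last three digits of a microsecond value, which is why the choice of target type matters for micros support.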





[GitHub] [beam] prodriguezdefino commented on pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by GitBox <gi...@apache.org>.
prodriguezdefino commented on PR #24274:
URL: https://github.com/apache/beam/pull/24274#issuecomment-1334126406

   Run Java_GCP_IO_Direct PreCommit




[GitHub] [beam] prodriguezdefino commented on a diff in pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by GitBox <gi...@apache.org>.
prodriguezdefino commented on code in PR #24274:
URL: https://github.com/apache/beam/pull/24274#discussion_r1037464135


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Message;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Storage API DynamicDestinations used when the input is an Avro GenericRecord. */
+class StorageApiDynamicDestinationsGenericRecord<T, DestinationT extends @NonNull Object>
+    extends StorageApiDynamicDestinations<T, DestinationT> {
+
+  private final SerializableFunction<AvroWriteRequest<T>, GenericRecord> toGenericRecord;
+  private final SerializableFunction<@Nullable TableSchema, Schema> schemaFactory;
+
+  StorageApiDynamicDestinationsGenericRecord(
+      DynamicDestinations<T, DestinationT> inner,
+      SerializableFunction<@Nullable TableSchema, Schema> schemaFactory,
+      SerializableFunction<AvroWriteRequest<T>, GenericRecord> toGenericRecord) {
+    super(inner);
+    this.toGenericRecord = toGenericRecord;
+    this.schemaFactory = schemaFactory;
+  }
+
+  @Override
+  public MessageConverter<T> getMessageConverter(
+      DestinationT destination, DatasetService datasetService) throws Exception {
+    return new MessageConverter<T>() {
+      final Descriptor descriptor;
+      final long descriptorHash;
+      final Schema avroSchema;
+      final TableSchema tableSchema;
+
+      {
+        avroSchema = schemaFactory.apply(getSchema(destination));
+        tableSchema = BigQueryUtils.toTableSchema(AvroUtils.toBeamSchema(avroSchema));
+        descriptor = AvroGenericRecordToStorageApiProto.getDescriptorFromSchema(avroSchema);
+        descriptorHash = BigQueryUtils.hashSchemaDescriptorDeterministic(descriptor);

Review Comment:
   Done.





[GitHub] [beam] reuvenlax commented on a diff in pull request #24274: Support Avro GenericRecord as a valid format for StorageWrite API on BigQueryIO

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on code in PR #24274:
URL: https://github.com/apache/beam/pull/24274#discussion_r1036590099


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Message;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Storage API DynamicDestinations used when the input is an Avro GenericRecord. */
+class StorageApiDynamicDestinationsGenericRecord<T, DestinationT extends @NonNull Object>
+    extends StorageApiDynamicDestinations<T, DestinationT> {
+
+  private final SerializableFunction<AvroWriteRequest<T>, GenericRecord> toGenericRecord;
+  private final SerializableFunction<@Nullable TableSchema, Schema> schemaFactory;
+
+  StorageApiDynamicDestinationsGenericRecord(
+      DynamicDestinations<T, DestinationT> inner,
+      SerializableFunction<@Nullable TableSchema, Schema> schemaFactory,
+      SerializableFunction<AvroWriteRequest<T>, GenericRecord> toGenericRecord) {
+    super(inner);
+    this.toGenericRecord = toGenericRecord;
+    this.schemaFactory = schemaFactory;
+  }
+
+  @Override
+  public MessageConverter<T> getMessageConverter(
+      DestinationT destination, DatasetService datasetService) throws Exception {
+    return new MessageConverter<T>() {
+      final Descriptor descriptor;
+      final long descriptorHash;
+      final Schema avroSchema;
+      final TableSchema tableSchema;
+
+      {
+        avroSchema = schemaFactory.apply(getSchema(destination));
+        tableSchema = BigQueryUtils.toTableSchema(AvroUtils.toBeamSchema(avroSchema));
+        descriptor = AvroGenericRecordToStorageApiProto.getDescriptorFromSchema(avroSchema);
+        descriptorHash = BigQueryUtils.hashSchemaDescriptorDeterministic(descriptor);

Review Comment:
   Get rid of descriptorHash; it is no longer needed.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsGenericRecord.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Message;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Storage API DynamicDestinations used when the input is an Avro GenericRecord. */
+class StorageApiDynamicDestinationsGenericRecord<T, DestinationT extends @NonNull Object>
+    extends StorageApiDynamicDestinations<T, DestinationT> {
+
+  private final SerializableFunction<AvroWriteRequest<T>, GenericRecord> toGenericRecord;
+  private final SerializableFunction<@Nullable TableSchema, Schema> schemaFactory;
+
+  StorageApiDynamicDestinationsGenericRecord(
+      DynamicDestinations<T, DestinationT> inner,
+      SerializableFunction<@Nullable TableSchema, Schema> schemaFactory,
+      SerializableFunction<AvroWriteRequest<T>, GenericRecord> toGenericRecord) {
+    super(inner);
+    this.toGenericRecord = toGenericRecord;
+    this.schemaFactory = schemaFactory;
+  }
+
+  @Override
+  public MessageConverter<T> getMessageConverter(
+      DestinationT destination, DatasetService datasetService) throws Exception {
+    return new MessageConverter<T>() {

Review Comment:
   Does this even compile now? I think you're going to have to redo this around com.google.cloud.bigquery.storage.v1.TableSchema


