Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/08/13 05:28:47 UTC

[GitHub] [beam] kkdoon opened a new pull request, #22718: Support custom avro DatumReader when reading from BigQuery #22717

kkdoon opened a new pull request, #22718:
URL: https://github.com/apache/beam/pull/22718

   Similar to [#11479](https://github.com/apache/beam/pull/11479), this PR adds functionality in BigQueryIO to deserialize Avro records directly to the target class via a user-specified DatumReader.
   
   R: @pabloem @chamikaramj @steveniemitz 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] github-actions[bot] commented on pull request #22718: Support custom avro DatumReader when reading from BigQuery #22717

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22718:
URL: https://github.com/apache/beam/pull/22718#issuecomment-1227778753

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control




[GitHub] [beam] steveniemitz commented on a diff in pull request #22718: Support custom avro DatumReader when reading from BigQuery #22717

Posted by GitBox <gi...@apache.org>.
steveniemitz commented on code in PR #22718:
URL: https://github.com/apache/beam/pull/22718#discussion_r946906252


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -804,6 +827,10 @@ public enum Method {
 
       abstract Builder<T> setParseFn(SerializableFunction<SchemaAndRecord, T> parseFn);
 
+      abstract Builder<T> setReaderDatumFactory(AvroSource.DatumReaderFactory<T> factory);

Review Comment:
   I'm not sure I'm a big fan of using things from Avro IO here (DatumReaderFactory). Since this is a very simple interface, it might be better to just redefine it here?
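
As a rough illustration of what "redefining it here" could look like, the sketch below declares a small serializable factory interface. The name `LocalDatumReaderFactory` and its type parameters are hypothetical: `S` stands in for `org.apache.avro.Schema` and `R` for the reader type, so that the example compiles without Avro on the classpath. It is not the interface that ended up in the PR.

```java
import java.io.Serializable;

// Hypothetical local equivalent of a two-argument reader factory.
// S stands in for org.apache.avro.Schema and R for the reader type,
// so this sketch has no dependency on Avro or on AvroSource.
@FunctionalInterface
interface LocalDatumReaderFactory<S, R> extends Serializable {
  // Builds a reader from the writer schema and the reader schema.
  R apply(S writerSchema, S readerSchema);
}
```

Because the interface has a single abstract method, callers could still pass a lambda such as `(writer, reader) -> ...`, and extending `Serializable` keeps such a lambda shippable to workers.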





[GitHub] [beam] steveniemitz merged pull request #22718: Support custom avro DatumReader when reading from BigQuery

Posted by GitBox <gi...@apache.org>.
steveniemitz merged PR #22718:
URL: https://github.com/apache/beam/pull/22718




[GitHub] [beam] steveniemitz commented on pull request #22718: Support custom avro DatumReader when reading from BigQuery #22717

Posted by GitBox <gi...@apache.org>.
steveniemitz commented on PR #22718:
URL: https://github.com/apache/beam/pull/22718#issuecomment-1265440866

   Run Java_Examples_Dataflow_Java11 PreCommit




[GitHub] [beam] steveniemitz commented on a diff in pull request #22718: Support custom avro DatumReader when reading from BigQuery #22717

Posted by GitBox <gi...@apache.org>.
steveniemitz commented on code in PR #22718:
URL: https://github.com/apache/beam/pull/22718#discussion_r984824334


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -573,6 +577,60 @@ public static TypedRead<TableRow> readTableRowsWithSchema() {
             BigQueryUtils.tableRowFromBeamRow());
   }
 
+  @VisibleForTesting
+  static class GenericDatumTransformer<T> implements DatumReader<T> {
+    private final SerializableFunction<SchemaAndRecord, T> parseFn;
+    private final TableSchema tableSchema;
+    private final GenericDatumReader<T> reader;
+    private org.apache.avro.Schema writerSchema;
+
+    public GenericDatumTransformer(
+        SerializableFunction<SchemaAndRecord, T> parseFn,
+        TableSchema tableSchema,
+        org.apache.avro.Schema writer,
+        org.apache.avro.Schema reader) {
+      this.parseFn = parseFn;
+      this.tableSchema = tableSchema;
+      this.setSchema(writer);
+      this.reader = new GenericDatumReader<>(this.writerSchema, reader);
+    }
+
+    @Override
+    public void setSchema(org.apache.avro.Schema schema) {
+      this.writerSchema = schema;
+    }
+
+    @Override
+    public T read(T reuse, Decoder in) throws IOException {
+      GenericRecord record = (GenericRecord) this.reader.read(reuse, in);
+      return parseFn.apply(new SchemaAndRecord(record, tableSchema));
+    }
+  }
+
+  @VisibleForTesting
+  private static class DatumReaderWrapper<T> implements DatumReader<T> {
+    private final DatumReader<T> reader;
+    private org.apache.avro.Schema writerSchema;
+
+    public DatumReaderWrapper(
+        org.apache.avro.Schema writerSchema,
+        org.apache.avro.Schema readerSchema,
+        AvroSource.DatumReaderFactory<T> factory) {
+      this.setSchema(writerSchema);
+      this.reader = factory.apply(this.writerSchema, readerSchema);
+    }
+
+    @Override
+    public void setSchema(org.apache.avro.Schema schema) {
+      this.writerSchema = schema;

Review Comment:
   this should recreate the reader also, otherwise the schema change doesn't do anything.
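
A minimal sketch of the behaviour this comment asks for, under simplifying assumptions: `Schema` and `DatumReader` below are hypothetical stand-ins for `org.apache.avro.Schema` and `org.apache.avro.io.DatumReader` (the real `read` takes a `Decoder`, not a `String`), and `RecreatingDatumReader` is an illustrative name rather than the class from the PR. The point is only that `setSchema` rebuilds the delegate instead of just storing the new schema.

```java
import java.util.function.BiFunction;

// Stand-in for org.apache.avro.Schema; only a name is needed for the sketch.
final class Schema {
  final String name;

  Schema(String name) {
    this.name = name;
  }
}

// Stand-in for the two DatumReader methods discussed in the review.
interface DatumReader<T> {
  void setSchema(Schema writerSchema);

  T read(T reuse, String encoded);
}

// Wrapper that rebuilds its delegate whenever the writer schema changes.
final class RecreatingDatumReader<T> implements DatumReader<T> {
  private final BiFunction<Schema, Schema, DatumReader<T>> factory;
  private final Schema readerSchema;
  private DatumReader<T> delegate;

  RecreatingDatumReader(
      Schema writerSchema,
      Schema readerSchema,
      BiFunction<Schema, Schema, DatumReader<T>> factory) {
    this.factory = factory;
    this.readerSchema = readerSchema;
    setSchema(writerSchema);
  }

  @Override
  public void setSchema(Schema writerSchema) {
    // Recreate the delegate so the new writer schema actually takes effect.
    this.delegate = factory.apply(writerSchema, readerSchema);
  }

  @Override
  public T read(T reuse, String encoded) {
    return delegate.read(reuse, encoded);
  }
}
```

With this shape the delegate field cannot be `final`, which is the trade-off for letting a later `setSchema` call change how records are decoded.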





[GitHub] [beam] steveniemitz commented on a diff in pull request #22718: Support custom avro DatumReader when reading from BigQuery #22717

Posted by GitBox <gi...@apache.org>.
steveniemitz commented on code in PR #22718:
URL: https://github.com/apache/beam/pull/22718#discussion_r983721125


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -573,6 +576,29 @@ public static TypedRead<TableRow> readTableRowsWithSchema() {
             BigQueryUtils.tableRowFromBeamRow());
   }
 
+  @VisibleForTesting
+  static class GenericDatumTransformer<T> extends GenericDatumReader<T> {

Review Comment:
   can this extend `DatumReader` rather than `GenericDatumReader`?  You're not actually using any of the GenericDatumReader functionality here.





[GitHub] [beam] kkdoon closed pull request #22718: Support custom avro DatumReader when reading from BigQuery #22717

Posted by GitBox <gi...@apache.org>.
kkdoon closed pull request #22718: Support custom avro DatumReader when reading from BigQuery #22717
URL: https://github.com/apache/beam/pull/22718




[GitHub] [beam] steveniemitz commented on pull request #22718: Support custom avro DatumReader when reading from BigQuery #22717

Posted by GitBox <gi...@apache.org>.
steveniemitz commented on PR #22718:
URL: https://github.com/apache/beam/pull/22718#issuecomment-1262502086

   Run Java PreCommit




[GitHub] [beam] steveniemitz commented on a diff in pull request #22718: Support custom avro DatumReader when reading from BigQuery #22717

Posted by GitBox <gi...@apache.org>.
steveniemitz commented on code in PR #22718:
URL: https://github.com/apache/beam/pull/22718#discussion_r946906935


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -606,6 +607,28 @@ public static <T> TypedRead<T> read(SerializableFunction<SchemaAndRecord, T> par
         .build();
   }
 
+  /**
+   * Reads from a BigQuery table or query and returns a {@link PCollection} with one element per
+   * each row of the table or query result, where the custom {@link org.apache.avro.io.DatumReader}
+   * implementation is used to parse from the BigQuery AVRO format.
+   *
+   * <p> This API allows direct deserialization of AVRO data to the target class.
+   */
+  public static <T> TypedRead<T> readWithDatumReader(

Review Comment:
   rather than a new method, can we just add `withDatumReaderFactory` on TypedRead?





[GitHub] [beam] steveniemitz commented on pull request #22718: Support custom avro DatumReader when reading from BigQuery #22717

Posted by GitBox <gi...@apache.org>.
steveniemitz commented on PR #22718:
URL: https://github.com/apache/beam/pull/22718#issuecomment-1265577206

   Run Java PreCommit




[GitHub] [beam] codecov[bot] commented on pull request #22718: Support custom avro DatumReader when reading from BigQuery #22717

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on PR #22718:
URL: https://github.com/apache/beam/pull/22718#issuecomment-1261924424

   # [Codecov](https://codecov.io/gh/apache/beam/pull/22718?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#22718](https://codecov.io/gh/apache/beam/pull/22718?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (1342a7f) into [master](https://codecov.io/gh/apache/beam/commit/b59df6c7614f3234d5140892a108779aaa807e79?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (b59df6c) will **increase** coverage by `9.96%`.
   > The diff coverage is `n/a`.
   
   > :exclamation: Current head 1342a7f differs from pull request most recent head 246a3aa. Consider uploading reports for the commit 246a3aa to get more accurate results
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #22718      +/-   ##
   ==========================================
   + Coverage   73.43%   83.40%   +9.96%     
   ==========================================
     Files         718      480     -238     
     Lines       95680    66483   -29197     
   ==========================================
   - Hits        70266    55447   -14819     
   + Misses      24103    11036   -13067     
   + Partials     1311        0    -1311     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | go | `?` | |
   | python | `83.40% <ø> (+0.19%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/22718?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...ython/apache\_beam/io/gcp/experimental/spannerio.py](https://codecov.io/gh/apache/beam/pull/22718/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2V4cGVyaW1lbnRhbC9zcGFubmVyaW8ucHk=) | `82.15% <0.00%> (-4.79%)` | :arrow_down: |
   | [.../python/apache\_beam/testing/test\_stream\_service.py](https://codecov.io/gh/apache/beam/pull/22718/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy90ZXN0X3N0cmVhbV9zZXJ2aWNlLnB5) | `88.09% <0.00%> (-4.77%)` | :arrow_down: |
   | [...ks/python/apache\_beam/runners/worker/statecache.py](https://codecov.io/gh/apache/beam/pull/22718/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvc3RhdGVjYWNoZS5weQ==) | `89.69% <0.00%> (-2.18%)` | :arrow_down: |
   | [...che\_beam/runners/interactive/interactive\_runner.py](https://codecov.io/gh/apache/beam/pull/22718/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9pbnRlcmFjdGl2ZV9ydW5uZXIucHk=) | `90.06% <0.00%> (-1.71%)` | :arrow_down: |
   | [...ache\_beam/runners/interactive/pipeline\_fragment.py](https://codecov.io/gh/apache/beam/pull/22718/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9waXBlbGluZV9mcmFnbWVudC5weQ==) | `98.33% <0.00%> (-0.86%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/localfilesystem.py](https://codecov.io/gh/apache/beam/pull/22718/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vbG9jYWxmaWxlc3lzdGVtLnB5) | `90.97% <0.00%> (-0.76%)` | :arrow_down: |
   | [...python/apache\_beam/runners/worker/worker\_status.py](https://codecov.io/gh/apache/beam/pull/22718/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvd29ya2VyX3N0YXR1cy5weQ==) | `75.33% <0.00%> (-0.67%)` | :arrow_down: |
   | [...eam/runners/portability/fn\_api\_runner/execution.py](https://codecov.io/gh/apache/beam/pull/22718/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL2V4ZWN1dGlvbi5weQ==) | `92.44% <0.00%> (-0.65%)` | :arrow_down: |
   | [sdks/python/apache\_beam/ml/inference/base.py](https://codecov.io/gh/apache/beam/pull/22718/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vbWwvaW5mZXJlbmNlL2Jhc2UucHk=) | `95.58% <0.00%> (-0.26%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/worker/bundle\_processor.py](https://codecov.io/gh/apache/beam/pull/22718/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvYnVuZGxlX3Byb2Nlc3Nvci5weQ==) | `93.17% <0.00%> (-0.25%)` | :arrow_down: |
   | ... and [252 more](https://codecov.io/gh/apache/beam/pull/22718/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   




[GitHub] [beam] kkdoon commented on a diff in pull request #22718: Support custom avro DatumReader when reading from BigQuery #22717

Posted by GitBox <gi...@apache.org>.
kkdoon commented on code in PR #22718:
URL: https://github.com/apache/beam/pull/22718#discussion_r984923030


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -573,6 +577,60 @@ public static TypedRead<TableRow> readTableRowsWithSchema() {
             BigQueryUtils.tableRowFromBeamRow());
   }
 
+  @VisibleForTesting
+  static class GenericDatumTransformer<T> implements DatumReader<T> {
+    private final SerializableFunction<SchemaAndRecord, T> parseFn;
+    private final TableSchema tableSchema;
+    private final GenericDatumReader<T> reader;
+    private org.apache.avro.Schema writerSchema;
+
+    public GenericDatumTransformer(
+        SerializableFunction<SchemaAndRecord, T> parseFn,
+        TableSchema tableSchema,
+        org.apache.avro.Schema writer,
+        org.apache.avro.Schema reader) {
+      this.parseFn = parseFn;
+      this.tableSchema = tableSchema;
+      this.setSchema(writer);
+      this.reader = new GenericDatumReader<>(this.writerSchema, reader);
+    }
+
+    @Override
+    public void setSchema(org.apache.avro.Schema schema) {
+      this.writerSchema = schema;
+    }
+
+    @Override
+    public T read(T reuse, Decoder in) throws IOException {
+      GenericRecord record = (GenericRecord) this.reader.read(reuse, in);
+      return parseFn.apply(new SchemaAndRecord(record, tableSchema));
+    }
+  }
+
+  @VisibleForTesting
+  private static class DatumReaderWrapper<T> implements DatumReader<T> {
+    private final DatumReader<T> reader;
+    private org.apache.avro.Schema writerSchema;
+
+    public DatumReaderWrapper(
+        org.apache.avro.Schema writerSchema,
+        org.apache.avro.Schema readerSchema,
+        AvroSource.DatumReaderFactory<T> factory) {
+      this.setSchema(writerSchema);
+      this.reader = factory.apply(this.writerSchema, readerSchema);
+    }
+
+    @Override
+    public void setSchema(org.apache.avro.Schema schema) {
+      this.writerSchema = schema;

Review Comment:
   good catch





[GitHub] [beam] steveniemitz commented on a diff in pull request #22718: Support custom avro DatumReader when reading from BigQuery #22717

Posted by GitBox <gi...@apache.org>.
steveniemitz commented on code in PR #22718:
URL: https://github.com/apache/beam/pull/22718#discussion_r983723130


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -598,7 +624,44 @@ public static <T> TypedRead<T> read(SerializableFunction<SchemaAndRecord, T> par
         .setValidate(true)
         .setWithTemplateCompatibility(false)
         .setBigQueryServices(new BigQueryServicesImpl())
-        .setParseFn(parseFn)
+        .setDatumReaderFactory(
+            (SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<T>>)
+                input ->
+                    (AvroSource.DatumReaderFactory<T>)
+                        (writer, reader) ->
+                            new GenericDatumTransformer<>(parseFn, input, writer, reader))
+        .setParseFn(
+            parseFn) // TODO: Remove setParseFn once https://github.com/apache/beam/issues/21076 is fixed.

Review Comment:
   You'll want to run spotlessJavaApply on everything here; the precommit is complaining about it too.





[GitHub] [beam] steveniemitz commented on pull request #22718: Support custom avro DatumReader when reading from BigQuery #22717

Posted by GitBox <gi...@apache.org>.
steveniemitz commented on PR #22718:
URL: https://github.com/apache/beam/pull/22718#issuecomment-1265440168

   Run Java PreCommit




[GitHub] [beam] steveniemitz commented on a diff in pull request #22718: Support custom avro DatumReader when reading from BigQuery #22717

Posted by GitBox <gi...@apache.org>.
steveniemitz commented on code in PR #22718:
URL: https://github.com/apache/beam/pull/22718#discussion_r947036679


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -606,6 +607,28 @@ public static <T> TypedRead<T> read(SerializableFunction<SchemaAndRecord, T> par
         .build();
   }
 
+  /**
+   * Reads from a BigQuery table or query and returns a {@link PCollection} with one element per
+   * each row of the table or query result, where the custom {@link org.apache.avro.io.DatumReader}
+   * implementation is used to parse from the BigQuery AVRO format.
+   *
+   * <p> This API allows direct deserialization of AVRO data to the target class.
+   */
+  public static <T> TypedRead<T> readWithDatumReader(

Review Comment:
   oh hmm good call, maybe we can overload `read` then instead?





[GitHub] [beam] steveniemitz commented on a diff in pull request #22718: Support custom avro DatumReader when reading from BigQuery #22717

Posted by GitBox <gi...@apache.org>.
steveniemitz commented on code in PR #22718:
URL: https://github.com/apache/beam/pull/22718#discussion_r946916363


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -606,6 +607,28 @@ public static <T> TypedRead<T> read(SerializableFunction<SchemaAndRecord, T> par
         .build();
   }
 
+  /**
+   * Reads from a BigQuery table or query and returns a {@link PCollection} with one element per
+   * each row of the table or query result, where the custom {@link org.apache.avro.io.DatumReader}
+   * implementation is used to parse from the BigQuery AVRO format.
+   *
+   * <p> This API allows direct deserialization of AVRO data to the target class.
+   */
+  public static <T> TypedRead<T> readWithDatumReader(
+      AvroSource.DatumReaderFactory<T> factory, org.apache.avro.Schema readerSchema) {

Review Comment:
   can we make the reader schema optional, and use the writer schema if it's not set?  That's how AvroIO works IIRC.
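
The fallback described in this comment can be sketched as a single helper, assuming a null reader schema means "not configured". `ReaderSchemas` and `effectiveReaderSchema` are hypothetical names, and the type parameter `S` stands in for `org.apache.avro.Schema`; this is not the code from the PR.

```java
// Hypothetical helper illustrating the "reader schema is optional" idea.
final class ReaderSchemas {
  private ReaderSchemas() {}

  // Returns the configured reader schema, or the writer schema when none was set.
  static <S> S effectiveReaderSchema(S writerSchema, S configuredReaderSchema) {
    return configuredReaderSchema != null ? configuredReaderSchema : writerSchema;
  }
}
```

Resolving the schema at read time rather than at construction would let the writer schema, which is only known once the table or query result is available, serve as the default.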





[GitHub] [beam] kennknowles commented on pull request #22718: Support custom avro DatumReader when reading from BigQuery #22717

Posted by GitBox <gi...@apache.org>.
kennknowles commented on PR #22718:
URL: https://github.com/apache/beam/pull/22718#issuecomment-1227764128

   R: @chamikaramj 




[GitHub] [beam] steveniemitz commented on a diff in pull request #22718: Support custom avro DatumReader when reading from BigQuery #22717

Posted by GitBox <gi...@apache.org>.
steveniemitz commented on code in PR #22718:
URL: https://github.com/apache/beam/pull/22718#discussion_r984824672


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -573,6 +577,60 @@ public static TypedRead<TableRow> readTableRowsWithSchema() {
             BigQueryUtils.tableRowFromBeamRow());
   }
 
+  @VisibleForTesting
+  static class GenericDatumTransformer<T> implements DatumReader<T> {
+    private final SerializableFunction<SchemaAndRecord, T> parseFn;
+    private final TableSchema tableSchema;
+    private final GenericDatumReader<T> reader;
+    private org.apache.avro.Schema writerSchema;
+
+    public GenericDatumTransformer(
+        SerializableFunction<SchemaAndRecord, T> parseFn,
+        TableSchema tableSchema,
+        org.apache.avro.Schema writer,
+        org.apache.avro.Schema reader) {
+      this.parseFn = parseFn;
+      this.tableSchema = tableSchema;
+      this.setSchema(writer);
+      this.reader = new GenericDatumReader<>(this.writerSchema, reader);
+    }
+
+    @Override
+    public void setSchema(org.apache.avro.Schema schema) {
+      this.writerSchema = schema;

Review Comment:
   Same idea here with recreating the reader.
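
One way to read this comment: in the diff above, `setSchema` updates only the stored writer schema, so a delegate reader built earlier would keep the old one. A minimal sketch of recreating the delegate on every `setSchema` call follows. The types are hypothetical stand-ins (`SketchDelegateReader` plays the role of a delegate such as `GenericDatumReader`, and schemas are plain strings); this is not the code from the PR.

```java
import java.util.Objects;

/** Hypothetical stand-in for a delegate reader; it only records the schemas it was built with. */
final class SketchDelegateReader {
  final String writerSchema;
  final String readerSchema;

  SketchDelegateReader(String writerSchema, String readerSchema) {
    this.writerSchema = Objects.requireNonNull(writerSchema, "writerSchema");
    this.readerSchema = Objects.requireNonNull(readerSchema, "readerSchema");
  }
}

final class RecreatingTransformerSketch {
  private final String readerSchema;
  private SketchDelegateReader delegate;

  RecreatingTransformerSketch(String writerSchema, String readerSchema) {
    this.readerSchema = Objects.requireNonNull(readerSchema, "readerSchema");
    setSchema(writerSchema);
  }

  /** Rebuilds the delegate so that it never decodes with a stale writer schema. */
  void setSchema(String writerSchema) {
    this.delegate = new SketchDelegateReader(writerSchema, readerSchema);
  }

  SketchDelegateReader delegate() {
    return delegate;
  }

  public static void main(String[] args) {
    RecreatingTransformerSketch t = new RecreatingTransformerSketch("writer-v1", "reader");
    t.setSchema("writer-v2");
    System.out.println(t.delegate().writerSchema);
  }
}
```

Routing the constructor through `setSchema` keeps a single place where the delegate is built, so the two schemas cannot drift apart.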





[GitHub] [beam] steveniemitz commented on pull request #22718: Support custom avro DatumReader when reading from BigQuery #22717

Posted by GitBox <gi...@apache.org>.
steveniemitz commented on PR #22718:
URL: https://github.com/apache/beam/pull/22718#issuecomment-1265440596

   Run Java_Examples_Dataflow PreCommit




[GitHub] [beam] kkdoon commented on pull request #22718: Support custom avro DatumReader when reading from BigQuery #22717

Posted by GitBox <gi...@apache.org>.
kkdoon commented on PR #22718:
URL: https://github.com/apache/beam/pull/22718#issuecomment-1216910974

   > BigQueryStorageAvroReader
   
   Yeah, I was thinking of tackling that in another review. I could add it in this one too, to allow this for the different read types.




[GitHub] [beam] steveniemitz commented on a diff in pull request #22718: Support custom avro DatumReader when reading from BigQuery #22717

Posted by GitBox <gi...@apache.org>.
steveniemitz commented on code in PR #22718:
URL: https://github.com/apache/beam/pull/22718#discussion_r946904496


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -804,6 +827,10 @@ public enum Method {
 
       abstract Builder<T> setParseFn(SerializableFunction<SchemaAndRecord, T> parseFn);
 
+      abstract Builder<T> setReaderDatumFactory(AvroSource.DatumReaderFactory<T> factory);

Review Comment:
   This should be named `setDatumReaderFactory`, no?





[GitHub] [beam] steveniemitz commented on pull request #22718: Support custom avro DatumReader when reading from BigQuery #22717

Posted by GitBox <gi...@apache.org>.
steveniemitz commented on PR #22718:
URL: https://github.com/apache/beam/pull/22718#issuecomment-1216781588

   Can we plumb this down into BigQueryStorageAvroReader as well, so it'll work with direct reads?




[GitHub] [beam] github-actions[bot] commented on pull request #22718: Support custom avro DatumReader when reading from BigQuery #22717

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22718:
URL: https://github.com/apache/beam/pull/22718#issuecomment-1213789130

   Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @kennknowles for label java.
   R: @chamikaramj for label io.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review comments).




[GitHub] [beam] ahmedabu98 commented on pull request #22718: Support custom avro DatumReader when reading from BigQuery

Posted by GitBox <gi...@apache.org>.
ahmedabu98 commented on PR #22718:
URL: https://github.com/apache/beam/pull/22718#issuecomment-1275105750

   I believe this PR broke Java Dataflow PostCommits: https://ci-beam.apache.org/job/beam_PostCommit_Java_DataflowV1/2173/testReport/. See #23541 




[GitHub] [beam] steveniemitz commented on pull request #22718: Support custom avro DatumReader when reading from BigQuery #22717

Posted by GitBox <gi...@apache.org>.
steveniemitz commented on PR #22718:
URL: https://github.com/apache/beam/pull/22718#issuecomment-1268959803

   Run Java PreCommit




[GitHub] [beam] kkdoon commented on a diff in pull request #22718: Support custom avro DatumReader when reading from BigQuery #22717

Posted by GitBox <gi...@apache.org>.
kkdoon commented on code in PR #22718:
URL: https://github.com/apache/beam/pull/22718#discussion_r947034447


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -606,6 +607,28 @@ public static <T> TypedRead<T> read(SerializableFunction<SchemaAndRecord, T> par
         .build();
   }
 
+  /**
+   * Reads from a BigQuery table or query and returns a {@link PCollection} with one element per
+   * each row of the table or query result, where the custom {@link org.apache.avro.io.DatumReader}
+   * implementation is used to parse from the BigQuery AVRO format.
+   *
+   * <p> This API allows direct deserialization of AVRO data to the target class.
+   */
+  public static <T> TypedRead<T> readWithDatumReader(

Review Comment:
   How would we get the TypedRead in that case? Were you thinking of something like the following? (This doesn't work, though.)
   ```
    BigQueryIO.readTableRows()
               .withDatumReaderFactory((AvroSource.DatumReaderFactory<User>) (writer, reader) -> new SpecificDatumReader<>(reader))
               .withReaderSchema(User.getAvroSchema())
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -606,6 +607,28 @@ public static <T> TypedRead<T> read(SerializableFunction<SchemaAndRecord, T> par
         .build();
   }
 
+  /**
+   * Reads from a BigQuery table or query and returns a {@link PCollection} with one element per
+   * each row of the table or query result, where the custom {@link org.apache.avro.io.DatumReader}
+   * implementation is used to parse from the BigQuery AVRO format.
+   *
+   * <p> This API allows direct deserialization of AVRO data to the target class.
+   */
+  public static <T> TypedRead<T> readWithDatumReader(
+      AvroSource.DatumReaderFactory<T> factory, org.apache.avro.Schema readerSchema) {

Review Comment:
   It seems AvroIO derives the schema only when the class is a SpecificData subtype. We could do the same, but we would need to take the class type as input instead (I don't think that's cleaner, though).
   
   Also, AvroSource needs the readerSchema in general: it validates against it when parseFn is not set, and it derives the AvroCoder from it. Since AvroSource gets the writer schema at runtime from the file metadata, we cannot default the reader schema to the writer schema on the submitter side.





[GitHub] [beam] kkdoon commented on a diff in pull request #22718: Support custom avro DatumReader when reading from BigQuery #22717

Posted by GitBox <gi...@apache.org>.
kkdoon commented on code in PR #22718:
URL: https://github.com/apache/beam/pull/22718#discussion_r947035168


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -804,6 +827,10 @@ public enum Method {
 
       abstract Builder<T> setParseFn(SerializableFunction<SchemaAndRecord, T> parseFn);
 
+      abstract Builder<T> setReaderDatumFactory(AvroSource.DatumReaderFactory<T> factory);

Review Comment:
   oh yeah





[GitHub] [beam] steveniemitz commented on pull request #22718: Support custom avro DatumReader when reading from BigQuery #22717

Posted by GitBox <gi...@apache.org>.
steveniemitz commented on PR #22718:
URL: https://github.com/apache/beam/pull/22718#issuecomment-1216944694

   > > BigQueryStorageAvroReader
   > 
   > yeah, i was thinking of tackling that in another review. I could add in this one too to allow this for different read types.
   
   That's cool, you can do it in another review. Just curious.




[GitHub] [beam] github-actions[bot] commented on pull request #22718: Support custom avro DatumReader when reading from BigQuery #22717

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22718:
URL: https://github.com/apache/beam/pull/22718#issuecomment-1225641226

   Reminder, please take a look at this pr: @kennknowles @chamikaramj 




[GitHub] [beam] steveniemitz commented on pull request #22718: Support custom avro DatumReader when reading from BigQuery #22717

Posted by GitBox <gi...@apache.org>.
steveniemitz commented on PR #22718:
URL: https://github.com/apache/beam/pull/22718#issuecomment-1265441118

   Run Java_Examples_Dataflow_Java17 PreCommit




[GitHub] [beam] kkdoon commented on pull request #22718: Support custom avro DatumReader when reading from BigQuery #22717

Posted by GitBox <gi...@apache.org>.
kkdoon commented on PR #22718:
URL: https://github.com/apache/beam/pull/22718#issuecomment-1270235457

   I ran and profiled a few dataflow jobs with and without my changes to test for performance regression. Overall the profiling data looks good for these new changes.

