Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/08/26 23:08:34 UTC

[GitHub] [beam] pabloem opened a new pull request, #22926: [WIP] Adding support for Beam Schema Rows with BQ DIRECT_READ

pabloem opened a new pull request, #22926:
URL: https://github.com/apache/beam/pull/22926



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] pabloem commented on pull request #22926: Adding support for Beam Schema Rows with BQ DIRECT_READ

Posted by GitBox <gi...@apache.org>.
pabloem commented on PR #22926:
URL: https://github.com/apache/beam/pull/22926#issuecomment-1235679700

   Run Java PreCommit




[GitHub] [beam] pabloem commented on pull request #22926: Adding support for Beam Schema Rows with BQ DIRECT_READ

Posted by GitBox <gi...@apache.org>.
pabloem commented on PR #22926:
URL: https://github.com/apache/beam/pull/22926#issuecomment-1234744322

   PTAL? : D I would like to have this in snapshots to build and test the Syndeo template that I'm working on




[GitHub] [beam] pabloem commented on pull request #22926: [WIP] Adding support for Beam Schema Rows with BQ DIRECT_READ

Posted by GitBox <gi...@apache.org>.
pabloem commented on PR #22926:
URL: https://github.com/apache/beam/pull/22926#issuecomment-1229071066

   Run Java PostCommit




[GitHub] [beam] pabloem commented on a diff in pull request #22926: Adding support for Beam Schema Rows with BQ DIRECT_READ

Posted by GitBox <gi...@apache.org>.
pabloem commented on code in PR #22926:
URL: https://github.com/apache/beam/pull/22926#discussion_r961293509


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -1232,22 +1232,49 @@ void cleanup(PassThroughThenCleanup.ContextContainer c) throws Exception {
       return rows;
     }
 
-    private PCollection<T> expandForDirectRead(PBegin input, Coder<T> outputCoder) {
+    private PCollection<T> expandForDirectRead(
+        PBegin input, Coder<T> outputCoder, boolean beamSchemaEnabled) {
       ValueProvider<TableReference> tableProvider = getTableProvider();
       Pipeline p = input.getPipeline();
+      BigQuerySourceDef srcDef = createSourceDef();
       if (tableProvider != null) {
         // No job ID is required. Read directly from BigQuery storage.
-        return p.apply(
-            org.apache.beam.sdk.io.Read.from(
-                BigQueryStorageTableSource.create(
-                    tableProvider,
-                    getFormat(),
-                    getSelectedFields(),
-                    getRowRestriction(),
-                    getParseFn(),
-                    outputCoder,
-                    getBigQueryServices(),
-                    getProjectionPushdownApplied())));
+        PCollection<T> rows =
+            p.apply(
+                org.apache.beam.sdk.io.Read.from(
+                    BigQueryStorageTableSource.create(
+                        tableProvider,
+                        getFormat(),
+                        getSelectedFields(),
+                        getRowRestriction(),
+                        getParseFn(),
+                        outputCoder,
+                        getBigQueryServices(),
+                        getProjectionPushdownApplied())));
+        if (beamSchemaEnabled) {
+          BigQueryOptions bqOptions = p.getOptions().as(BigQueryOptions.class);
+          Schema beamSchema = srcDef.getBeamSchema(bqOptions);
+
+          List<Schema.Field> flds =
+              beamSchema.getFields().stream()
+                  .filter(
+                      field -> {
+                        if (getSelectedFields() != null
+                            && getSelectedFields().isAccessible()
+                            && getSelectedFields().get() != null) {
+                          return getSelectedFields().get().contains(field.getName());
+                        } else {
+                          return true;
+                        }
+                      })
+                  .collect(Collectors.toList());
+
+          beamSchema = Schema.builder().addFields(flds).build();
+          SerializableFunction<T, Row> toBeamRow = getToBeamRowFn().apply(beamSchema);
+          SerializableFunction<Row, T> fromBeamRow = getFromBeamRowFn().apply(beamSchema);

Review Comment:
   done!





[GitHub] [beam] TheNeuralBit commented on a diff in pull request #22926: Adding support for Beam Schema Rows with BQ DIRECT_READ

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #22926:
URL: https://github.com/apache/beam/pull/22926#discussion_r961136522


##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java:
##########
@@ -1481,6 +1482,82 @@ public void testReadFromBigQueryIOWithTrimmedSchema() throws Exception {
     p.run();
   }
 
+  @Test
+  public void testReadFromBigQueryIOWithBeamSchema() throws Exception {
+    fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
+    TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
+    Table table = new Table().setTableReference(tableRef).setNumBytes(10L).setSchema(TABLE_SCHEMA);
+    fakeDatasetService.createTable(table);
+
+    CreateReadSessionRequest expectedCreateReadSessionRequest =
+        CreateReadSessionRequest.newBuilder()
+            .setParent("projects/project-id")
+            .setReadSession(
+                ReadSession.newBuilder()
+                    .setTable("projects/foo.com:project/datasets/dataset/tables/table")
+                    .setReadOptions(
+                        ReadSession.TableReadOptions.newBuilder().addSelectedFields("name"))
+                    .setDataFormat(DataFormat.AVRO))
+            .setMaxStreamCount(10)
+            .build();
+
+    ReadSession readSession =
+        ReadSession.newBuilder()
+            .setName("readSessionName")
+            .setAvroSchema(AvroSchema.newBuilder().setSchema(TRIMMED_AVRO_SCHEMA_STRING))
+            .addStreams(ReadStream.newBuilder().setName("streamName"))
+            .setDataFormat(DataFormat.AVRO)
+            .build();
+
+    ReadRowsRequest expectedReadRowsRequest =
+        ReadRowsRequest.newBuilder().setReadStream("streamName").build();
+
+    List<GenericRecord> records =
+        Lists.newArrayList(
+            createRecord("A", TRIMMED_AVRO_SCHEMA),
+            createRecord("B", TRIMMED_AVRO_SCHEMA),
+            createRecord("C", TRIMMED_AVRO_SCHEMA),
+            createRecord("D", TRIMMED_AVRO_SCHEMA));
+
+    List<ReadRowsResponse> readRowsResponses =
+        Lists.newArrayList(
+            createResponse(TRIMMED_AVRO_SCHEMA, records.subList(0, 2), 0.0, 0.50),
+            createResponse(TRIMMED_AVRO_SCHEMA, records.subList(2, 4), 0.5, 0.75));
+
+    StorageClient fakeStorageClient = mock(StorageClient.class, withSettings().serializable());
+    when(fakeStorageClient.createReadSession(expectedCreateReadSessionRequest))
+        .thenReturn(readSession);
+    when(fakeStorageClient.readRows(expectedReadRowsRequest, ""))
+        .thenReturn(new FakeBigQueryServerStream<>(readRowsResponses));
+
+    PCollection<Row> output =
+        p.apply(
+                BigQueryIO.readTableRowsWithSchema()
+                    .from("foo.com:project:dataset.table")
+                    .withMethod(Method.DIRECT_READ)
+                    .withSelectedFields(Lists.newArrayList("name"))
+                    .withFormat(DataFormat.AVRO)

Review Comment:
   Will this work with Arrow format as well?



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeJobService.java:
##########
@@ -320,7 +320,9 @@ public Job getJob(JobReference jobRef) {
                               "Job %s failed: %s", job.job.getConfiguration(), e.toString())));
           List<ResourceId> sourceFiles =
               filesForLoadJobs.get(jobRef.getProjectId(), jobRef.getJobId());
-          FileSystems.delete(sourceFiles);

Review Comment:
   You might consider enabling the nullness checker in this file.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -1232,22 +1232,49 @@ void cleanup(PassThroughThenCleanup.ContextContainer c) throws Exception {
       return rows;
     }
 
-    private PCollection<T> expandForDirectRead(PBegin input, Coder<T> outputCoder) {
+    private PCollection<T> expandForDirectRead(
+        PBegin input, Coder<T> outputCoder, boolean beamSchemaEnabled) {
       ValueProvider<TableReference> tableProvider = getTableProvider();
       Pipeline p = input.getPipeline();
+      BigQuerySourceDef srcDef = createSourceDef();
       if (tableProvider != null) {
         // No job ID is required. Read directly from BigQuery storage.
-        return p.apply(
-            org.apache.beam.sdk.io.Read.from(
-                BigQueryStorageTableSource.create(
-                    tableProvider,
-                    getFormat(),
-                    getSelectedFields(),
-                    getRowRestriction(),
-                    getParseFn(),
-                    outputCoder,
-                    getBigQueryServices(),
-                    getProjectionPushdownApplied())));
+        PCollection<T> rows =
+            p.apply(
+                org.apache.beam.sdk.io.Read.from(
+                    BigQueryStorageTableSource.create(
+                        tableProvider,
+                        getFormat(),
+                        getSelectedFields(),
+                        getRowRestriction(),
+                        getParseFn(),
+                        outputCoder,
+                        getBigQueryServices(),
+                        getProjectionPushdownApplied())));
+        if (beamSchemaEnabled) {
+          BigQueryOptions bqOptions = p.getOptions().as(BigQueryOptions.class);
+          Schema beamSchema = srcDef.getBeamSchema(bqOptions);
+
+          List<Schema.Field> flds =
+              beamSchema.getFields().stream()
+                  .filter(
+                      field -> {
+                        if (getSelectedFields() != null
+                            && getSelectedFields().isAccessible()
+                            && getSelectedFields().get() != null) {
+                          return getSelectedFields().get().contains(field.getName());
+                        } else {
+                          return true;
+                        }
+                      })
+                  .collect(Collectors.toList());
+
+          beamSchema = Schema.builder().addFields(flds).build();
+          SerializableFunction<T, Row> toBeamRow = getToBeamRowFn().apply(beamSchema);
+          SerializableFunction<Row, T> fromBeamRow = getFromBeamRowFn().apply(beamSchema);

Review Comment:
   We already have this logic in `BigQueryIO.expand` (only triggered in the non-direct-read path). I wonder if we can refactor and reuse it for both paths, similar to what Svetak did in Python?
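
The field-trimming step in the hunk above is the kind of logic that could live in one helper called from both read paths. A minimal sketch, assuming plain `String` field names stand in for `Schema.Field`; the class and method names are hypothetical and not part of `BigQueryIO`, and the `ValueProvider` accessibility check from the diff is left out:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not Beam API: strings stand in for Schema.Field.
final class SelectedFieldFilter {
  private SelectedFieldFilter() {}

  /** Keeps only the fields named in selected; a null selection keeps every field. */
  static List<String> trim(List<String> schemaFields, List<String> selected) {
    if (selected == null) {
      // No projection was requested, so the full schema is returned unchanged.
      return schemaFields;
    }
    return schemaFields.stream().filter(selected::contains).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> fields = Arrays.asList("name", "number");
    System.out.println(trim(fields, Arrays.asList("name"))); // prints [name]
    System.out.println(trim(fields, null)); // prints [name, number]
  }
}
```

Resolving the selection once and passing it in, rather than calling `getSelectedFields()` inside the lambda for every field, is a design choice of this sketch, not something the PR states.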



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java:
##########
@@ -1481,6 +1482,82 @@ public void testReadFromBigQueryIOWithTrimmedSchema() throws Exception {
     p.run();
   }
 
+  @Test
+  public void testReadFromBigQueryIOWithBeamSchema() throws Exception {
+    fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
+    TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
+    Table table = new Table().setTableReference(tableRef).setNumBytes(10L).setSchema(TABLE_SCHEMA);
+    fakeDatasetService.createTable(table);
+
+    CreateReadSessionRequest expectedCreateReadSessionRequest =
+        CreateReadSessionRequest.newBuilder()
+            .setParent("projects/project-id")
+            .setReadSession(
+                ReadSession.newBuilder()
+                    .setTable("projects/foo.com:project/datasets/dataset/tables/table")
+                    .setReadOptions(
+                        ReadSession.TableReadOptions.newBuilder().addSelectedFields("name"))
+                    .setDataFormat(DataFormat.AVRO))
+            .setMaxStreamCount(10)
+            .build();
+
+    ReadSession readSession =
+        ReadSession.newBuilder()
+            .setName("readSessionName")
+            .setAvroSchema(AvroSchema.newBuilder().setSchema(TRIMMED_AVRO_SCHEMA_STRING))
+            .addStreams(ReadStream.newBuilder().setName("streamName"))
+            .setDataFormat(DataFormat.AVRO)
+            .build();
+
+    ReadRowsRequest expectedReadRowsRequest =
+        ReadRowsRequest.newBuilder().setReadStream("streamName").build();
+
+    List<GenericRecord> records =
+        Lists.newArrayList(
+            createRecord("A", TRIMMED_AVRO_SCHEMA),
+            createRecord("B", TRIMMED_AVRO_SCHEMA),
+            createRecord("C", TRIMMED_AVRO_SCHEMA),
+            createRecord("D", TRIMMED_AVRO_SCHEMA));
+
+    List<ReadRowsResponse> readRowsResponses =
+        Lists.newArrayList(
+            createResponse(TRIMMED_AVRO_SCHEMA, records.subList(0, 2), 0.0, 0.50),
+            createResponse(TRIMMED_AVRO_SCHEMA, records.subList(2, 4), 0.5, 0.75));
+
+    StorageClient fakeStorageClient = mock(StorageClient.class, withSettings().serializable());
+    when(fakeStorageClient.createReadSession(expectedCreateReadSessionRequest))
+        .thenReturn(readSession);
+    when(fakeStorageClient.readRows(expectedReadRowsRequest, ""))
+        .thenReturn(new FakeBigQueryServerStream<>(readRowsResponses));
+
+    PCollection<Row> output =
+        p.apply(
+                BigQueryIO.readTableRowsWithSchema()
+                    .from("foo.com:project:dataset.table")
+                    .withMethod(Method.DIRECT_READ)
+                    .withSelectedFields(Lists.newArrayList("name"))
+                    .withFormat(DataFormat.AVRO)
+                    .withTestServices(
+                        new FakeBigQueryServices()
+                            .withDatasetService(fakeDatasetService)
+                            .withStorageClient(fakeStorageClient)))
+            .apply(Convert.toRows());
+
+    org.apache.beam.sdk.schemas.Schema beamSchema =
+        org.apache.beam.sdk.schemas.Schema.of(
+            org.apache.beam.sdk.schemas.Schema.Field.of(
+                "name", org.apache.beam.sdk.schemas.Schema.FieldType.STRING));
+    PAssert.that(output)
+        .containsInAnyOrder(
+            ImmutableList.of(
+                Row.withSchema(beamSchema).addValue("A").build(),
+                Row.withSchema(beamSchema).addValue("B").build(),
+                Row.withSchema(beamSchema).addValue("C").build(),
+                Row.withSchema(beamSchema).addValue("D").build()));
+
+    p.run();

Review Comment:
   Can you also test with an integration test?





[GitHub] [beam] pabloem commented on pull request #22926: Adding support for Beam Schema Rows with BQ DIRECT_READ

Posted by GitBox <gi...@apache.org>.
pabloem commented on PR #22926:
URL: https://github.com/apache/beam/pull/22926#issuecomment-1235073549

   Run Java PostCommit




[GitHub] [beam] pabloem merged pull request #22926: Adding support for Beam Schema Rows with BQ DIRECT_READ

Posted by GitBox <gi...@apache.org>.
pabloem merged PR #22926:
URL: https://github.com/apache/beam/pull/22926




[GitHub] [beam] TheNeuralBit commented on pull request #22926: Adding support for Beam Schema Rows with BQ DIRECT_READ

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on PR #22926:
URL: https://github.com/apache/beam/pull/22926#issuecomment-1234786127

   > PTAL? : D I would like to have this in snapshots to build and test the Syndeo template that I'm working on
   
   Ah shoot sorry. I will take a look today. One high-level question - I wonder if there's some duplication between this and the BQ TableProvider? It could be nice to have one defer to the other if possible.




[GitHub] [beam] TheNeuralBit commented on pull request #22926: Adding support for Beam Schema Rows with BQ DIRECT_READ

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on PR #22926:
URL: https://github.com/apache/beam/pull/22926#issuecomment-1234818890

   Run Java PostCommit




[GitHub] [beam] pabloem commented on pull request #22926: [WIP] Adding support for Beam Schema Rows with BQ DIRECT_READ

Posted by GitBox <gi...@apache.org>.
pabloem commented on PR #22926:
URL: https://github.com/apache/beam/pull/22926#issuecomment-1229073999

   r: @svetakvsundhar 
   r: @TheNeuralBit 
   
   This adds schema support only for table reads, not for query reads, but it may not be difficult to add it for query reads.




[GitHub] [beam] svetakvsundhar commented on a diff in pull request #22926: [WIP] Adding support for Beam Schema Rows with BQ DIRECT_READ

Posted by GitBox <gi...@apache.org>.
svetakvsundhar commented on code in PR #22926:
URL: https://github.com/apache/beam/pull/22926#discussion_r956625390


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -1232,22 +1232,49 @@ void cleanup(PassThroughThenCleanup.ContextContainer c) throws Exception {
       return rows;
     }
 
-    private PCollection<T> expandForDirectRead(PBegin input, Coder<T> outputCoder) {
+    private PCollection<T> expandForDirectRead(
+        PBegin input, Coder<T> outputCoder, boolean beamSchemaEnabled) {
       ValueProvider<TableReference> tableProvider = getTableProvider();
       Pipeline p = input.getPipeline();
+      BigQuerySourceDef srcDef = createSourceDef();
       if (tableProvider != null) {
         // No job ID is required. Read directly from BigQuery storage.
-        return p.apply(
-            org.apache.beam.sdk.io.Read.from(
-                BigQueryStorageTableSource.create(
-                    tableProvider,
-                    getFormat(),
-                    getSelectedFields(),
-                    getRowRestriction(),
-                    getParseFn(),
-                    outputCoder,
-                    getBigQueryServices(),
-                    getProjectionPushdownApplied())));
+        PCollection<T> rows =
+            p.apply(
+                org.apache.beam.sdk.io.Read.from(
+                    BigQueryStorageTableSource.create(
+                        tableProvider,
+                        getFormat(),
+                        getSelectedFields(),
+                        getRowRestriction(),
+                        getParseFn(),
+                        outputCoder,
+                        getBigQueryServices(),
+                        getProjectionPushdownApplied())));
+        if (beamSchemaEnabled) {
+          BigQueryOptions bqOptions = p.getOptions().as(BigQueryOptions.class);
+          Schema beamSchema = srcDef.getBeamSchema(bqOptions);
+
+          List<Schema.Field> flds =
+              beamSchema.getFields().stream()
+                  .filter(
+                      field -> {
+                        if (getSelectedFields() != null
+                            && getSelectedFields().isAccessible()
+                            && getSelectedFields().get() != null) {
+                          return getSelectedFields().get().contains(field.getName());
+                        } else {
+                          return true;
+                        }
+                      })
+                  .collect(Collectors.toList());
+
+          beamSchema = Schema.builder().addFields(flds).build();
+          SerializableFunction<T, Row> toBeamRow = getToBeamRowFn().apply(beamSchema);
+          SerializableFunction<Row, T> fromBeamRow = getFromBeamRowFn().apply(beamSchema);

Review Comment:
   Is the purpose of this to cover the case where we want to convert something from a `BeamRow` type into another type (e.g. the WriteToBQ case)?





[GitHub] [beam] pabloem commented on a diff in pull request #22926: Adding support for Beam Schema Rows with BQ DIRECT_READ

Posted by GitBox <gi...@apache.org>.
pabloem commented on code in PR #22926:
URL: https://github.com/apache/beam/pull/22926#discussion_r956758177


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -1232,22 +1232,49 @@ void cleanup(PassThroughThenCleanup.ContextContainer c) throws Exception {
       return rows;
     }
 
-    private PCollection<T> expandForDirectRead(PBegin input, Coder<T> outputCoder) {
+    private PCollection<T> expandForDirectRead(
+        PBegin input, Coder<T> outputCoder, boolean beamSchemaEnabled) {
       ValueProvider<TableReference> tableProvider = getTableProvider();
       Pipeline p = input.getPipeline();
+      BigQuerySourceDef srcDef = createSourceDef();
       if (tableProvider != null) {
         // No job ID is required. Read directly from BigQuery storage.
-        return p.apply(
-            org.apache.beam.sdk.io.Read.from(
-                BigQueryStorageTableSource.create(
-                    tableProvider,
-                    getFormat(),
-                    getSelectedFields(),
-                    getRowRestriction(),
-                    getParseFn(),
-                    outputCoder,
-                    getBigQueryServices(),
-                    getProjectionPushdownApplied())));
+        PCollection<T> rows =
+            p.apply(
+                org.apache.beam.sdk.io.Read.from(
+                    BigQueryStorageTableSource.create(
+                        tableProvider,
+                        getFormat(),
+                        getSelectedFields(),
+                        getRowRestriction(),
+                        getParseFn(),
+                        outputCoder,
+                        getBigQueryServices(),
+                        getProjectionPushdownApplied())));
+        if (beamSchemaEnabled) {
+          BigQueryOptions bqOptions = p.getOptions().as(BigQueryOptions.class);
+          Schema beamSchema = srcDef.getBeamSchema(bqOptions);
+
+          List<Schema.Field> flds =
+              beamSchema.getFields().stream()
+                  .filter(
+                      field -> {
+                        if (getSelectedFields() != null
+                            && getSelectedFields().isAccessible()
+                            && getSelectedFields().get() != null) {
+                          return getSelectedFields().get().contains(field.getName());
+                        } else {
+                          return true;
+                        }
+                      })
+                  .collect(Collectors.toList());
+
+          beamSchema = Schema.builder().addFields(flds).build();
+          SerializableFunction<T, Row> toBeamRow = getToBeamRowFn().apply(beamSchema);
+          SerializableFunction<Row, T> fromBeamRow = getFromBeamRowFn().apply(beamSchema);

Review Comment:
   yes right! It relies on BQUtils to generate a converter function.
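
For readers following along, the two lines under review take a schema and hand back a converter in each direction. A rough sketch of that "schema in, converter out" shape, using only standard-library types: a schema is a list of field names, a source record is a `Map`, and a row is a list of values in schema order. All names here are hypothetical stand-ins, not Beam or BQUtils API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-ins for the converter factories; none of this is Beam API.
final class RowConverters {
  private RowConverters() {}

  /** Schema -> (record -> row): plays the role of getToBeamRowFn() in the diff. */
  static Function<List<String>, Function<Map<String, Object>, List<Object>>> toRowFn() {
    return schema -> record -> {
      List<Object> row = new ArrayList<>();
      for (String field : schema) {
        // Only fields present in the (possibly trimmed) schema are copied.
        row.add(record.get(field));
      }
      return row;
    };
  }

  /** Schema -> (row -> record): plays the role of getFromBeamRowFn() in the diff. */
  static Function<List<String>, Function<List<Object>, Map<String, Object>>> fromRowFn() {
    return schema -> row -> {
      Map<String, Object> record = new LinkedHashMap<>();
      for (int i = 0; i < schema.size(); i++) {
        record.put(schema.get(i), row.get(i));
      }
      return record;
    };
  }

  public static void main(String[] args) {
    List<String> schema = Arrays.asList("name");
    Map<String, Object> record = new LinkedHashMap<>();
    record.put("name", "A");
    record.put("number", 1L);
    List<Object> row = toRowFn().apply(schema).apply(record);
    System.out.println(row); // prints [A]
    System.out.println(fromRowFn().apply(schema).apply(row)); // prints {name=A}
  }
}
```

Because both converters are built from the same schema, trimming the schema to the selected fields first keeps the two directions consistent with each other.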





[GitHub] [beam] github-actions[bot] commented on pull request #22926: Adding support for Beam Schema Rows with BQ DIRECT_READ

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22926:
URL: https://github.com/apache/beam/pull/22926#issuecomment-1229515371

   Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @robertwb for label java.
   R: @johnjcasey for label io.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review comments).




[GitHub] [beam] pabloem commented on pull request #22926: Adding support for Beam Schema Rows with BQ DIRECT_READ

Posted by GitBox <gi...@apache.org>.
pabloem commented on PR #22926:
URL: https://github.com/apache/beam/pull/22926#issuecomment-1230710662

   @TheNeuralBit PTAL : )




[GitHub] [beam] pabloem commented on a diff in pull request #22926: Adding support for Beam Schema Rows with BQ DIRECT_READ

Posted by GitBox <gi...@apache.org>.
pabloem commented on code in PR #22926:
URL: https://github.com/apache/beam/pull/22926#discussion_r961293340


##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java:
##########
@@ -1481,6 +1482,82 @@ public void testReadFromBigQueryIOWithTrimmedSchema() throws Exception {
     p.run();
   }
 
+  @Test
+  public void testReadFromBigQueryIOWithBeamSchema() throws Exception {
+    fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
+    TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
+    Table table = new Table().setTableReference(tableRef).setNumBytes(10L).setSchema(TABLE_SCHEMA);
+    fakeDatasetService.createTable(table);
+
+    CreateReadSessionRequest expectedCreateReadSessionRequest =
+        CreateReadSessionRequest.newBuilder()
+            .setParent("projects/project-id")
+            .setReadSession(
+                ReadSession.newBuilder()
+                    .setTable("projects/foo.com:project/datasets/dataset/tables/table")
+                    .setReadOptions(
+                        ReadSession.TableReadOptions.newBuilder().addSelectedFields("name"))
+                    .setDataFormat(DataFormat.AVRO))
+            .setMaxStreamCount(10)
+            .build();
+
+    ReadSession readSession =
+        ReadSession.newBuilder()
+            .setName("readSessionName")
+            .setAvroSchema(AvroSchema.newBuilder().setSchema(TRIMMED_AVRO_SCHEMA_STRING))
+            .addStreams(ReadStream.newBuilder().setName("streamName"))
+            .setDataFormat(DataFormat.AVRO)
+            .build();
+
+    ReadRowsRequest expectedReadRowsRequest =
+        ReadRowsRequest.newBuilder().setReadStream("streamName").build();
+
+    List<GenericRecord> records =
+        Lists.newArrayList(
+            createRecord("A", TRIMMED_AVRO_SCHEMA),
+            createRecord("B", TRIMMED_AVRO_SCHEMA),
+            createRecord("C", TRIMMED_AVRO_SCHEMA),
+            createRecord("D", TRIMMED_AVRO_SCHEMA));
+
+    List<ReadRowsResponse> readRowsResponses =
+        Lists.newArrayList(
+            createResponse(TRIMMED_AVRO_SCHEMA, records.subList(0, 2), 0.0, 0.50),
+            createResponse(TRIMMED_AVRO_SCHEMA, records.subList(2, 4), 0.5, 0.75));
+
+    StorageClient fakeStorageClient = mock(StorageClient.class, withSettings().serializable());
+    when(fakeStorageClient.createReadSession(expectedCreateReadSessionRequest))
+        .thenReturn(readSession);
+    when(fakeStorageClient.readRows(expectedReadRowsRequest, ""))
+        .thenReturn(new FakeBigQueryServerStream<>(readRowsResponses));
+
+    PCollection<Row> output =
+        p.apply(
+                BigQueryIO.readTableRowsWithSchema()
+                    .from("foo.com:project:dataset.table")
+                    .withMethod(Method.DIRECT_READ)
+                    .withSelectedFields(Lists.newArrayList("name"))
+                    .withFormat(DataFormat.AVRO)
+                    .withTestServices(
+                        new FakeBigQueryServices()
+                            .withDatasetService(fakeDatasetService)
+                            .withStorageClient(fakeStorageClient)))
+            .apply(Convert.toRows());
+
+    org.apache.beam.sdk.schemas.Schema beamSchema =
+        org.apache.beam.sdk.schemas.Schema.of(
+            org.apache.beam.sdk.schemas.Schema.Field.of(
+                "name", org.apache.beam.sdk.schemas.Schema.FieldType.STRING));
+    PAssert.that(output)
+        .containsInAnyOrder(
+            ImmutableList.of(
+                Row.withSchema(beamSchema).addValue("A").build(),
+                Row.withSchema(beamSchema).addValue("B").build(),
+                Row.withSchema(beamSchema).addValue("C").build(),
+                Row.withSchema(beamSchema).addValue("D").build()));
+
+    p.run();

Review Comment:
   Added.



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java:
##########
@@ -1481,6 +1482,82 @@ public void testReadFromBigQueryIOWithTrimmedSchema() throws Exception {
     p.run();
   }
 
+  @Test
+  public void testReadFromBigQueryIOWithBeamSchema() throws Exception {
+    fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
+    TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
+    Table table = new Table().setTableReference(tableRef).setNumBytes(10L).setSchema(TABLE_SCHEMA);
+    fakeDatasetService.createTable(table);
+
+    CreateReadSessionRequest expectedCreateReadSessionRequest =
+        CreateReadSessionRequest.newBuilder()
+            .setParent("projects/project-id")
+            .setReadSession(
+                ReadSession.newBuilder()
+                    .setTable("projects/foo.com:project/datasets/dataset/tables/table")
+                    .setReadOptions(
+                        ReadSession.TableReadOptions.newBuilder().addSelectedFields("name"))
+                    .setDataFormat(DataFormat.AVRO))
+            .setMaxStreamCount(10)
+            .build();
+
+    ReadSession readSession =
+        ReadSession.newBuilder()
+            .setName("readSessionName")
+            .setAvroSchema(AvroSchema.newBuilder().setSchema(TRIMMED_AVRO_SCHEMA_STRING))
+            .addStreams(ReadStream.newBuilder().setName("streamName"))
+            .setDataFormat(DataFormat.AVRO)
+            .build();
+
+    ReadRowsRequest expectedReadRowsRequest =
+        ReadRowsRequest.newBuilder().setReadStream("streamName").build();
+
+    List<GenericRecord> records =
+        Lists.newArrayList(
+            createRecord("A", TRIMMED_AVRO_SCHEMA),
+            createRecord("B", TRIMMED_AVRO_SCHEMA),
+            createRecord("C", TRIMMED_AVRO_SCHEMA),
+            createRecord("D", TRIMMED_AVRO_SCHEMA));
+
+    List<ReadRowsResponse> readRowsResponses =
+        Lists.newArrayList(
+            createResponse(TRIMMED_AVRO_SCHEMA, records.subList(0, 2), 0.0, 0.50),
+            createResponse(TRIMMED_AVRO_SCHEMA, records.subList(2, 4), 0.5, 0.75));
+
+    StorageClient fakeStorageClient = mock(StorageClient.class, withSettings().serializable());
+    when(fakeStorageClient.createReadSession(expectedCreateReadSessionRequest))
+        .thenReturn(readSession);
+    when(fakeStorageClient.readRows(expectedReadRowsRequest, ""))
+        .thenReturn(new FakeBigQueryServerStream<>(readRowsResponses));
+
+    PCollection<Row> output =
+        p.apply(
+                BigQueryIO.readTableRowsWithSchema()
+                    .from("foo.com:project:dataset.table")
+                    .withMethod(Method.DIRECT_READ)
+                    .withSelectedFields(Lists.newArrayList("name"))
+                    .withFormat(DataFormat.AVRO)

Review Comment:
   Yes, it does. I've tested it on the syndeo template, and I've added an integration test.


