Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/09/01 22:06:25 UTC

[GitHub] [beam] TheNeuralBit commented on a diff in pull request #22926: Adding support for Beam Schema Rows with BQ DIRECT_READ

TheNeuralBit commented on code in PR #22926:
URL: https://github.com/apache/beam/pull/22926#discussion_r961136522


##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java:
##########
@@ -1481,6 +1482,82 @@ public void testReadFromBigQueryIOWithTrimmedSchema() throws Exception {
     p.run();
   }
 
+  @Test
+  public void testReadFromBigQueryIOWithBeamSchema() throws Exception {
+    fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
+    TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
+    Table table = new Table().setTableReference(tableRef).setNumBytes(10L).setSchema(TABLE_SCHEMA);
+    fakeDatasetService.createTable(table);
+
+    CreateReadSessionRequest expectedCreateReadSessionRequest =
+        CreateReadSessionRequest.newBuilder()
+            .setParent("projects/project-id")
+            .setReadSession(
+                ReadSession.newBuilder()
+                    .setTable("projects/foo.com:project/datasets/dataset/tables/table")
+                    .setReadOptions(
+                        ReadSession.TableReadOptions.newBuilder().addSelectedFields("name"))
+                    .setDataFormat(DataFormat.AVRO))
+            .setMaxStreamCount(10)
+            .build();
+
+    ReadSession readSession =
+        ReadSession.newBuilder()
+            .setName("readSessionName")
+            .setAvroSchema(AvroSchema.newBuilder().setSchema(TRIMMED_AVRO_SCHEMA_STRING))
+            .addStreams(ReadStream.newBuilder().setName("streamName"))
+            .setDataFormat(DataFormat.AVRO)
+            .build();
+
+    ReadRowsRequest expectedReadRowsRequest =
+        ReadRowsRequest.newBuilder().setReadStream("streamName").build();
+
+    List<GenericRecord> records =
+        Lists.newArrayList(
+            createRecord("A", TRIMMED_AVRO_SCHEMA),
+            createRecord("B", TRIMMED_AVRO_SCHEMA),
+            createRecord("C", TRIMMED_AVRO_SCHEMA),
+            createRecord("D", TRIMMED_AVRO_SCHEMA));
+
+    List<ReadRowsResponse> readRowsResponses =
+        Lists.newArrayList(
+            createResponse(TRIMMED_AVRO_SCHEMA, records.subList(0, 2), 0.0, 0.50),
+            createResponse(TRIMMED_AVRO_SCHEMA, records.subList(2, 4), 0.5, 0.75));
+
+    StorageClient fakeStorageClient = mock(StorageClient.class, withSettings().serializable());
+    when(fakeStorageClient.createReadSession(expectedCreateReadSessionRequest))
+        .thenReturn(readSession);
+    when(fakeStorageClient.readRows(expectedReadRowsRequest, ""))
+        .thenReturn(new FakeBigQueryServerStream<>(readRowsResponses));
+
+    PCollection<Row> output =
+        p.apply(
+                BigQueryIO.readTableRowsWithSchema()
+                    .from("foo.com:project:dataset.table")
+                    .withMethod(Method.DIRECT_READ)
+                    .withSelectedFields(Lists.newArrayList("name"))
+                    .withFormat(DataFormat.AVRO)

Review Comment:
   Will this work with Arrow format as well?
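   For reference, a minimal sketch of what the Arrow variant of the pipeline above could look like (only the format is switched; the fake storage client would also need Arrow-specific stubbing, i.e. an ArrowSchema on the ReadSession and Arrow record batches in the responses, instead of the Avro setup above):
   ```java
   PCollection<Row> output =
       p.apply(
               BigQueryIO.readTableRowsWithSchema()
                   .from("foo.com:project:dataset.table")
                   .withMethod(Method.DIRECT_READ)
                   .withSelectedFields(Lists.newArrayList("name"))
                   .withFormat(DataFormat.ARROW)
                   .withTestServices(
                       new FakeBigQueryServices()
                           .withDatasetService(fakeDatasetService)
                           .withStorageClient(fakeStorageClient)))
           .apply(Convert.toRows());
   ```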



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeJobService.java:
##########
@@ -320,7 +320,9 @@ public Job getJob(JobReference jobRef) {
                               "Job %s failed: %s", job.job.getConfiguration(), e.toString())));
           List<ResourceId> sourceFiles =
               filesForLoadJobs.get(jobRef.getProjectId(), jobRef.getJobId());
-          FileSystems.delete(sourceFiles);

Review Comment:
   You might consider enabling the nullness checker in this file.
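   For illustration, a minimal sketch of a null-safe variant of the deleted call, assuming the Checker Framework's `@Nullable` annotation is in scope (the two-argument `get` here can return null when no files were registered for the job):
   ```java
   @Nullable
   List<ResourceId> sourceFiles =
       filesForLoadJobs.get(jobRef.getProjectId(), jobRef.getJobId());
   if (sourceFiles != null) {
     // Only delete when the load job actually registered source files.
     FileSystems.delete(sourceFiles);
   }
   ```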



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -1232,22 +1232,49 @@ void cleanup(PassThroughThenCleanup.ContextContainer c) throws Exception {
       return rows;
     }
 
-    private PCollection<T> expandForDirectRead(PBegin input, Coder<T> outputCoder) {
+    private PCollection<T> expandForDirectRead(
+        PBegin input, Coder<T> outputCoder, boolean beamSchemaEnabled) {
       ValueProvider<TableReference> tableProvider = getTableProvider();
       Pipeline p = input.getPipeline();
+      BigQuerySourceDef srcDef = createSourceDef();
       if (tableProvider != null) {
         // No job ID is required. Read directly from BigQuery storage.
-        return p.apply(
-            org.apache.beam.sdk.io.Read.from(
-                BigQueryStorageTableSource.create(
-                    tableProvider,
-                    getFormat(),
-                    getSelectedFields(),
-                    getRowRestriction(),
-                    getParseFn(),
-                    outputCoder,
-                    getBigQueryServices(),
-                    getProjectionPushdownApplied())));
+        PCollection<T> rows =
+            p.apply(
+                org.apache.beam.sdk.io.Read.from(
+                    BigQueryStorageTableSource.create(
+                        tableProvider,
+                        getFormat(),
+                        getSelectedFields(),
+                        getRowRestriction(),
+                        getParseFn(),
+                        outputCoder,
+                        getBigQueryServices(),
+                        getProjectionPushdownApplied())));
+        if (beamSchemaEnabled) {
+          BigQueryOptions bqOptions = p.getOptions().as(BigQueryOptions.class);
+          Schema beamSchema = srcDef.getBeamSchema(bqOptions);
+
+          List<Schema.Field> flds =
+              beamSchema.getFields().stream()
+                  .filter(
+                      field -> {
+                        if (getSelectedFields() != null
+                            && getSelectedFields().isAccessible()
+                            && getSelectedFields().get() != null) {
+                          return getSelectedFields().get().contains(field.getName());
+                        } else {
+                          return true;
+                        }
+                      })
+                  .collect(Collectors.toList());
+
+          beamSchema = Schema.builder().addFields(flds).build();
+          SerializableFunction<T, Row> toBeamRow = getToBeamRowFn().apply(beamSchema);
+          SerializableFunction<Row, T> fromBeamRow = getFromBeamRowFn().apply(beamSchema);

Review Comment:
   We already have this logic in `BigQueryIO.expand` (it is only triggered on the non-direct-read path). I wonder if we can refactor it so both paths re-use the same code, similar to what Svetak did in Python.
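   Something along these lines could be shared by both paths (untested sketch of a private helper on `TypedRead`; the name and exact signature are only illustrative):
   ```java
   private PCollection<T> asBeamRows(PCollection<T> rows, Schema beamSchema) {
     // Trim the schema to the selected fields, if any were specified.
     if (getSelectedFields() != null
         && getSelectedFields().isAccessible()
         && getSelectedFields().get() != null) {
       List<String> selected = getSelectedFields().get();
       beamSchema =
           Schema.builder()
               .addFields(
                   beamSchema.getFields().stream()
                       .filter(f -> selected.contains(f.getName()))
                       .collect(Collectors.toList()))
               .build();
     }
     // Attach the schema and the to/from Row conversions to the output PCollection.
     return rows.setSchema(
         beamSchema,
         getTypeDescriptor(),
         getToBeamRowFn().apply(beamSchema),
         getFromBeamRowFn().apply(beamSchema));
   }
   ```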



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java:
##########
@@ -1481,6 +1482,82 @@ public void testReadFromBigQueryIOWithTrimmedSchema() throws Exception {
     p.run();
   }
 
+  @Test
+  public void testReadFromBigQueryIOWithBeamSchema() throws Exception {
+    fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
+    TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
+    Table table = new Table().setTableReference(tableRef).setNumBytes(10L).setSchema(TABLE_SCHEMA);
+    fakeDatasetService.createTable(table);
+
+    CreateReadSessionRequest expectedCreateReadSessionRequest =
+        CreateReadSessionRequest.newBuilder()
+            .setParent("projects/project-id")
+            .setReadSession(
+                ReadSession.newBuilder()
+                    .setTable("projects/foo.com:project/datasets/dataset/tables/table")
+                    .setReadOptions(
+                        ReadSession.TableReadOptions.newBuilder().addSelectedFields("name"))
+                    .setDataFormat(DataFormat.AVRO))
+            .setMaxStreamCount(10)
+            .build();
+
+    ReadSession readSession =
+        ReadSession.newBuilder()
+            .setName("readSessionName")
+            .setAvroSchema(AvroSchema.newBuilder().setSchema(TRIMMED_AVRO_SCHEMA_STRING))
+            .addStreams(ReadStream.newBuilder().setName("streamName"))
+            .setDataFormat(DataFormat.AVRO)
+            .build();
+
+    ReadRowsRequest expectedReadRowsRequest =
+        ReadRowsRequest.newBuilder().setReadStream("streamName").build();
+
+    List<GenericRecord> records =
+        Lists.newArrayList(
+            createRecord("A", TRIMMED_AVRO_SCHEMA),
+            createRecord("B", TRIMMED_AVRO_SCHEMA),
+            createRecord("C", TRIMMED_AVRO_SCHEMA),
+            createRecord("D", TRIMMED_AVRO_SCHEMA));
+
+    List<ReadRowsResponse> readRowsResponses =
+        Lists.newArrayList(
+            createResponse(TRIMMED_AVRO_SCHEMA, records.subList(0, 2), 0.0, 0.50),
+            createResponse(TRIMMED_AVRO_SCHEMA, records.subList(2, 4), 0.5, 0.75));
+
+    StorageClient fakeStorageClient = mock(StorageClient.class, withSettings().serializable());
+    when(fakeStorageClient.createReadSession(expectedCreateReadSessionRequest))
+        .thenReturn(readSession);
+    when(fakeStorageClient.readRows(expectedReadRowsRequest, ""))
+        .thenReturn(new FakeBigQueryServerStream<>(readRowsResponses));
+
+    PCollection<Row> output =
+        p.apply(
+                BigQueryIO.readTableRowsWithSchema()
+                    .from("foo.com:project:dataset.table")
+                    .withMethod(Method.DIRECT_READ)
+                    .withSelectedFields(Lists.newArrayList("name"))
+                    .withFormat(DataFormat.AVRO)
+                    .withTestServices(
+                        new FakeBigQueryServices()
+                            .withDatasetService(fakeDatasetService)
+                            .withStorageClient(fakeStorageClient)))
+            .apply(Convert.toRows());
+
+    org.apache.beam.sdk.schemas.Schema beamSchema =
+        org.apache.beam.sdk.schemas.Schema.of(
+            org.apache.beam.sdk.schemas.Schema.Field.of(
+                "name", org.apache.beam.sdk.schemas.Schema.FieldType.STRING));
+    PAssert.that(output)
+        .containsInAnyOrder(
+            ImmutableList.of(
+                Row.withSchema(beamSchema).addValue("A").build(),
+                Row.withSchema(beamSchema).addValue("B").build(),
+                Row.withSchema(beamSchema).addValue("C").build(),
+                Row.withSchema(beamSchema).addValue("D").build()));
+
+    p.run();

Review Comment:
   Can you also cover this with an integration test?
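   Untested sketch of what that could look like, loosely modelled on the existing storage-read integration tests; the pipeline-option getters (`getInputTable()`, `getNumRecords()`) and the selected field are placeholders:
   ```java
   @Test
   public void testBigQueryStorageReadWithBeamSchema() {
     PCollection<Row> rows =
         p.apply(
                 BigQueryIO.readTableRowsWithSchema()
                     .from(options.getInputTable())
                     .withMethod(Method.DIRECT_READ)
                     .withSelectedFields(Lists.newArrayList("name")))
             .apply(Convert.toRows());
     // Assert on the row count of the trimmed-schema read.
     PAssert.thatSingleton(rows.apply("Count", Count.globally()))
         .isEqualTo(options.getNumRecords());
     p.run().waitUntilFinish();
   }
   ```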



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org