You are viewing a plain-text version of this content. The hyperlink to the canonical version ("here") is not preserved in this plain-text rendering.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/12/19 18:46:52 UTC

[beam] branch master updated: Fix BigQueryIO.Write.Method.STORAGE_WRITE_API missing project id (#24634)

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 944c7b6104e Fix BigQueryIO.Write.Method.STORAGE_WRITE_API missing project id (#24634)
944c7b6104e is described below

commit 944c7b6104e01b7d974a5720098d816979c63237
Author: gabihodoroaga <ga...@gmail.com>
AuthorDate: Mon Dec 19 20:46:38 2022 +0200

    Fix BigQueryIO.Write.Method.STORAGE_WRITE_API missing project id (#24634)
    
    * Fix BigQueryIO.Write.Method.STORAGE_WRITE_API missing project id
    
    * Fix Checkstyle errors
    
    * Add unit test
---
 .../bigquery/StorageApiWriteUnshardedRecords.java  |  2 +-
 .../bigquery/StorageApiWritesShardedRecords.java   |  2 +-
 .../beam/sdk/io/gcp/bigquery/TableDestination.java |  9 +++++++-
 .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java   | 25 ++++++++++++++++++++++
 4 files changed, 35 insertions(+), 3 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index 8a194ed69c1..9845aba248c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -610,7 +610,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
       try {
         messageConverter = messageConverters.get(destination, dynamicDestinations, datasetService);
         return new DestinationState(
-            tableDestination1.getTableUrn(),
+            tableDestination1.getTableUrn(bigQueryOptions),
             messageConverter,
             datasetService,
             useDefaultStream,
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index 52496fd1cfd..df7a23b68b0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -380,7 +380,7 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object
                     dest);
                 return tableDestination1;
               });
-      final String tableId = tableDestination.getTableUrn();
+      final String tableId = tableDestination.getTableUrn(bigQueryOptions);
       final DatasetService datasetService = getDatasetService(pipelineOptions);
 
       Supplier<String> getOrCreateStream =
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
index ef4e94b6b7e..838d1e4a470 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
@@ -22,6 +22,7 @@ import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TimePartitioning;
 import java.io.Serializable;
 import java.util.Objects;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
 /** Encapsulates a BigQuery table destination. */
@@ -122,8 +123,14 @@ public class TableDestination implements Serializable {
   }
 
   /** Return the tablespec in projects/[project]/datasets/[dataset]/tables/[table] format. */
-  public String getTableUrn() {
+  public String getTableUrn(BigQueryOptions bigQueryOptions) {
     TableReference table = getTableReference();
+    if (Strings.isNullOrEmpty(table.getProjectId())) {
+      table.setProjectId(
+          bigQueryOptions.getBigQueryProject() == null
+              ? bigQueryOptions.getProject()
+              : bigQueryOptions.getBigQueryProject());
+    }
     return String.format(
         "projects/%s/datasets/%s/tables/%s",
         table.getProjectId(), table.getDatasetId(), table.getTableId());
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index 9cacfde54ad..c10696db574 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -2629,4 +2629,29 @@ public class BigQueryIOWriteTest implements Serializable {
     thrown.expectMessage("SchemaUpdateOptions are not supported when method == STREAMING_INSERTS");
     schemaUpdateOptionsTest(BigQueryIO.Write.Method.STREAMING_INSERTS, options);
   }
+
+  @Test
+  public void testWriteWithStorageApiWithDefaultProject() throws Exception {
+    assumeTrue(useStorageApi);
+    BigQueryIO.Write<TableRow> write =
+        BigQueryIO.writeTableRows()
+            .to("dataset-id.table-id")
+            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+            .withSchema(
+                new TableSchema()
+                    .setFields(
+                        ImmutableList.of(new TableFieldSchema().setName("name").setType("STRING"))))
+            .withMethod(Method.STORAGE_WRITE_API)
+            .withoutValidation()
+            .withTestServices(fakeBqServices);
+
+    p.apply(
+            Create.of(new TableRow().set("name", "a"), new TableRow().set("name", "b"))
+                .withCoder(TableRowJsonCoder.of()))
+        .apply("WriteToBQ", write);
+    p.run();
+    assertThat(
+        fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"),
+        containsInAnyOrder(new TableRow().set("name", "a"), new TableRow().set("name", "b")));
+  }
 }