You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@beam.apache.org by pa...@apache.org on 2022/12/19 18:46:52 UTC
[beam] branch master updated: Fix BigQueryIO.Write.Method.STORAGE_WRITE_API missing project id (#24634)
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 944c7b6104e Fix BigQueryIO.Write.Method.STORAGE_WRITE_API missing project id (#24634)
944c7b6104e is described below
commit 944c7b6104e01b7d974a5720098d816979c63237
Author: gabihodoroaga <ga...@gmail.com>
AuthorDate: Mon Dec 19 20:46:38 2022 +0200
Fix BigQueryIO.Write.Method.STORAGE_WRITE_API missing project id (#24634)
* Fix BigQueryIO.Write.Method.STORAGE_WRITE_API missing project id
* Fix Checkstyle errors
* Add unit test
---
.../bigquery/StorageApiWriteUnshardedRecords.java | 2 +-
.../bigquery/StorageApiWritesShardedRecords.java | 2 +-
.../beam/sdk/io/gcp/bigquery/TableDestination.java | 9 +++++++-
.../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java | 25 ++++++++++++++++++++++
4 files changed, 35 insertions(+), 3 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index 8a194ed69c1..9845aba248c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -610,7 +610,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
try {
messageConverter = messageConverters.get(destination, dynamicDestinations, datasetService);
return new DestinationState(
- tableDestination1.getTableUrn(),
+ tableDestination1.getTableUrn(bigQueryOptions),
messageConverter,
datasetService,
useDefaultStream,
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index 52496fd1cfd..df7a23b68b0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -380,7 +380,7 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object
dest);
return tableDestination1;
});
- final String tableId = tableDestination.getTableUrn();
+ final String tableId = tableDestination.getTableUrn(bigQueryOptions);
final DatasetService datasetService = getDatasetService(pipelineOptions);
Supplier<String> getOrCreateStream =
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
index ef4e94b6b7e..838d1e4a470 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
@@ -22,6 +22,7 @@ import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TimePartitioning;
import java.io.Serializable;
import java.util.Objects;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.checkerframework.checker.nullness.qual.Nullable;
/** Encapsulates a BigQuery table destination. */
@@ -122,8 +123,14 @@ public class TableDestination implements Serializable {
}
/** Return the tablespec in projects/[project]/datasets/[dataset]/tables/[table] format. */
- public String getTableUrn() {
+ public String getTableUrn(BigQueryOptions bigQueryOptions) {
TableReference table = getTableReference();
+ if (Strings.isNullOrEmpty(table.getProjectId())) {
+ table.setProjectId(
+ bigQueryOptions.getBigQueryProject() == null
+ ? bigQueryOptions.getProject()
+ : bigQueryOptions.getBigQueryProject());
+ }
return String.format(
"projects/%s/datasets/%s/tables/%s",
table.getProjectId(), table.getDatasetId(), table.getTableId());
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index 9cacfde54ad..c10696db574 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -2629,4 +2629,29 @@ public class BigQueryIOWriteTest implements Serializable {
thrown.expectMessage("SchemaUpdateOptions are not supported when method == STREAMING_INSERTS");
schemaUpdateOptionsTest(BigQueryIO.Write.Method.STREAMING_INSERTS, options);
}
+
+ @Test
+ public void testWriteWithStorageApiWithDefaultProject() throws Exception { // Storage Write API to a table spec with no project; rows must land in a default project
+ assumeTrue(useStorageApi); // test is skipped unless this run is parameterized for the Storage API
+ BigQueryIO.Write<TableRow> write =
+ BigQueryIO.writeTableRows()
+ .to("dataset-id.table-id") // dataset.table only -- the project is intentionally omitted
+ .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+ .withSchema(
+ new TableSchema()
+ .setFields(
+ ImmutableList.of(new TableFieldSchema().setName("name").setType("STRING"))))
+ .withMethod(Method.STORAGE_WRITE_API)
+ .withoutValidation()
+ .withTestServices(fakeBqServices);
+
+ p.apply(
+ Create.of(new TableRow().set("name", "a"), new TableRow().set("name", "b"))
+ .withCoder(TableRowJsonCoder.of()))
+ .apply("WriteToBQ", write);
+ p.run();
+ assertThat(
+ fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"), // NOTE(review): assumes "project-id" is the project set on the test pipeline options -- confirm in setup
+ containsInAnyOrder(new TableRow().set("name", "a"), new TableRow().set("name", "b")));
+ }
}