You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jo...@apache.org on 2023/06/02 15:32:26 UTC
[beam] branch master updated: add back deleted line + test (#26976)
This is an automated email from the ASF dual-hosted git repository.
johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 533ceba1baf add back deleted line + test (#26976)
533ceba1baf is described below
commit 533ceba1bafaf281a37733ed7b129c55476f6a4b
Author: Reuven Lax <re...@google.com>
AuthorDate: Fri Jun 2 08:32:19 2023 -0700
add back deleted line + test (#26976)
---
.../bigquery/StorageApiWriteUnshardedRecords.java | 30 +++++++-------
.../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java | 46 ++++++++++++++++++++++
2 files changed, 62 insertions(+), 14 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index 9cf5b026039..1531c679f18 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -322,20 +322,22 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
}
String getOrCreateStreamName() throws Exception {
- CreateTableHelpers.createTableWrapper(
- () -> {
- if (!useDefaultStream) {
- this.streamName =
- Preconditions.checkStateNotNull(maybeDatasetService)
- .createWriteStream(tableUrn, Type.PENDING)
- .getName();
- this.currentOffset = 0;
- } else {
- this.streamName = getDefaultStreamName();
- }
- return null;
- },
- tryCreateTable);
+ if (Strings.isNullOrEmpty(this.streamName)) {
+ CreateTableHelpers.createTableWrapper(
+ () -> {
+ if (!useDefaultStream) {
+ this.streamName =
+ Preconditions.checkStateNotNull(maybeDatasetService)
+ .createWriteStream(tableUrn, Type.PENDING)
+ .getName();
+ this.currentOffset = 0;
+ } else {
+ this.streamName = getDefaultStreamName();
+ }
+ return null;
+ },
+ tryCreateTable);
+ }
return this.streamName;
}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index 9b28cb664a0..aa2b16bd47d 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -3025,4 +3025,50 @@ public class BigQueryIOWriteTest implements Serializable {
fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"),
containsInAnyOrder(new TableRow().set("name", "a"), new TableRow().set("name", "b")));
}
+
+ @Test
+ public void testBatchStorageWriteWithMultipleAppendsPerStream() throws Exception {
+ assumeTrue(useStorageApi);
+ assumeTrue(!useStreaming);
+
+ // reduce threshold to trigger multiple stream appends
+ p.getOptions().as(BigQueryOptions.class).setStorageApiAppendThresholdRecordCount(0);
+ // limit parallelism to limit the number of write streams we have open
+ p.getOptions().as(DirectOptions.class).setTargetParallelism(1);
+
+ TableSchema schema =
+ new TableSchema()
+ .setFields(
+ ImmutableList.of(
+ new TableFieldSchema().setName("num").setType("INTEGER"),
+ new TableFieldSchema().setName("name").setType("STRING")));
+ Table fakeTable = new Table();
+ TableReference ref =
+ new TableReference()
+ .setProjectId("project-id")
+ .setDatasetId("dataset-id")
+ .setTableId("table-id");
+ fakeTable.setSchema(schema);
+ fakeTable.setTableReference(ref);
+ fakeDatasetService.createTable(fakeTable);
+
+ List<TableRow> rows = new ArrayList<TableRow>(100);
+ for (int i = 0; i < 100; i++) {
+ rows.add(new TableRow().set("num", String.valueOf(i)).set("name", String.valueOf(i)));
+ }
+ p.apply(Create.of(rows))
+ .apply(
+ "Save Events To BigQuery",
+ BigQueryIO.writeTableRows()
+ .to(ref)
+ .withMethod(Write.Method.STORAGE_WRITE_API)
+ .withCreateDisposition(CreateDisposition.CREATE_NEVER)
+ .withTestServices(fakeBqServices));
+
+ p.run();
+
+ assertThat(
+ fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"),
+ containsInAnyOrder(Iterables.toArray(rows, TableRow.class)));
+ }
}