You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jo...@apache.org on 2023/06/02 15:32:26 UTC

[beam] branch master updated: add back deleted line + test (#26976)

This is an automated email from the ASF dual-hosted git repository.

johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 533ceba1baf add back deleted line + test (#26976)
533ceba1baf is described below

commit 533ceba1bafaf281a37733ed7b129c55476f6a4b
Author: Reuven Lax <re...@google.com>
AuthorDate: Fri Jun 2 08:32:19 2023 -0700

    add back deleted line + test (#26976)
---
 .../bigquery/StorageApiWriteUnshardedRecords.java  | 30 +++++++-------
 .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java   | 46 ++++++++++++++++++++++
 2 files changed, 62 insertions(+), 14 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index 9cf5b026039..1531c679f18 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -322,20 +322,22 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
       }
 
       String getOrCreateStreamName() throws Exception {
-        CreateTableHelpers.createTableWrapper(
-            () -> {
-              if (!useDefaultStream) {
-                this.streamName =
-                    Preconditions.checkStateNotNull(maybeDatasetService)
-                        .createWriteStream(tableUrn, Type.PENDING)
-                        .getName();
-                this.currentOffset = 0;
-              } else {
-                this.streamName = getDefaultStreamName();
-              }
-              return null;
-            },
-            tryCreateTable);
+        if (Strings.isNullOrEmpty(this.streamName)) {
+          CreateTableHelpers.createTableWrapper(
+              () -> {
+                if (!useDefaultStream) {
+                  this.streamName =
+                      Preconditions.checkStateNotNull(maybeDatasetService)
+                          .createWriteStream(tableUrn, Type.PENDING)
+                          .getName();
+                  this.currentOffset = 0;
+                } else {
+                  this.streamName = getDefaultStreamName();
+                }
+                return null;
+              },
+              tryCreateTable);
+        }
         return this.streamName;
       }
 
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index 9b28cb664a0..aa2b16bd47d 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -3025,4 +3025,50 @@ public class BigQueryIOWriteTest implements Serializable {
         fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"),
         containsInAnyOrder(new TableRow().set("name", "a"), new TableRow().set("name", "b")));
   }
+
+  @Test
+  public void testBatchStorageWriteWithMultipleAppendsPerStream() throws Exception {
+    assumeTrue(useStorageApi);
+    assumeTrue(!useStreaming);
+
+    // reduce threshold to trigger multiple stream appends
+    p.getOptions().as(BigQueryOptions.class).setStorageApiAppendThresholdRecordCount(0);
+    // run with a target parallelism of 1 to keep the number of open write streams small
+    p.getOptions().as(DirectOptions.class).setTargetParallelism(1);
+
+    TableSchema schema =
+        new TableSchema()
+            .setFields(
+                ImmutableList.of(
+                    new TableFieldSchema().setName("num").setType("INTEGER"),
+                    new TableFieldSchema().setName("name").setType("STRING")));
+    Table fakeTable = new Table();
+    TableReference ref =
+        new TableReference()
+            .setProjectId("project-id")
+            .setDatasetId("dataset-id")
+            .setTableId("table-id");
+    fakeTable.setSchema(schema);
+    fakeTable.setTableReference(ref);
+    fakeDatasetService.createTable(fakeTable);
+
+    List<TableRow> rows = new ArrayList<TableRow>(100);
+    for (int i = 0; i < 100; i++) {
+      rows.add(new TableRow().set("num", String.valueOf(i)).set("name", String.valueOf(i)));
+    }
+    p.apply(Create.of(rows))
+        .apply(
+            "Save Events To BigQuery",
+            BigQueryIO.writeTableRows()
+                .to(ref)
+                .withMethod(Write.Method.STORAGE_WRITE_API)
+                .withCreateDisposition(CreateDisposition.CREATE_NEVER)
+                .withTestServices(fakeBqServices));
+
+    p.run();
+
+    assertThat(
+        fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"),
+        containsInAnyOrder(Iterables.toArray(rows, TableRow.class)));
+  }
 }