Posted to commits@beam.apache.org by ch...@apache.org on 2022/09/13 18:23:03 UTC

[beam] branch master updated: Add streaming test for Write API sink (#21903)

This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f25fba3fea Add streaming test for Write API sink (#21903)
6f25fba3fea is described below

commit 6f25fba3fea1f407c7a008c84709047b7a55be54
Author: AlexZMLyu <10...@users.noreply.github.com>
AuthorDate: Tue Sep 13 11:22:56 2022 -0700

    Add streaming test for Write API sink (#21903)
    
    * Add streaming test for Write API sink
    
    * Update BigQueryIOStorageWriteIT.java
    
    * Update BigQueryIOStorageWriteIT.java
    
    * Update BigQueryIOStorageWriteIT.java
    
    * Update streaming source type
    
    Change streaming source type from TestStream to GenerateSequence
    
    * Update BigQueryIOStorageWriteIT.java
    
    Try improved formatting
    
    * Update BigQueryIOStorageWriteIT.java
    
    Formatting
---
 .../io/gcp/bigquery/BigQueryIOStorageWriteIT.java  | 55 +++++++++++++++-------
 1 file changed, 39 insertions(+), 16 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java
index f00d93801d2..db4a1008ecd 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import com.google.api.services.bigquery.model.QueryResponse;
 import com.google.api.services.bigquery.model.TableFieldSchema;
@@ -33,16 +34,16 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 /**
- * Integration tests for {@link
- * org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO#write(SerializableFunction)}. This test writes
- * 30MB data to BQ and verify the written row count.
+ * Integration tests for {@link BigQueryIO#write()}. The batch mode tests write 30MB of data to BQ
+ * and verify the written row count; the streaming mode tests write 3k rows of data to BQ and
+ * verify the written row count.
  */
 @RunWith(JUnit4.class)
 public class BigQueryIOStorageWriteIT {
@@ -50,7 +51,7 @@ public class BigQueryIOStorageWriteIT {
   private enum WriteMode {
     EXACT_ONCE,
     AT_LEAST_ONCE
-  };
+  }
 
   private String project;
   private static final String DATASET_ID = "big_query_storage";
@@ -73,14 +74,26 @@ public class BigQueryIOStorageWriteIT {
   }
 
   static class FillRowFn extends DoFn<Long, TableRow> {
+
     @ProcessElement
     public void processElement(ProcessContext c) {
       c.output(new TableRow().set("number", c.element()).set("str", "aaaaaaaaaa"));
     }
   }
 
-  private void runBigQueryIOStorageWritePipeline(int rowCount, WriteMode writeMode) {
-    String tableName = TABLE_PREFIX + System.currentTimeMillis();
+  private GenerateSequence stream(int rowCount) {
+    int timestampIntervalInMilliseconds = 10;
+    return GenerateSequence.from(0)
+        .to(rowCount)
+        .withRate(1, Duration.millis(timestampIntervalInMilliseconds));
+  }
+
+  private void runBigQueryIOStorageWritePipeline(
+      int rowCount, WriteMode writeMode, Boolean isStreaming) {
+    String tableName =
+        isStreaming
+            ? TABLE_PREFIX + "streaming_" + System.currentTimeMillis()
+            : TABLE_PREFIX + System.currentTimeMillis();
     TableSchema schema =
         new TableSchema()
             .setFields(
@@ -89,7 +102,7 @@ public class BigQueryIOStorageWriteIT {
                     new TableFieldSchema().setName("str").setType("STRING")));
 
     Pipeline p = Pipeline.create(bqOptions);
-    p.apply("Input", GenerateSequence.from(0).to(rowCount))
+    p.apply("Input", isStreaming ? stream(rowCount) : GenerateSequence.from(0).to(rowCount))
         .apply("GenerateMessage", ParDo.of(new FillRowFn()))
         .apply(
             "WriteToBQ",
@@ -109,22 +122,32 @@ public class BigQueryIOStorageWriteIT {
         assertTrue(
             Integer.parseInt((String) response.getRows().get(0).getF().get(0).getV()) >= rowCount);
       }
-    } catch (IOException e) {
-      assertTrue("Unexpected exception: " + e.toString(), false);
-    } catch (InterruptedException e) {
-      assertTrue("Unexpected exception: " + e.toString(), false);
+    } catch (IOException | InterruptedException e) {
+      fail("Unexpected exception: " + e);
     }
   }
 
   @Test
-  public void testBigQueryStorageWrite30MProto() throws Exception {
+  public void testBigQueryStorageWrite30MProto() {
+    setUpTestEnvironment(WriteMode.EXACT_ONCE);
+    runBigQueryIOStorageWritePipeline(3000000, WriteMode.EXACT_ONCE, false);
+  }
+
+  @Test
+  public void testBigQueryStorageWrite30MProtoALO() {
+    setUpTestEnvironment(WriteMode.AT_LEAST_ONCE);
+    runBigQueryIOStorageWritePipeline(3000000, WriteMode.AT_LEAST_ONCE, false);
+  }
+
+  @Test
+  public void testBigQueryStorageWrite3KProtoStreaming() {
     setUpTestEnvironment(WriteMode.EXACT_ONCE);
-    runBigQueryIOStorageWritePipeline(3000000, WriteMode.EXACT_ONCE);
+    runBigQueryIOStorageWritePipeline(3000, WriteMode.EXACT_ONCE, true);
   }
 
   @Test
-  public void testBigQueryStorageWrite30MProtoALO() throws Exception {
+  public void testBigQueryStorageWrite3KProtoALOStreaming() {
     setUpTestEnvironment(WriteMode.AT_LEAST_ONCE);
-    runBigQueryIOStorageWritePipeline(3000000, WriteMode.AT_LEAST_ONCE);
+    runBigQueryIOStorageWritePipeline(3000, WriteMode.AT_LEAST_ONCE, true);
   }
 }
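
For readers who want to try the rate-limited source from this change outside the integration test, the sketch below uses the same GenerateSequence.from(...).to(...).withRate(...) pattern as the new stream(int rowCount) helper in the diff. It is not part of the commit: the class name, transform names, and the MapElements step are illustrative only, and it runs on the default runner rather than writing to BigQuery.

// A standalone sketch (not part of this commit) of the rate-limited
// GenerateSequence pattern used by the new stream() helper in the test.
// Class and transform names here are illustrative.
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

public class GenerateSequenceRateExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Emit the numbers [0, 3000) at roughly one element every 10 ms,
    // mirroring stream(int rowCount) with rowCount = 3000.
    PCollection<Long> sequence =
        p.apply(
            "RateLimitedInput",
            GenerateSequence.from(0).to(3000).withRate(1, Duration.millis(10)));

    // Downstream transforms see elements arrive over time rather than all at
    // once; in the integration test this slot is filled by FillRowFn and
    // BigQueryIO.write().
    sequence.apply(
        "FormatAsString",
        MapElements.into(TypeDescriptors.strings()).via((Long n) -> "row-" + n));

    p.run().waitUntilFinish();
  }
}

At one element every 10 ms, the 3000-element streaming input in the test takes roughly 30 seconds to emit, which keeps the new streaming tests quick while still exercising the Write API sink over time.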