You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2022/09/13 18:23:03 UTC
[beam] branch master updated: Add streaming test for Write API sink (#21903)
This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6f25fba3fea Add streaming test for Write API sink (#21903)
6f25fba3fea is described below
commit 6f25fba3fea1f407c7a008c84709047b7a55be54
Author: AlexZMLyu <10...@users.noreply.github.com>
AuthorDate: Tue Sep 13 11:22:56 2022 -0700
Add streaming test for Write API sink (#21903)
* Add streaming test for Write API sink
* Update BigQueryIOStorageWriteIT.java
* Update BigQueryIOStorageWriteIT.java
* Update BigQueryIOStorageWriteIT.java
* Update streaming source type
Change streaming source type from TestStream to GenerateSequence
* Update BigQueryIOStorageWriteIT.java
Try improved formatting
* Update BigQueryIOStorageWriteIT.java
Formatting
---
.../io/gcp/bigquery/BigQueryIOStorageWriteIT.java | 55 +++++++++++++++-------
1 file changed, 39 insertions(+), 16 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java
index f00d93801d2..db4a1008ecd 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.gcp.bigquery;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import com.google.api.services.bigquery.model.QueryResponse;
import com.google.api.services.bigquery.model.TableFieldSchema;
@@ -33,16 +34,16 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
- * Integration tests for {@link
- * org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO#write(SerializableFunction)}. This test writes
- * 30MB data to BQ and verify the written row count.
+ * Integration tests for {@link BigQueryIO#write()}. The batch mode tests write 30MB data to BQ and
+ * verify the written row count; the streaming mode tests write 3k rows of data to BQ and verify the
+ * written row count.
*/
@RunWith(JUnit4.class)
public class BigQueryIOStorageWriteIT {
@@ -50,7 +51,7 @@ public class BigQueryIOStorageWriteIT {
private enum WriteMode {
EXACT_ONCE,
AT_LEAST_ONCE
- };
+ }
private String project;
private static final String DATASET_ID = "big_query_storage";
@@ -73,14 +74,26 @@ public class BigQueryIOStorageWriteIT {
}
static class FillRowFn extends DoFn<Long, TableRow> {
+
@ProcessElement
public void processElement(ProcessContext c) {
c.output(new TableRow().set("number", c.element()).set("str", "aaaaaaaaaa"));
}
}
- private void runBigQueryIOStorageWritePipeline(int rowCount, WriteMode writeMode) {
- String tableName = TABLE_PREFIX + System.currentTimeMillis();
+ private GenerateSequence stream(int rowCount) {
+ int timestampIntervalInMilliseconds = 10;
+ return GenerateSequence.from(0)
+ .to(rowCount)
+ .withRate(1, Duration.millis(timestampIntervalInMilliseconds));
+ }
+
+ private void runBigQueryIOStorageWritePipeline(
+ int rowCount, WriteMode writeMode, Boolean isStreaming) {
+ String tableName =
+ isStreaming
+ ? TABLE_PREFIX + "streaming_" + System.currentTimeMillis()
+ : TABLE_PREFIX + System.currentTimeMillis();
TableSchema schema =
new TableSchema()
.setFields(
@@ -89,7 +102,7 @@ public class BigQueryIOStorageWriteIT {
new TableFieldSchema().setName("str").setType("STRING")));
Pipeline p = Pipeline.create(bqOptions);
- p.apply("Input", GenerateSequence.from(0).to(rowCount))
+ p.apply("Input", isStreaming ? stream(rowCount) : GenerateSequence.from(0).to(rowCount))
.apply("GenerateMessage", ParDo.of(new FillRowFn()))
.apply(
"WriteToBQ",
@@ -109,22 +122,32 @@ public class BigQueryIOStorageWriteIT {
assertTrue(
Integer.parseInt((String) response.getRows().get(0).getF().get(0).getV()) >= rowCount);
}
- } catch (IOException e) {
- assertTrue("Unexpected exception: " + e.toString(), false);
- } catch (InterruptedException e) {
- assertTrue("Unexpected exception: " + e.toString(), false);
+ } catch (IOException | InterruptedException e) {
+ fail("Unexpected exception: " + e);
}
}
@Test
- public void testBigQueryStorageWrite30MProto() throws Exception {
+ public void testBigQueryStorageWrite30MProto() {
+ setUpTestEnvironment(WriteMode.EXACT_ONCE);
+ runBigQueryIOStorageWritePipeline(3000000, WriteMode.EXACT_ONCE, false);
+ }
+
+ @Test
+ public void testBigQueryStorageWrite30MProtoALO() {
+ setUpTestEnvironment(WriteMode.AT_LEAST_ONCE);
+ runBigQueryIOStorageWritePipeline(3000000, WriteMode.AT_LEAST_ONCE, false);
+ }
+
+ @Test
+ public void testBigQueryStorageWrite3KProtoStreaming() {
setUpTestEnvironment(WriteMode.EXACT_ONCE);
- runBigQueryIOStorageWritePipeline(3000000, WriteMode.EXACT_ONCE);
+ runBigQueryIOStorageWritePipeline(3000, WriteMode.EXACT_ONCE, true);
}
@Test
- public void testBigQueryStorageWrite30MProtoALO() throws Exception {
+ public void testBigQueryStorageWrite3KProtoALOStreaming() {
setUpTestEnvironment(WriteMode.AT_LEAST_ONCE);
- runBigQueryIOStorageWritePipeline(3000000, WriteMode.AT_LEAST_ONCE);
+ runBigQueryIOStorageWritePipeline(3000, WriteMode.AT_LEAST_ONCE, true);
}
}