You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jo...@apache.org on 2022/10/28 14:45:34 UTC
[beam] branch master updated: Fix BigQueryIO Performance Test Streaming (#23857)
This is an automated email from the ASF dual-hosted git repository.
johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c31800413b5 Fix BigQueryIO Performance Test Streaming (#23857)
c31800413b5 is described below
commit c31800413b5c380625f471f7ba8f93e5b14e041f
Author: Yi Hu <ya...@google.com>
AuthorDate: Fri Oct 28 10:45:22 2022 -0400
Fix BigQueryIO Performance Test Streaming (#23857)
* Unify test names so that the tests are listed as Performance Tests
* Run STREAMING_INSERTS writes with a streaming pipeline
* Fix JSON write of bytes (Base64-encode the payload)
* Better logging of insert failures
---
.test-infra/jenkins/README.md | 6 +--
.../job_PerformanceTests_BigQueryIO_Java.groovy | 13 ++---
.../beam/sdk/bigqueryioperftests/BigQueryIOIT.java | 57 ++++++++++++++++++++--
.../sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 8 ++-
4 files changed, 69 insertions(+), 15 deletions(-)
diff --git a/.test-infra/jenkins/README.md b/.test-infra/jenkins/README.md
index c860435ea74..e547c23f24f 100644
--- a/.test-infra/jenkins/README.md
+++ b/.test-infra/jenkins/README.md
@@ -140,9 +140,9 @@ Beam Jenkins overview page: [link](https://ci-beam.apache.org/)
| beam_PerformanceTests_AvroIOIT | [cron](https://ci-beam.apache.org/job/beam_PerformanceTests_AvroIOIT/), [hdfs_cron](https://ci-beam.apache.org/job/beam_PerformanceTests_AvroIOIT_HDFS/) | `Run Java AvroIO Performance Test` | [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_AvroIOIT/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_AvroIOIT) [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_AvroIOIT_HDFS/badge/icon)](https://ci-be [...]
| beam_PerformanceTests_BiqQueryIO_Read_Python | [cron](https://ci-beam.apache.org/job/beam_PerformanceTests_BiqQueryIO_Read_Python/), [phrase](https://ci-beam.apache.org/view/PerformanceTests/job/beam_PerformanceTests_BiqQueryIO_Read_Python_PR/) | `Run BigQueryIO Read Performance Test Python` | [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_BiqQueryIO_Read_Python/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_BiqQueryIO_Read_Python) |
| beam_PerformanceTests_BiqQueryIO_Write_Python_Batch | [cron](https://ci-beam.apache.org/job/beam_PerformanceTests_BiqQueryIO_Write_Python_Batch/), [phrase](https://ci-beam.apache.org/view/PerformanceTests/job/beam_PerformanceTests_BiqQueryIO_Write_Python_Batch_PR/) | `Run BigQueryIO Write Performance Test Python Batch` | [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_BiqQueryIO_Write_Python_Batch/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_B [...]
-| beam_BiqQueryIO_Batch_Performance_Test_Java_Avro | [cron](https://ci-beam.apache.org/job/beam_BiqQueryIO_Batch_Performance_Test_Java_Avro/) | `Run BigQueryIO Batch Performance Test Java Avro` | [![Build Status](https://ci-beam.apache.org/job/beam_BiqQueryIO_Batch_Performance_Test_Java_Avro/badge/icon)](https://ci-beam.apache.org/job/beam_BiqQueryIO_Batch_Performance_Test_Java_Avro/) |
-| beam_BiqQueryIO_Batch_Performance_Test_Java_Json | [cron](https://ci-beam.apache.org/job/beam_BiqQueryIO_Batch_Performance_Test_Java_Json/) | `Run BigQueryIO Batch Performance Test Java Json` | [![Build Status](https://ci-beam.apache.org/job/beam_BiqQueryIO_Batch_Performance_Test_Java_Json/badge/icon)](https://ci-beam.apache.org/job/beam_BiqQueryIO_Batch_Performance_Test_Java_Json/) |
-| beam_BiqQueryIO_Streaming_Performance_Test_Java | [cron](https://ci-beam.apache.org/job/beam_BiqQueryIO_Streaming_Performance_Test_Java/) | `Run BigQueryIO Streaming Performance Test Java` | [![Build Status](https://ci-beam.apache.org/job/beam_BiqQueryIO_Streaming_Performance_Test_Java/badge/icon)](https://ci-beam.apache.org/job/beam_BiqQueryIO_Streaming_Performance_Test_Java/) |
+| beam_PerformanceTests_BiqQueryIO_Batch_Java_Avro | [cron](https://ci-beam.apache.org/job/beam_PerformanceTests_BiqQueryIO_Batch_Java_Avro/) | `Run BigQueryIO Batch Performance Test Java Avro` | [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_BiqQueryIO_Batch_Java_Avro/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_BiqQueryIO_Batch_Java_Avro/) |
+| beam_PerformanceTests_BiqQueryIO_Batch_Java_Json | [cron](https://ci-beam.apache.org/job/beam_PerformanceTests_BiqQueryIO_Batch_Java_Json/) | `Run BigQueryIO Batch Performance Test Java Json` | [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_BiqQueryIO_Batch_Java_Json/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_BiqQueryIO_Batch_Java_Json/) |
+| beam_PerformanceTests_BiqQueryIO_Streaming_Java | [cron](https://ci-beam.apache.org/job/beam_PerformanceTests_BiqQueryIO_Streaming_Java/) | `Run BigQueryIO Streaming Performance Test Java` | [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_BiqQueryIO_Streaming_Java/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_BiqQueryIO_Streaming_Java/) |
| beam_PerformanceTests_Cdap | [cron](https://ci-beam.apache.org/job/beam_PerformanceTests_Cdap/) | `Run Java CdapIO Performance Test` | [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_Cdap/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_Cdap) |
| beam_PerformanceTests_Compressed_TextIOIT | [cron](https://ci-beam.apache.org/job/beam_PerformanceTests_Compressed_TextIOIT/), [hdfs_cron](https://ci-beam.apache.org/job/beam_PerformanceTests_Compressed_TextIOIT_HDFS/) | `Run Java CompressedTextIO Performance Test` | [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_Compressed_TextIOIT/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_Compressed_TextIOIT) [![Build Status](https://ci-beam.apache.org/j [...]
| beam_PerformanceTests_HadoopFormat | [cron](https://ci-beam.apache.org/job/beam_PerformanceTests_HadoopFormat/) | `Run Java HadoopFormatIO Performance Test` | [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_HadoopFormat/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_HadoopFormat) |
diff --git a/.test-infra/jenkins/job_PerformanceTests_BigQueryIO_Java.groovy b/.test-infra/jenkins/job_PerformanceTests_BigQueryIO_Java.groovy
index 1d8ce84ea12..c3d0ae1f78c 100644
--- a/.test-infra/jenkins/job_PerformanceTests_BigQueryIO_Java.groovy
+++ b/.test-infra/jenkins/job_PerformanceTests_BigQueryIO_Java.groovy
@@ -24,9 +24,9 @@ def now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC'))
def jobConfigs = [
[
- title : 'BigQueryIO Streaming Performance Test Java 10 GB',
+ title : 'BigQueryIO Performance Test Streaming Java 10 GB',
triggerPhrase: 'Run BigQueryIO Streaming Performance Test Java',
- name : 'beam_BiqQueryIO_Streaming_Performance_Test_Java',
+ name : 'beam_PerformanceTests_BiqQueryIO_Streaming_Java',
itClass : 'org.apache.beam.sdk.bigqueryioperftests.BigQueryIOIT',
properties: [
project : 'apache-beam-testing',
@@ -34,6 +34,7 @@ def jobConfigs = [
tempRoot : 'gs://temp-storage-for-perf-tests/loadtests',
writeMethod : 'STREAMING_INSERTS',
writeFormat : 'JSON',
+ pipelineTimeout : '1200',
testBigQueryDataset : 'beam_performance',
testBigQueryTable : 'bqio_write_10GB_java_stream_' + now,
metricsBigQueryDataset: 'beam_performance',
@@ -53,9 +54,9 @@ def jobConfigs = [
]
],
[
- title : 'BigQueryIO Batch Performance Test Java 10 GB JSON',
+ title : 'BigQueryIO Performance Test Batch Java 10 GB JSON',
triggerPhrase: 'Run BigQueryIO Batch Performance Test Java Json',
- name : 'beam_BiqQueryIO_Batch_Performance_Test_Java_Json',
+ name : 'beam_PerformanceTests_BiqQueryIO_Batch_Java_Json',
itClass : 'org.apache.beam.sdk.bigqueryioperftests.BigQueryIOIT',
properties: [
project : 'apache-beam-testing',
@@ -82,9 +83,9 @@ def jobConfigs = [
]
],
[
- title : 'BigQueryIO Batch Performance Test Java 10 GB AVRO',
+ title : 'BigQueryIO Performance Test Batch Java 10 GB AVRO',
triggerPhrase: 'Run BigQueryIO Batch Performance Test Java Avro',
- name : 'beam_BiqQueryIO_Batch_Performance_Test_Java_Avro',
+ name : 'beam_PerformanceTests_BiqQueryIO_Batch_Java_Avro',
itClass : 'org.apache.beam.sdk.bigqueryioperftests.BigQueryIOIT',
properties: [
project : 'apache-beam-testing',
diff --git a/sdks/java/io/bigquery-io-perf-tests/src/test/java/org/apache/beam/sdk/bigqueryioperftests/BigQueryIOIT.java b/sdks/java/io/bigquery-io-perf-tests/src/test/java/org/apache/beam/sdk/bigqueryioperftests/BigQueryIOIT.java
index c67dc936705..0dfc7addc6f 100644
--- a/sdks/java/io/bigquery-io-perf-tests/src/test/java/org/apache/beam/sdk/bigqueryioperftests/BigQueryIOIT.java
+++ b/sdks/java/io/bigquery-io-perf-tests/src/test/java/org/apache/beam/sdk/bigqueryioperftests/BigQueryIOIT.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.bigqueryioperftests;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import com.google.api.services.bigquery.model.TableFieldSchema;
@@ -28,6 +29,7 @@ import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.TableId;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
@@ -43,7 +45,10 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.synthetic.SyntheticBoundedSource;
import org.apache.beam.sdk.io.synthetic.SyntheticOptions;
import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.testutils.NamedTestResult;
@@ -54,6 +59,7 @@ import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
+import org.joda.time.Duration;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -87,6 +93,7 @@ public class BigQueryIOIT {
private static final String READ_TIME_METRIC_NAME = "read_time";
private static final String WRITE_TIME_METRIC_NAME = "write_time";
private static final String AVRO_WRITE_TIME_METRIC_NAME = "avro_write_time";
+ private static final String READ_ELEMENT_METRIC_NAME = "read_count";
private static String testBigQueryDataset;
private static String testBigQueryTable;
private static SyntheticSourceOptions sourceOptions;
@@ -141,10 +148,11 @@ public class BigQueryIOIT {
private void testJsonWrite() {
BigQueryIO.Write<byte[]> writeIO =
BigQueryIO.<byte[]>write()
+ .withSuccessfulInsertsPropagation(false)
.withFormatFunction(
input -> {
TableRow tableRow = new TableRow();
- tableRow.set("data", input);
+ tableRow.set("data", Base64.getEncoder().encodeToString(input));
return tableRow;
});
testWrite(writeIO, WRITE_TIME_METRIC_NAME);
@@ -165,9 +173,13 @@ public class BigQueryIOIT {
}
private void testWrite(BigQueryIO.Write<byte[]> writeIO, String metricName) {
- Pipeline pipeline = Pipeline.create(options);
-
BigQueryIO.Write.Method method = BigQueryIO.Write.Method.valueOf(options.getWriteMethod());
+ if (method == BigQueryIO.Write.Method.STREAMING_INSERTS) {
+ // set streaming for STREAMING_INSERTS write
+ options.as(StreamingOptions.class).setStreaming(true);
+ }
+
+ Pipeline pipeline = Pipeline.create(options);
pipeline
.apply("Read from source", Read.from(new SyntheticBoundedSource(sourceOptions)))
.apply("Gather time", ParDo.of(new TimeMonitor<>(NAMESPACE, metricName)))
@@ -185,19 +197,30 @@ public class BigQueryIOIT {
new TableFieldSchema().setName("data").setType("BYTES")))));
PipelineResult pipelineResult = pipeline.run();
- PipelineResult.State pipelineState = pipelineResult.waitUntilFinish();
+ PipelineResult.State pipelineState =
+ options.getPipelineTimeout() == null
+ ? pipelineResult.waitUntilFinish()
+ : pipelineResult.waitUntilFinish(
+ Duration.standardSeconds(options.getPipelineTimeout()));
extractAndPublishTime(pipelineResult, metricName);
// Fail the test if pipeline failed.
assertNotEquals(pipelineState, PipelineResult.State.FAILED);
+
+ // set back streaming
+ options.as(StreamingOptions.class).setStreaming(false);
}
private void testRead() {
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply("Read from BQ", BigQueryIO.readTableRows().from(tableQualifier))
- .apply("Gather time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME)));
+ .apply("Gather time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME)))
+ .apply("Counting element", ParDo.of(new CountingFn<>(NAMESPACE, READ_ELEMENT_METRIC_NAME)));
PipelineResult result = pipeline.run();
PipelineResult.State pipelineState = result.waitUntilFinish();
+
+ assertEquals(
+ sourceOptions.numRecords, readElementMetric(result, NAMESPACE, READ_ELEMENT_METRIC_NAME));
extractAndPublishTime(result, READ_TIME_METRIC_NAME);
// Fail the test if pipeline failed.
assertNotEquals(pipelineState, PipelineResult.State.FAILED);
@@ -219,6 +242,11 @@ public class BigQueryIOIT {
};
}
+ private long readElementMetric(PipelineResult result, String namespace, String name) {
+ MetricsReader metricsReader = new MetricsReader(result, namespace);
+ return metricsReader.getCounterMetric(name);
+ }
+
/** Options for this io performance test. */
public interface BigQueryPerfTestOptions extends IOTestPipelineOptions {
@Description("Synthetic source options")
@@ -256,6 +284,11 @@ public class BigQueryIOIT {
@Description("Write Avro or JSON to BQ")
void setWriteFormat(String value);
+
+ Integer getPipelineTimeout();
+
+ @Description("Time to wait for the events to be processed by the pipeline (in seconds)")
+ void setPipelineTimeout(Integer writeTimeout);
}
private static class MapKVToV extends DoFn<KV<byte[], byte[]>, byte[]> {
@@ -265,6 +298,20 @@ public class BigQueryIOIT {
}
}
+ private static class CountingFn<T> extends DoFn<T, Void> {
+
+ private final Counter elementCounter;
+
+ CountingFn(String namespace, String name) {
+ elementCounter = Metrics.counter(namespace, name);
+ }
+
+ @ProcessElement
+ public void processElement() {
+ elementCounter.inc(1L);
+ }
+ }
+
private enum WriteFormat {
AVRO,
JSON
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 7702538de1e..5ec5549ed4c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -1206,8 +1206,14 @@ class BigQueryServicesImpl implements BigQueryServices {
}
rowsToPublish = retryRows;
idsToPublish = retryIds;
+ // print first 5 failures
+ int numErrorToLog = Math.min(allErrors.size(), 5);
+ LOG.info(
+ "Retrying {} failed inserts to BigQuery. First {} fails: {}",
+ rowsToPublish.size(),
+ numErrorToLog,
+ allErrors.subList(0, numErrorToLog));
allErrors.clear();
- LOG.info("Retrying {} failed inserts to BigQuery", rowsToPublish.size());
}
if (successfulRows != null) {
for (int i = 0; i < rowsToPublish.size(); i++) {