Posted to commits@beam.apache.org by jo...@apache.org on 2022/10/28 14:45:34 UTC

[beam] branch master updated: Fix BigQueryIO Performance Test Streaming (#23857)

This is an automated email from the ASF dual-hosted git repository.

johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c31800413b5 Fix BigQueryIO Performance Test Streaming (#23857)
c31800413b5 is described below

commit c31800413b5c380625f471f7ba8f93e5b14e041f
Author: Yi Hu <ya...@google.com>
AuthorDate: Fri Oct 28 10:45:22 2022 -0400

    Fix BigQueryIO Performance Test Streaming (#23857)
    
    * Unify test names so the tests are listed as Performance Tests

    * Run the STREAMING_INSERTS write with a streaming pipeline (sketched after the file list below)

    * Fix JSON writes of bytes data

    * Better logging for insert failures
---
 .test-infra/jenkins/README.md                      |  6 +--
 .../job_PerformanceTests_BigQueryIO_Java.groovy    | 13 ++---
 .../beam/sdk/bigqueryioperftests/BigQueryIOIT.java | 57 ++++++++++++++++++++--
 .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java  |  8 ++-
 4 files changed, 69 insertions(+), 15 deletions(-)
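
For orientation before the full diff, here is a condensed sketch of the core change in BigQueryIOIT.java: force streaming execution when the write method is STREAMING_INSERTS, and bound waitUntilFinish with the new pipelineTimeout option. Names mirror the diff below, but the write transforms are elided, so this is an illustration of the flow rather than the complete test.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.PipelineResult;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.options.StreamingOptions;
    import org.joda.time.Duration;

    class StreamingWriteSketch {
      static PipelineResult.State runWrite(BigQueryIOIT.BigQueryPerfTestOptions options) {
        BigQueryIO.Write.Method method = BigQueryIO.Write.Method.valueOf(options.getWriteMethod());
        if (method == BigQueryIO.Write.Method.STREAMING_INSERTS) {
          // Streaming inserts only exercise the streaming path on a streaming pipeline.
          options.as(StreamingOptions.class).setStreaming(true);
        }
        Pipeline pipeline = Pipeline.create(options);
        // ... the write transforms from testWrite() would be applied here ...
        PipelineResult result = pipeline.run();
        return options.getPipelineTimeout() == null
            ? result.waitUntilFinish()
            : result.waitUntilFinish(Duration.standardSeconds(options.getPipelineTimeout()));
      }
    }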

diff --git a/.test-infra/jenkins/README.md b/.test-infra/jenkins/README.md
index c860435ea74..e547c23f24f 100644
--- a/.test-infra/jenkins/README.md
+++ b/.test-infra/jenkins/README.md
@@ -140,9 +140,9 @@ Beam Jenkins overview page: [link](https://ci-beam.apache.org/)
 | beam_PerformanceTests_AvroIOIT | [cron](https://ci-beam.apache.org/job/beam_PerformanceTests_AvroIOIT/), [hdfs_cron](https://ci-beam.apache.org/job/beam_PerformanceTests_AvroIOIT_HDFS/) | `Run Java AvroIO Performance Test` | [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_AvroIOIT/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_AvroIOIT) [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_AvroIOIT_HDFS/badge/icon)](https://ci-be [...]
 | beam_PerformanceTests_BiqQueryIO_Read_Python | [cron](https://ci-beam.apache.org/job/beam_PerformanceTests_BiqQueryIO_Read_Python/), [phrase](https://ci-beam.apache.org/view/PerformanceTests/job/beam_PerformanceTests_BiqQueryIO_Read_Python_PR/) | `Run BigQueryIO Read Performance Test Python` | [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_BiqQueryIO_Read_Python/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_BiqQueryIO_Read_Python) |
 | beam_PerformanceTests_BiqQueryIO_Write_Python_Batch | [cron](https://ci-beam.apache.org/job/beam_PerformanceTests_BiqQueryIO_Write_Python_Batch/), [phrase](https://ci-beam.apache.org/view/PerformanceTests/job/beam_PerformanceTests_BiqQueryIO_Write_Python_Batch_PR/) | `Run BigQueryIO Write Performance Test Python Batch` | [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_BiqQueryIO_Write_Python_Batch/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_B [...]
-| beam_BiqQueryIO_Batch_Performance_Test_Java_Avro | [cron](https://ci-beam.apache.org/job/beam_BiqQueryIO_Batch_Performance_Test_Java_Avro/) | `Run BigQueryIO Batch Performance Test Java Avro` | [![Build Status](https://ci-beam.apache.org/job/beam_BiqQueryIO_Batch_Performance_Test_Java_Avro/badge/icon)](https://ci-beam.apache.org/job/beam_BiqQueryIO_Batch_Performance_Test_Java_Avro/) |
-| beam_BiqQueryIO_Batch_Performance_Test_Java_Json | [cron](https://ci-beam.apache.org/job/beam_BiqQueryIO_Batch_Performance_Test_Java_Json/) | `Run BigQueryIO Batch Performance Test Java Json` | [![Build Status](https://ci-beam.apache.org/job/beam_BiqQueryIO_Batch_Performance_Test_Java_Json/badge/icon)](https://ci-beam.apache.org/job/beam_BiqQueryIO_Batch_Performance_Test_Java_Json/) |
-| beam_BiqQueryIO_Streaming_Performance_Test_Java | [cron](https://ci-beam.apache.org/job/beam_BiqQueryIO_Streaming_Performance_Test_Java/) | `Run BigQueryIO Streaming Performance Test Java` | [![Build Status](https://ci-beam.apache.org/job/beam_BiqQueryIO_Streaming_Performance_Test_Java/badge/icon)](https://ci-beam.apache.org/job/beam_BiqQueryIO_Streaming_Performance_Test_Java/) |
+| beam_PerformanceTests_BiqQueryIO_Batch_Java_Avro | [cron](https://ci-beam.apache.org/job/beam_PerformanceTests_BiqQueryIO_Batch_Java_Avro/) | `Run BigQueryIO Batch Performance Test Java Avro` | [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_BiqQueryIO_Batch_Java_Avro/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_BiqQueryIO_Batch_Java_Avro/) |
+| beam_PerformanceTests_BiqQueryIO_Batch_Java_Json | [cron](https://ci-beam.apache.org/job/beam_PerformanceTests_BiqQueryIO_Batch_Java_Json/) | `Run BigQueryIO Batch Performance Test Java Json` | [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_BiqQueryIO_Batch_Java_Json/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_BiqQueryIO_Batch_Java_Json/) |
+| beam_PerformanceTests_BiqQueryIO_Streaming_Java | [cron](https://ci-beam.apache.org/job/beam_PerformanceTests_BiqQueryIO_Streaming_Java/) | `Run BigQueryIO Streaming Performance Test Java` | [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_BiqQueryIO_Streaming_Java/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_BiqQueryIO_Streaming_Java/) |
 | beam_PerformanceTests_Cdap | [cron](https://ci-beam.apache.org/job/beam_PerformanceTests_Cdap/) | `Run Java CdapIO Performance Test` | [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_Cdap/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_Cdap) |
 | beam_PerformanceTests_Compressed_TextIOIT | [cron](https://ci-beam.apache.org/job/beam_PerformanceTests_Compressed_TextIOIT/), [hdfs_cron](https://ci-beam.apache.org/job/beam_PerformanceTests_Compressed_TextIOIT_HDFS/) | `Run Java CompressedTextIO Performance Test` | [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_Compressed_TextIOIT/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_Compressed_TextIOIT) [![Build Status](https://ci-beam.apache.org/j [...]
 | beam_PerformanceTests_HadoopFormat | [cron](https://ci-beam.apache.org/job/beam_PerformanceTests_HadoopFormat/) | `Run Java HadoopFormatIO Performance Test` | [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_HadoopFormat/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_HadoopFormat) |
diff --git a/.test-infra/jenkins/job_PerformanceTests_BigQueryIO_Java.groovy b/.test-infra/jenkins/job_PerformanceTests_BigQueryIO_Java.groovy
index 1d8ce84ea12..c3d0ae1f78c 100644
--- a/.test-infra/jenkins/job_PerformanceTests_BigQueryIO_Java.groovy
+++ b/.test-infra/jenkins/job_PerformanceTests_BigQueryIO_Java.groovy
@@ -24,9 +24,9 @@ def now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC'))
 
 def jobConfigs = [
   [
-    title        : 'BigQueryIO Streaming Performance Test Java 10 GB',
+    title        : 'BigQueryIO Performance Test Streaming Java 10 GB',
     triggerPhrase: 'Run BigQueryIO Streaming Performance Test Java',
-    name         : 'beam_BiqQueryIO_Streaming_Performance_Test_Java',
+    name         : 'beam_PerformanceTests_BiqQueryIO_Streaming_Java',
     itClass      : 'org.apache.beam.sdk.bigqueryioperftests.BigQueryIOIT',
     properties: [
       project               : 'apache-beam-testing',
@@ -34,6 +34,7 @@ def jobConfigs = [
       tempRoot              : 'gs://temp-storage-for-perf-tests/loadtests',
       writeMethod           : 'STREAMING_INSERTS',
       writeFormat           : 'JSON',
+      pipelineTimeout       : '1200',
       testBigQueryDataset   : 'beam_performance',
       testBigQueryTable     : 'bqio_write_10GB_java_stream_' + now,
       metricsBigQueryDataset: 'beam_performance',
@@ -53,9 +54,9 @@ def jobConfigs = [
     ]
   ],
   [
-    title        : 'BigQueryIO Batch Performance Test Java 10 GB JSON',
+    title        : 'BigQueryIO Performance Test Batch Java 10 GB JSON',
     triggerPhrase: 'Run BigQueryIO Batch Performance Test Java Json',
-    name         : 'beam_BiqQueryIO_Batch_Performance_Test_Java_Json',
+    name         : 'beam_PerformanceTests_BiqQueryIO_Batch_Java_Json',
     itClass      : 'org.apache.beam.sdk.bigqueryioperftests.BigQueryIOIT',
     properties: [
       project               : 'apache-beam-testing',
@@ -82,9 +83,9 @@ def jobConfigs = [
     ]
   ],
   [
-    title        : 'BigQueryIO Batch Performance Test Java 10 GB AVRO',
+    title        : 'BigQueryIO Performance Test Batch Java 10 GB AVRO',
     triggerPhrase: 'Run BigQueryIO Batch Performance Test Java Avro',
-    name         : 'beam_BiqQueryIO_Batch_Performance_Test_Java_Avro',
+    name         : 'beam_PerformanceTests_BiqQueryIO_Batch_Java_Avro',
     itClass      : 'org.apache.beam.sdk.bigqueryioperftests.BigQueryIOIT',
     properties: [
       project               : 'apache-beam-testing',
diff --git a/sdks/java/io/bigquery-io-perf-tests/src/test/java/org/apache/beam/sdk/bigqueryioperftests/BigQueryIOIT.java b/sdks/java/io/bigquery-io-perf-tests/src/test/java/org/apache/beam/sdk/bigqueryioperftests/BigQueryIOIT.java
index c67dc936705..0dfc7addc6f 100644
--- a/sdks/java/io/bigquery-io-perf-tests/src/test/java/org/apache/beam/sdk/bigqueryioperftests/BigQueryIOIT.java
+++ b/sdks/java/io/bigquery-io-perf-tests/src/test/java/org/apache/beam/sdk/bigqueryioperftests/BigQueryIOIT.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.bigqueryioperftests;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 
 import com.google.api.services.bigquery.model.TableFieldSchema;
@@ -28,6 +29,7 @@ import com.google.cloud.bigquery.BigQueryOptions;
 import com.google.cloud.bigquery.TableId;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Base64;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
@@ -43,7 +45,10 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
 import org.apache.beam.sdk.io.synthetic.SyntheticBoundedSource;
 import org.apache.beam.sdk.io.synthetic.SyntheticOptions;
 import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.testutils.NamedTestResult;
@@ -54,6 +59,7 @@ import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.KV;
+import org.joda.time.Duration;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -87,6 +93,7 @@ public class BigQueryIOIT {
   private static final String READ_TIME_METRIC_NAME = "read_time";
   private static final String WRITE_TIME_METRIC_NAME = "write_time";
   private static final String AVRO_WRITE_TIME_METRIC_NAME = "avro_write_time";
+  private static final String READ_ELEMENT_METRIC_NAME = "read_count";
   private static String testBigQueryDataset;
   private static String testBigQueryTable;
   private static SyntheticSourceOptions sourceOptions;
@@ -141,10 +148,11 @@ public class BigQueryIOIT {
   private void testJsonWrite() {
     BigQueryIO.Write<byte[]> writeIO =
         BigQueryIO.<byte[]>write()
+            .withSuccessfulInsertsPropagation(false)
             .withFormatFunction(
                 input -> {
                   TableRow tableRow = new TableRow();
-                  tableRow.set("data", input);
+                  tableRow.set("data", Base64.getEncoder().encodeToString(input));
                   return tableRow;
                 });
     testWrite(writeIO, WRITE_TIME_METRIC_NAME);
@@ -165,9 +173,13 @@ public class BigQueryIOIT {
   }
 
   private void testWrite(BigQueryIO.Write<byte[]> writeIO, String metricName) {
-    Pipeline pipeline = Pipeline.create(options);
-
     BigQueryIO.Write.Method method = BigQueryIO.Write.Method.valueOf(options.getWriteMethod());
+    if (method == BigQueryIO.Write.Method.STREAMING_INSERTS) {
+      // set streaming for STREAMING_INSERTS write
+      options.as(StreamingOptions.class).setStreaming(true);
+    }
+
+    Pipeline pipeline = Pipeline.create(options);
     pipeline
         .apply("Read from source", Read.from(new SyntheticBoundedSource(sourceOptions)))
         .apply("Gather time", ParDo.of(new TimeMonitor<>(NAMESPACE, metricName)))
@@ -185,19 +197,30 @@ public class BigQueryIOIT {
                                 new TableFieldSchema().setName("data").setType("BYTES")))));
 
     PipelineResult pipelineResult = pipeline.run();
-    PipelineResult.State pipelineState = pipelineResult.waitUntilFinish();
+    PipelineResult.State pipelineState =
+        options.getPipelineTimeout() == null
+            ? pipelineResult.waitUntilFinish()
+            : pipelineResult.waitUntilFinish(
+                Duration.standardSeconds(options.getPipelineTimeout()));
     extractAndPublishTime(pipelineResult, metricName);
     // Fail the test if pipeline failed.
     assertNotEquals(pipelineState, PipelineResult.State.FAILED);
+
+    // set back streaming
+    options.as(StreamingOptions.class).setStreaming(false);
   }
 
   private void testRead() {
     Pipeline pipeline = Pipeline.create(options);
     pipeline
         .apply("Read from BQ", BigQueryIO.readTableRows().from(tableQualifier))
-        .apply("Gather time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME)));
+        .apply("Gather time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME)))
+        .apply("Counting element", ParDo.of(new CountingFn<>(NAMESPACE, READ_ELEMENT_METRIC_NAME)));
     PipelineResult result = pipeline.run();
     PipelineResult.State pipelineState = result.waitUntilFinish();
+
+    assertEquals(
+        sourceOptions.numRecords, readElementMetric(result, NAMESPACE, READ_ELEMENT_METRIC_NAME));
     extractAndPublishTime(result, READ_TIME_METRIC_NAME);
     // Fail the test if pipeline failed.
     assertNotEquals(pipelineState, PipelineResult.State.FAILED);
@@ -219,6 +242,11 @@ public class BigQueryIOIT {
     };
   }
 
+  private long readElementMetric(PipelineResult result, String namespace, String name) {
+    MetricsReader metricsReader = new MetricsReader(result, namespace);
+    return metricsReader.getCounterMetric(name);
+  }
+
   /** Options for this io performance test. */
   public interface BigQueryPerfTestOptions extends IOTestPipelineOptions {
     @Description("Synthetic source options")
@@ -256,6 +284,11 @@ public class BigQueryIOIT {
 
     @Description("Write Avro or JSON to BQ")
     void setWriteFormat(String value);
+
+    Integer getPipelineTimeout();
+
+    @Description("Time to wait for the events to be processed by the pipeline (in seconds)")
+    void setPipelineTimeout(Integer writeTimeout);
   }
 
   private static class MapKVToV extends DoFn<KV<byte[], byte[]>, byte[]> {
@@ -265,6 +298,20 @@ public class BigQueryIOIT {
     }
   }
 
+  private static class CountingFn<T> extends DoFn<T, Void> {
+
+    private final Counter elementCounter;
+
+    CountingFn(String namespace, String name) {
+      elementCounter = Metrics.counter(namespace, name);
+    }
+
+    @ProcessElement
+    public void processElement() {
+      elementCounter.inc(1L);
+    }
+  }
+
   private enum WriteFormat {
     AVRO,
     JSON
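
The Base64 change in testJsonWrite() above follows from BigQuery's JSON wire format: a BYTES column is carried as a base64-encoded string, so a raw byte[] cannot be set on the TableRow directly. A minimal standalone sketch of that format function (the class name here is hypothetical):

    import com.google.api.services.bigquery.model.TableRow;
    import java.util.Base64;

    class BytesRowFormatter {
      // BigQuery streaming inserts send rows as JSON, where BYTES fields are
      // base64 strings; encode the payload before setting it on the row.
      static TableRow format(byte[] payload) {
        return new TableRow().set("data", Base64.getEncoder().encodeToString(payload));
      }
    }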
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 7702538de1e..5ec5549ed4c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -1206,8 +1206,14 @@ class BigQueryServicesImpl implements BigQueryServices {
         }
         rowsToPublish = retryRows;
         idsToPublish = retryIds;
+        // print first 5 failures
+        int numErrorToLog = Math.min(allErrors.size(), 5);
+        LOG.info(
+            "Retrying {} failed inserts to BigQuery. First {} fails: {}",
+            rowsToPublish.size(),
+            numErrorToLog,
+            allErrors.subList(0, numErrorToLog));
         allErrors.clear();
-        LOG.info("Retrying {} failed inserts to BigQuery", rowsToPublish.size());
       }
       if (successfulRows != null) {
         for (int i = 0; i < rowsToPublish.size(); i++) {
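
The logging change above keeps retry logs readable by reporting only the first few failures; a small self-contained sketch of the same pattern, with hypothetical names:

    import java.util.List;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class RetryLogging {
      private static final Logger LOG = LoggerFactory.getLogger(RetryLogging.class);

      // Log the size of the retry batch plus only the first few errors so a large
      // failed insert batch does not flood the worker logs.
      static void logRetry(int retryCount, List<String> errors) {
        int numErrorsToLog = Math.min(errors.size(), 5);
        LOG.info(
            "Retrying {} failed inserts to BigQuery. First {} failures: {}",
            retryCount,
            numErrorsToLog,
            errors.subList(0, numErrorsToLog));
      }
    }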