Posted to commits@beam.apache.org by jo...@apache.org on 2022/09/09 20:32:38 UTC

[beam] branch master updated: Assert pipeline results in performance tests (#23027)

This is an automated email from the ASF dual-hosted git repository.

johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 07439a0ab8a Assert pipeline results in performance tests (#23027)
07439a0ab8a is described below

commit 07439a0ab8a4f62d4f549722d3f0db20bc36c255
Author: Yi Hu <ya...@google.com>
AuthorDate: Fri Sep 9 16:32:30 2022 -0400

    Assert pipeline results in performance tests (#23027)
    
    * Assert pipeline results in performance tests
    
    * Fix possible false positive test status
    
    * Remove a duplicate Kafka IO streaming test and rearrange the remaining tests
---
 .../jenkins/job_PerformanceTests_KafkaIO_IT.groovy | 29 +++-------------------
 .../provider/bigquery/BigQueryIOPushDownIT.java    | 13 +++++++---
 .../beam/sdk/bigqueryioperftests/BigQueryIOIT.java | 10 ++++++--
 .../java/org/apache/beam/sdk/io/cdap/CdapIOIT.java |  5 +++-
 .../java/org/apache/beam/sdk/io/avro/AvroIOIT.java |  5 +++-
 .../apache/beam/sdk/io/parquet/ParquetIOIT.java    |  5 +++-
 .../java/org/apache/beam/sdk/io/text/TextIOIT.java |  5 +++-
 .../apache/beam/sdk/io/tfrecord/TFRecordIOIT.java  |  8 ++++--
 .../java/org/apache/beam/sdk/io/xml/XmlIOIT.java   |  5 +++-
 .../sdk/io/hadoop/format/HadoopFormatIOIT.java     |  8 ++++--
 .../java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java |  8 ++++--
 .../org/apache/beam/sdk/io/kafka/KafkaIOIT.java    | 14 ++++++++---
 .../apache/beam/sdk/io/mongodb/MongoDBIOIT.java    |  9 +++++--
 13 files changed, 76 insertions(+), 48 deletions(-)
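
The recurring change across all thirteen files is the same small pattern: keep the PipelineResult.State returned by waitUntilFinish() instead of discarding it, publish metrics as before, and only then assert that the pipeline did not end in FAILED (placing the assertion last means a failed run still gets its metrics recorded, which is the "false positive" fix named above). A minimal sketch of the pattern, assuming a standard Beam TestPipeline; the class and test names here are illustrative, not from this commit:

    import static org.junit.Assert.assertNotEquals;

    import org.apache.beam.sdk.PipelineResult;
    import org.apache.beam.sdk.testing.TestPipeline;
    import org.junit.Rule;
    import org.junit.Test;

    public class ExamplePerfIT {
      @Rule public final transient TestPipeline pipeline = TestPipeline.create();

      @Test
      public void testPipelineDoesNotFail() {
        // ... apply the transforms under test to `pipeline` ...
        PipelineResult result = pipeline.run();
        // Keep the terminal state instead of discarding it.
        PipelineResult.State pipelineState = result.waitUntilFinish();
        // Metrics are published before the assertion, so a failed run is
        // still recorded; only afterwards does the test itself fail.
        // collectAndPublishMetrics(result);  // as in the tests below
        assertNotEquals(pipelineState, PipelineResult.State.FAILED);
      }
    }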

diff --git a/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy b/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy
index 6d5efaa6de2..40d117ca3e7 100644
--- a/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy
+++ b/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy
@@ -94,21 +94,6 @@ job(jobName) {
 
   // We are using a smaller number of records for streaming test since streaming read is much slower
   // than batch read.
-  Map dataflowRunnerV2SdfWrapperPipelineOptions = pipelineOptions + [
-    sourceOptions                : """
-                                     {
-                                       "numRecords": "100000",
-                                       "keySizeBytes": "1",
-                                       "valueSizeBytes": "90"
-                                     }
-                                  """.trim().replaceAll("\\s", ""),
-    kafkaTopic                   : 'beam-runnerv2',
-    bigQueryTable                : 'kafkaioit_results_sdf_wrapper',
-    influxMeasurement            : 'kafkaioit_results_sdf_wrapper',
-    // TODO(https://github.com/apache/beam/issues/20806) remove shuffle_mode=appliance with runner v2 once issue is resolved.
-    experiments                  : 'use_runner_v2,shuffle_mode=appliance,use_unified_worker',
-  ]
-
   Map dataflowRunnerV2SdfPipelineOptions = pipelineOptions + [
     sourceOptions                : """
                                      {
@@ -129,15 +114,7 @@ job(jobName) {
       rootBuildScriptDir(common.checkoutDir)
       common.setGradleSwitches(delegate)
       switches("--info")
-      switches("-DintegrationTestPipelineOptions=\'${common.joinOptionsWithNestedJsonValues(pipelineOptions)}\'")
-      switches("-DintegrationTestRunner=dataflow")
-      tasks(":sdks:java:io:kafka:integrationTest --tests org.apache.beam.sdk.io.kafka.KafkaIOIT.testKafkaIOReadsAndWritesCorrectlyInBatch")
-    }
-    gradle {
-      rootBuildScriptDir(common.checkoutDir)
-      common.setGradleSwitches(delegate)
-      switches("--info")
-      switches("-DintegrationTestPipelineOptions=\'${common.joinOptionsWithNestedJsonValues(dataflowRunnerV2SdfWrapperPipelineOptions)}\'")
+      switches("-DintegrationTestPipelineOptions=\'${common.joinOptionsWithNestedJsonValues(dataflowRunnerV2SdfPipelineOptions)}\'")
       switches("-DintegrationTestRunner=dataflow")
       tasks(":sdks:java:io:kafka:integrationTest --tests org.apache.beam.sdk.io.kafka.KafkaIOIT.testKafkaIOReadsAndWritesCorrectlyInStreaming")
     }
@@ -145,9 +122,9 @@ job(jobName) {
       rootBuildScriptDir(common.checkoutDir)
       common.setGradleSwitches(delegate)
       switches("--info")
-      switches("-DintegrationTestPipelineOptions=\'${common.joinOptionsWithNestedJsonValues(dataflowRunnerV2SdfPipelineOptions)}\'")
+      switches("-DintegrationTestPipelineOptions=\'${common.joinOptionsWithNestedJsonValues(pipelineOptions)}\'")
       switches("-DintegrationTestRunner=dataflow")
-      tasks(":sdks:java:io:kafka:integrationTest --tests org.apache.beam.sdk.io.kafka.KafkaIOIT.testKafkaIOReadsAndWritesCorrectlyInStreaming")
+      tasks(":sdks:java:io:kafka:integrationTest --tests org.apache.beam.sdk.io.kafka.KafkaIOIT.testKafkaIOReadsAndWritesCorrectlyInBatch")
     }
   }
 }
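
Net effect of this Jenkins change: one of the two streaming runs (the SDF-wrapper variant) duplicated the coverage of the other (see the commit message), so the job now launches the streaming test once, with the SDF pipeline options, and the batch test once, with the base options.
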
diff --git a/sdks/java/extensions/sql/perf-tests/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryIOPushDownIT.java b/sdks/java/extensions/sql/perf-tests/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryIOPushDownIT.java
index a81711ab643..34842fa8c8d 100644
--- a/sdks/java/extensions/sql/perf-tests/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryIOPushDownIT.java
+++ b/sdks/java/extensions/sql/perf-tests/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryIOPushDownIT.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery;
 
 import static org.apache.beam.sdk.extensions.sql.impl.planner.BeamRuleSets.getRuleSets;
+import static org.junit.Assert.assertNotEquals;
 
 import com.google.cloud.Timestamp;
 import java.util.ArrayList;
@@ -123,8 +124,10 @@ public class BigQueryIOPushDownIT {
         .apply(ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC)));
 
     PipelineResult result = pipeline.run();
-    result.waitUntilFinish();
+    PipelineResult.State pipelineState = result.waitUntilFinish();
     collectAndPublishMetrics(result, "_directread_pushdown");
+    // Fail the test if pipeline failed.
+    assertNotEquals(pipelineState, PipelineResult.State.FAILED);
   }
 
   @Test
@@ -151,8 +154,10 @@ public class BigQueryIOPushDownIT {
         .apply(ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC)));
 
     PipelineResult result = pipeline.run();
-    result.waitUntilFinish();
+    PipelineResult.State pipelineState = result.waitUntilFinish();
     collectAndPublishMetrics(result, "_directread");
+    // Fail the test if pipeline failed.
+    assertNotEquals(pipelineState, PipelineResult.State.FAILED);
   }
 
   @Test
@@ -164,8 +169,10 @@ public class BigQueryIOPushDownIT {
         .apply(ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC)));
 
     PipelineResult result = pipeline.run();
-    result.waitUntilFinish();
+    PipelineResult.State pipelineState = result.waitUntilFinish();
     collectAndPublishMetrics(result, "_default");
+    // Fail the test if pipeline failed.
+    assertNotEquals(pipelineState, PipelineResult.State.FAILED);
   }
 
   private void collectAndPublishMetrics(PipelineResult readResult, String postfix) {
diff --git a/sdks/java/io/bigquery-io-perf-tests/src/test/java/org/apache/beam/sdk/bigqueryioperftests/BigQueryIOIT.java b/sdks/java/io/bigquery-io-perf-tests/src/test/java/org/apache/beam/sdk/bigqueryioperftests/BigQueryIOIT.java
index cc5dffe7bf1..c67dc936705 100644
--- a/sdks/java/io/bigquery-io-perf-tests/src/test/java/org/apache/beam/sdk/bigqueryioperftests/BigQueryIOIT.java
+++ b/sdks/java/io/bigquery-io-perf-tests/src/test/java/org/apache/beam/sdk/bigqueryioperftests/BigQueryIOIT.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.bigqueryioperftests;
 
+import static org.junit.Assert.assertNotEquals;
+
 import com.google.api.services.bigquery.model.TableFieldSchema;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
@@ -183,8 +185,10 @@ public class BigQueryIOIT {
                                 new TableFieldSchema().setName("data").setType("BYTES")))));
 
     PipelineResult pipelineResult = pipeline.run();
-    pipelineResult.waitUntilFinish();
+    PipelineResult.State pipelineState = pipelineResult.waitUntilFinish();
     extractAndPublishTime(pipelineResult, metricName);
+    // Fail the test if pipeline failed.
+    assertNotEquals(pipelineState, PipelineResult.State.FAILED);
   }
 
   private void testRead() {
@@ -193,8 +197,10 @@ public class BigQueryIOIT {
         .apply("Read from BQ", BigQueryIO.readTableRows().from(tableQualifier))
         .apply("Gather time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME)));
     PipelineResult result = pipeline.run();
-    result.waitUntilFinish();
+    PipelineResult.State pipelineState = result.waitUntilFinish();
     extractAndPublishTime(result, READ_TIME_METRIC_NAME);
+    // Fail the test if pipeline failed.
+    assertNotEquals(pipelineState, PipelineResult.State.FAILED);
   }
 
   private void extractAndPublishTime(PipelineResult pipelineResult, String writeTimeMetricName) {
diff --git a/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/CdapIOIT.java b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/CdapIOIT.java
index 1b30ba712f4..bb5f205fc51 100644
--- a/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/CdapIOIT.java
+++ b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/CdapIOIT.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.cdap;
 import static org.apache.beam.sdk.io.common.IOITHelper.executeWithRetry;
 import static org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions;
 import static org.apache.beam.sdk.io.common.TestRow.getExpectedHashForRowCount;
+import static org.junit.Assert.assertNotEquals;
 
 import com.google.cloud.Timestamp;
 import io.cdap.plugin.common.Constants;
@@ -147,11 +148,13 @@ public class CdapIOIT {
     PAssert.thatSingleton(consolidatedHashcode).isEqualTo(getExpectedHashForRowCount(numberOfRows));
 
     PipelineResult readResult = readPipeline.run();
-    readResult.waitUntilFinish();
+    PipelineResult.State readState = readResult.waitUntilFinish();
 
     if (!options.isWithTestcontainers()) {
       collectAndPublishMetrics(writeResult, readResult);
     }
+    // Fail the test if pipeline failed.
+    assertNotEquals(readState, PipelineResult.State.FAILED);
   }
 
   private CdapIO.Write<TestRowDBWritable, NullWritable> writeToDB(Map<String, Object> params) {
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java
index f9f6cb84583..832c69af1c4 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.avro;
 import static org.apache.beam.sdk.io.FileIO.ReadMatches.DirectoryTreatment;
 import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampSuffix;
 import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.readFileBasedIOITPipelineOptions;
+import static org.junit.Assert.assertNotEquals;
 
 import com.google.cloud.Timestamp;
 import java.util.HashSet;
@@ -158,8 +159,10 @@ public class AvroIOIT {
             .withSideInputs(consolidatedHashcode.apply(View.asSingleton())));
 
     PipelineResult result = pipeline.run();
-    result.waitUntilFinish();
+    PipelineResult.State pipelineState = result.waitUntilFinish();
     collectAndPublishMetrics(result);
+    // Fail the test if pipeline failed.
+    assertNotEquals(pipelineState, PipelineResult.State.FAILED);
   }
 
   private void collectAndPublishMetrics(PipelineResult result) {
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOIT.java
index 8fb39b4bbaa..7db703b7b02 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOIT.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOIT.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.parquet;
 import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampSuffix;
 import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.readFileBasedIOITPipelineOptions;
 import static org.apache.beam.sdk.values.TypeDescriptors.strings;
+import static org.junit.Assert.assertNotEquals;
 
 import com.google.cloud.Timestamp;
 import java.util.HashSet;
@@ -164,8 +165,10 @@ public class ParquetIOIT {
             .withSideInputs(consolidatedHashcode.apply(View.asSingleton())));
 
     PipelineResult result = pipeline.run();
-    result.waitUntilFinish();
+    PipelineResult.State pipelineState = result.waitUntilFinish();
     collectAndPublishMetrics(result);
+    // Fail the test if pipeline failed.
+    assertNotEquals(pipelineState, PipelineResult.State.FAILED);
   }
 
   private void collectAndPublishMetrics(PipelineResult result) {
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
index ecf2f97d94a..e6965d1f20a 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.text;
 import static org.apache.beam.sdk.io.FileIO.ReadMatches.DirectoryTreatment;
 import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampSuffix;
 import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.readFileBasedIOITPipelineOptions;
+import static org.junit.Assert.assertNotEquals;
 
 import com.google.cloud.Timestamp;
 import java.util.HashSet;
@@ -150,9 +151,11 @@ public class TextIOIT {
             .withSideInputs(consolidatedHashcode.apply(View.asSingleton())));
 
     PipelineResult result = pipeline.run();
-    result.waitUntilFinish();
+    PipelineResult.State pipelineState = result.waitUntilFinish();
 
     collectAndPublishMetrics(result);
+    // Fail the test if pipeline failed.
+    assertNotEquals(pipelineState, PipelineResult.State.FAILED);
   }
 
   private void collectAndPublishMetrics(PipelineResult result) {
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
index 38e4c8c9c89..cde7b010af7 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.tfrecord;
 import static org.apache.beam.sdk.io.Compression.AUTO;
 import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampSuffix;
 import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.readFileBasedIOITPipelineOptions;
+import static org.junit.Assert.assertNotEquals;
 
 import com.google.cloud.Timestamp;
 import java.nio.charset.StandardCharsets;
@@ -139,7 +140,7 @@ public class TFRecordIOIT {
         .apply("Write content to files", writeTransform);
 
     final PipelineResult writeResult = writePipeline.run();
-    writeResult.waitUntilFinish();
+    PipelineResult.State writeState = writeResult.waitUntilFinish();
 
     String filenamePattern = createFilenamePattern();
     PCollection<String> consolidatedHashcode =
@@ -161,8 +162,11 @@ public class TFRecordIOIT {
             ParDo.of(new DeleteFileFn())
                 .withSideInputs(consolidatedHashcode.apply(View.asSingleton())));
     final PipelineResult readResult = readPipeline.run();
-    readResult.waitUntilFinish();
+    PipelineResult.State readState = readResult.waitUntilFinish();
     collectAndPublishMetrics(writeResult, readResult);
+    // Fail the test if pipeline failed.
+    assertNotEquals(writeState, PipelineResult.State.FAILED);
+    assertNotEquals(readState, PipelineResult.State.FAILED);
   }
 
   private void collectAndPublishMetrics(
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java
index bd4d2549c15..515f9180089 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.xml;
 
 import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampSuffix;
 import static org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions;
+import static org.junit.Assert.assertNotEquals;
 
 import com.google.cloud.Timestamp;
 import java.io.Serializable;
@@ -171,8 +172,10 @@ public class XmlIOIT {
             .withSideInputs(consolidatedHashcode.apply(View.asSingleton())));
 
     PipelineResult result = pipeline.run();
-    result.waitUntilFinish();
+    PipelineResult.State pipelineState = result.waitUntilFinish();
     collectAndPublishResults(result);
+    // Fail the test if pipeline failed.
+    assertNotEquals(pipelineState, PipelineResult.State.FAILED);
   }
 
   private void collectAndPublishResults(PipelineResult result) {
diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOIT.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOIT.java
index 8973f586996..3c3c4398b94 100644
--- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOIT.java
+++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOIT.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.hadoop.format;
 import static org.apache.beam.sdk.io.common.IOITHelper.executeWithRetry;
 import static org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions;
 import static org.apache.beam.sdk.io.common.TestRow.getExpectedHashForRowCount;
+import static org.junit.Assert.assertNotEquals;
 
 import com.google.cloud.Timestamp;
 import java.sql.SQLException;
@@ -207,7 +208,7 @@ public class HadoopFormatIOIT {
                     new HDFSSynchronization(tmpFolder.getRoot().getAbsolutePath())));
 
     PipelineResult writeResult = writePipeline.run();
-    writeResult.waitUntilFinish();
+    PipelineResult.State writeState = writeResult.waitUntilFinish();
 
     PCollection<String> consolidatedHashcode =
         readPipeline
@@ -223,11 +224,14 @@ public class HadoopFormatIOIT {
     PAssert.thatSingleton(consolidatedHashcode).isEqualTo(getExpectedHashForRowCount(numberOfRows));
 
     PipelineResult readResult = readPipeline.run();
-    readResult.waitUntilFinish();
+    PipelineResult.State readState = readResult.waitUntilFinish();
 
     if (!options.isWithTestcontainers()) {
       collectAndPublishMetrics(writeResult, readResult);
     }
+    // Fail the test if pipeline failed.
+    assertNotEquals(writeState, PipelineResult.State.FAILED);
+    assertNotEquals(readState, PipelineResult.State.FAILED);
   }
 
   private void collectAndPublishMetrics(PipelineResult writeResult, PipelineResult readResult) {
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
index e08f7be5c4f..2ecdde7626e 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
@@ -21,6 +21,7 @@ import static org.apache.beam.sdk.io.common.DatabaseTestHelper.assertRowCount;
 import static org.apache.beam.sdk.io.common.DatabaseTestHelper.getTestDataToWrite;
 import static org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions;
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull;
+import static org.junit.Assert.assertNotEquals;
 
 import com.google.cloud.Timestamp;
 import java.sql.SQLException;
@@ -135,10 +136,13 @@ public class JdbcIOIT {
     DatabaseTestHelper.createTable(dataSource, tableName);
     try {
       PipelineResult writeResult = runWrite();
-      writeResult.waitUntilFinish();
+      PipelineResult.State writeState = writeResult.waitUntilFinish();
       PipelineResult readResult = runRead();
-      readResult.waitUntilFinish();
+      PipelineResult.State readState = readResult.waitUntilFinish();
       gatherAndPublishMetrics(writeResult, readResult);
+      // Fail the test if pipeline failed.
+      assertNotEquals(writeState, PipelineResult.State.FAILED);
+      assertNotEquals(readState, PipelineResult.State.FAILED);
     } finally {
       DatabaseTestHelper.deleteTable(dataSource, tableName);
     }
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
index 0a0afddde90..787e86d9791 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.kafka;
 
 import static org.apache.beam.sdk.io.synthetic.SyntheticOptions.fromJsonString;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
 
 import com.google.cloud.Timestamp;
@@ -32,7 +33,6 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.function.BiFunction;
 import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.io.Read;
@@ -196,7 +196,7 @@ public class KafkaIOIT {
         .apply("Counting element", ParDo.of(new CountingFn(NAMESPACE, READ_ELEMENT_METRIC_NAME)));
 
     PipelineResult writeResult = writePipeline.run();
-    writeResult.waitUntilFinish();
+    PipelineResult.State writeState = writeResult.waitUntilFinish();
 
     PipelineResult readResult = readPipeline.run();
     PipelineResult.State readState =
@@ -212,6 +212,9 @@ public class KafkaIOIT {
       Set<NamedTestResult> metrics = readMetrics(writeResult, readResult);
       IOITMetrics.publishToInflux(TEST_ID, TIMESTAMP, metrics, settings);
     }
+    // Fail the test if pipeline failed.
+    assertNotEquals(writeState, PipelineResult.State.FAILED);
+    assertNotEquals(readState, PipelineResult.State.FAILED);
   }
 
   @Test
@@ -247,6 +250,8 @@ public class KafkaIOIT {
         readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
 
     cancelIfTimeouted(readResult, readState);
+    // Fail the test if pipeline failed.
+    assertNotEquals(readState, PipelineResult.State.FAILED);
 
     if (!options.isWithTestcontainers()) {
       Set<NamedTestResult> metrics = readMetrics(writeResult, readResult);
@@ -459,11 +464,12 @@ public class KafkaIOIT {
 
       PipelineResult readResult = sdfReadPipeline.run();
 
-      State readState =
+      PipelineResult.State readState =
           readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout() / 2));
 
       cancelIfTimeouted(readResult, readState);
-
+      // Fail the test if pipeline failed.
+      assertNotEquals(readState, PipelineResult.State.FAILED);
     } finally {
       client.deleteTopics(ImmutableSet.of(topicName));
     }
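
One subtlety in this file: the streaming variants call waitUntilFinish(Duration), which returns null when the job has not reached a terminal state by the timeout; cancelIfTimeouted then cancels it. Since null is not FAILED, the new assertNotEquals passes for a timed-out-then-cancelled run, so only genuinely failed pipelines trip it. A hedged sketch of that interaction (the helper name and timeout value are illustrative):

    import static org.junit.Assert.assertNotEquals;

    import java.io.IOException;
    import org.apache.beam.sdk.PipelineResult;
    import org.joda.time.Duration;

    class TimeoutExample {
      static void waitAndAssertNotFailed(PipelineResult result) throws IOException {
        // waitUntilFinish(Duration) returns null if the job has not reached
        // a terminal state when the timeout elapses.
        PipelineResult.State state = result.waitUntilFinish(Duration.standardSeconds(30));
        if (state == null) {
          result.cancel(); // roughly what KafkaIOIT.cancelIfTimeouted does
        }
        // null != FAILED, so a timed-out (and cancelled) run still passes here.
        assertNotEquals(state, PipelineResult.State.FAILED);
      }
    }
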
diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBIOIT.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBIOIT.java
index 6b696a7c332..b585cca7688 100644
--- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBIOIT.java
+++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBIOIT.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.mongodb;
 
 import static org.apache.beam.sdk.io.common.IOITHelper.executeWithRetry;
 import static org.apache.beam.sdk.io.common.IOITHelper.getHashForRecordCount;
+import static org.junit.Assert.assertNotEquals;
 
 import com.google.cloud.Timestamp;
 import com.mongodb.client.MongoClient;
@@ -167,7 +168,7 @@ public class MongoDBIOIT {
                 .withDatabase(options.getMongoDBDatabaseName())
                 .withCollection(collection));
     PipelineResult writeResult = writePipeline.run();
-    writeResult.waitUntilFinish();
+    PipelineResult.State writeState = writeResult.waitUntilFinish();
 
     finalCollectionSize = getCollectionSizeInBytes(collection);
 
@@ -187,8 +188,12 @@ public class MongoDBIOIT {
     PAssert.thatSingleton(consolidatedHashcode).isEqualTo(expectedHash);
 
     PipelineResult readResult = readPipeline.run();
-    readResult.waitUntilFinish();
+    PipelineResult.State readState = readResult.waitUntilFinish();
     collectAndPublishMetrics(writeResult, readResult);
+
+    // Fail the test if pipeline failed.
+    assertNotEquals(writeState, PipelineResult.State.FAILED);
+    assertNotEquals(readState, PipelineResult.State.FAILED);
   }
 
   private double getCollectionSizeInBytes(final String collectionName) {
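
A last reading note: JUnit documents Assert.assertNotEquals as assertNotEquals(unexpected, actual), so the argument order used throughout this commit is reversed relative to that convention. Inequality is symmetric, so the checks behave identically either way; the conventional order would simply be:

    assertNotEquals(PipelineResult.State.FAILED, pipelineState);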