You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jo...@apache.org on 2022/09/09 20:32:38 UTC
[beam] branch master updated: Assert pipeline results in performance tests (#23027)
This is an automated email from the ASF dual-hosted git repository.
johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 07439a0ab8a Assert pipeline results in performance tests (#23027)
07439a0ab8a is described below
commit 07439a0ab8a4f62d4f549722d3f0db20bc36c255
Author: Yi Hu <ya...@google.com>
AuthorDate: Fri Sep 9 16:32:30 2022 -0400
Assert pipeline results in performance tests (#23027)
* Assert pipeline results in performance tests
* Fix possible false positive test status
* Remove the duplicate (identical) KafkaIO streaming test and rearrange tests
---
.../jenkins/job_PerformanceTests_KafkaIO_IT.groovy | 29 +++-------------------
.../provider/bigquery/BigQueryIOPushDownIT.java | 13 +++++++---
.../beam/sdk/bigqueryioperftests/BigQueryIOIT.java | 10 ++++++--
.../java/org/apache/beam/sdk/io/cdap/CdapIOIT.java | 5 +++-
.../java/org/apache/beam/sdk/io/avro/AvroIOIT.java | 5 +++-
.../apache/beam/sdk/io/parquet/ParquetIOIT.java | 5 +++-
.../java/org/apache/beam/sdk/io/text/TextIOIT.java | 5 +++-
.../apache/beam/sdk/io/tfrecord/TFRecordIOIT.java | 8 ++++--
.../java/org/apache/beam/sdk/io/xml/XmlIOIT.java | 5 +++-
.../sdk/io/hadoop/format/HadoopFormatIOIT.java | 8 ++++--
.../java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java | 8 ++++--
.../org/apache/beam/sdk/io/kafka/KafkaIOIT.java | 14 ++++++++---
.../apache/beam/sdk/io/mongodb/MongoDBIOIT.java | 9 +++++--
13 files changed, 76 insertions(+), 48 deletions(-)
diff --git a/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy b/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy
index 6d5efaa6de2..40d117ca3e7 100644
--- a/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy
+++ b/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy
@@ -94,21 +94,6 @@ job(jobName) {
// We are using a smaller number of records for streaming test since streaming read is much slower
// than batch read.
- Map dataflowRunnerV2SdfWrapperPipelineOptions = pipelineOptions + [
- sourceOptions : """
- {
- "numRecords": "100000",
- "keySizeBytes": "1",
- "valueSizeBytes": "90"
- }
- """.trim().replaceAll("\\s", ""),
- kafkaTopic : 'beam-runnerv2',
- bigQueryTable : 'kafkaioit_results_sdf_wrapper',
- influxMeasurement : 'kafkaioit_results_sdf_wrapper',
- // TODO(https://github.com/apache/beam/issues/20806) remove shuffle_mode=appliance with runner v2 once issue is resolved.
- experiments : 'use_runner_v2,shuffle_mode=appliance,use_unified_worker',
- ]
-
Map dataflowRunnerV2SdfPipelineOptions = pipelineOptions + [
sourceOptions : """
{
@@ -129,15 +114,7 @@ job(jobName) {
rootBuildScriptDir(common.checkoutDir)
common.setGradleSwitches(delegate)
switches("--info")
- switches("-DintegrationTestPipelineOptions=\'${common.joinOptionsWithNestedJsonValues(pipelineOptions)}\'")
- switches("-DintegrationTestRunner=dataflow")
- tasks(":sdks:java:io:kafka:integrationTest --tests org.apache.beam.sdk.io.kafka.KafkaIOIT.testKafkaIOReadsAndWritesCorrectlyInBatch")
- }
- gradle {
- rootBuildScriptDir(common.checkoutDir)
- common.setGradleSwitches(delegate)
- switches("--info")
- switches("-DintegrationTestPipelineOptions=\'${common.joinOptionsWithNestedJsonValues(dataflowRunnerV2SdfWrapperPipelineOptions)}\'")
+ switches("-DintegrationTestPipelineOptions=\'${common.joinOptionsWithNestedJsonValues(dataflowRunnerV2SdfPipelineOptions)}\'")
switches("-DintegrationTestRunner=dataflow")
tasks(":sdks:java:io:kafka:integrationTest --tests org.apache.beam.sdk.io.kafka.KafkaIOIT.testKafkaIOReadsAndWritesCorrectlyInStreaming")
}
@@ -145,9 +122,9 @@ job(jobName) {
rootBuildScriptDir(common.checkoutDir)
common.setGradleSwitches(delegate)
switches("--info")
- switches("-DintegrationTestPipelineOptions=\'${common.joinOptionsWithNestedJsonValues(dataflowRunnerV2SdfPipelineOptions)}\'")
+ switches("-DintegrationTestPipelineOptions=\'${common.joinOptionsWithNestedJsonValues(pipelineOptions)}\'")
switches("-DintegrationTestRunner=dataflow")
- tasks(":sdks:java:io:kafka:integrationTest --tests org.apache.beam.sdk.io.kafka.KafkaIOIT.testKafkaIOReadsAndWritesCorrectlyInStreaming")
+ tasks(":sdks:java:io:kafka:integrationTest --tests org.apache.beam.sdk.io.kafka.KafkaIOIT.testKafkaIOReadsAndWritesCorrectlyInBatch")
}
}
}
diff --git a/sdks/java/extensions/sql/perf-tests/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryIOPushDownIT.java b/sdks/java/extensions/sql/perf-tests/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryIOPushDownIT.java
index a81711ab643..34842fa8c8d 100644
--- a/sdks/java/extensions/sql/perf-tests/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryIOPushDownIT.java
+++ b/sdks/java/extensions/sql/perf-tests/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryIOPushDownIT.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery;
import static org.apache.beam.sdk.extensions.sql.impl.planner.BeamRuleSets.getRuleSets;
+import static org.junit.Assert.assertNotEquals;
import com.google.cloud.Timestamp;
import java.util.ArrayList;
@@ -123,8 +124,10 @@ public class BigQueryIOPushDownIT {
.apply(ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC)));
PipelineResult result = pipeline.run();
- result.waitUntilFinish();
+ PipelineResult.State pipelineState = result.waitUntilFinish();
collectAndPublishMetrics(result, "_directread_pushdown");
+ // Fail the test if pipeline failed.
+ assertNotEquals(pipelineState, PipelineResult.State.FAILED);
}
@Test
@@ -151,8 +154,10 @@ public class BigQueryIOPushDownIT {
.apply(ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC)));
PipelineResult result = pipeline.run();
- result.waitUntilFinish();
+ PipelineResult.State pipelineState = result.waitUntilFinish();
collectAndPublishMetrics(result, "_directread");
+ // Fail the test if pipeline failed.
+ assertNotEquals(pipelineState, PipelineResult.State.FAILED);
}
@Test
@@ -164,8 +169,10 @@ public class BigQueryIOPushDownIT {
.apply(ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC)));
PipelineResult result = pipeline.run();
- result.waitUntilFinish();
+ PipelineResult.State pipelineState = result.waitUntilFinish();
collectAndPublishMetrics(result, "_default");
+ // Fail the test if pipeline failed.
+ assertNotEquals(pipelineState, PipelineResult.State.FAILED);
}
private void collectAndPublishMetrics(PipelineResult readResult, String postfix) {
diff --git a/sdks/java/io/bigquery-io-perf-tests/src/test/java/org/apache/beam/sdk/bigqueryioperftests/BigQueryIOIT.java b/sdks/java/io/bigquery-io-perf-tests/src/test/java/org/apache/beam/sdk/bigqueryioperftests/BigQueryIOIT.java
index cc5dffe7bf1..c67dc936705 100644
--- a/sdks/java/io/bigquery-io-perf-tests/src/test/java/org/apache/beam/sdk/bigqueryioperftests/BigQueryIOIT.java
+++ b/sdks/java/io/bigquery-io-perf-tests/src/test/java/org/apache/beam/sdk/bigqueryioperftests/BigQueryIOIT.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.bigqueryioperftests;
+import static org.junit.Assert.assertNotEquals;
+
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
@@ -183,8 +185,10 @@ public class BigQueryIOIT {
new TableFieldSchema().setName("data").setType("BYTES")))));
PipelineResult pipelineResult = pipeline.run();
- pipelineResult.waitUntilFinish();
+ PipelineResult.State pipelineState = pipelineResult.waitUntilFinish();
extractAndPublishTime(pipelineResult, metricName);
+ // Fail the test if pipeline failed.
+ assertNotEquals(pipelineState, PipelineResult.State.FAILED);
}
private void testRead() {
@@ -193,8 +197,10 @@ public class BigQueryIOIT {
.apply("Read from BQ", BigQueryIO.readTableRows().from(tableQualifier))
.apply("Gather time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME)));
PipelineResult result = pipeline.run();
- result.waitUntilFinish();
+ PipelineResult.State pipelineState = result.waitUntilFinish();
extractAndPublishTime(result, READ_TIME_METRIC_NAME);
+ // Fail the test if pipeline failed.
+ assertNotEquals(pipelineState, PipelineResult.State.FAILED);
}
private void extractAndPublishTime(PipelineResult pipelineResult, String writeTimeMetricName) {
diff --git a/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/CdapIOIT.java b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/CdapIOIT.java
index 1b30ba712f4..bb5f205fc51 100644
--- a/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/CdapIOIT.java
+++ b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/CdapIOIT.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.cdap;
import static org.apache.beam.sdk.io.common.IOITHelper.executeWithRetry;
import static org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions;
import static org.apache.beam.sdk.io.common.TestRow.getExpectedHashForRowCount;
+import static org.junit.Assert.assertNotEquals;
import com.google.cloud.Timestamp;
import io.cdap.plugin.common.Constants;
@@ -147,11 +148,13 @@ public class CdapIOIT {
PAssert.thatSingleton(consolidatedHashcode).isEqualTo(getExpectedHashForRowCount(numberOfRows));
PipelineResult readResult = readPipeline.run();
- readResult.waitUntilFinish();
+ PipelineResult.State readState = readResult.waitUntilFinish();
if (!options.isWithTestcontainers()) {
collectAndPublishMetrics(writeResult, readResult);
}
+ // Fail the test if pipeline failed.
+ assertNotEquals(readState, PipelineResult.State.FAILED);
}
private CdapIO.Write<TestRowDBWritable, NullWritable> writeToDB(Map<String, Object> params) {
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java
index f9f6cb84583..832c69af1c4 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/avro/AvroIOIT.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.avro;
import static org.apache.beam.sdk.io.FileIO.ReadMatches.DirectoryTreatment;
import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampSuffix;
import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.readFileBasedIOITPipelineOptions;
+import static org.junit.Assert.assertNotEquals;
import com.google.cloud.Timestamp;
import java.util.HashSet;
@@ -158,8 +159,10 @@ public class AvroIOIT {
.withSideInputs(consolidatedHashcode.apply(View.asSingleton())));
PipelineResult result = pipeline.run();
- result.waitUntilFinish();
+ PipelineResult.State pipelineState = result.waitUntilFinish();
collectAndPublishMetrics(result);
+ // Fail the test if pipeline failed.
+ assertNotEquals(pipelineState, PipelineResult.State.FAILED);
}
private void collectAndPublishMetrics(PipelineResult result) {
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOIT.java
index 8fb39b4bbaa..7db703b7b02 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOIT.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOIT.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.parquet;
import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampSuffix;
import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.readFileBasedIOITPipelineOptions;
import static org.apache.beam.sdk.values.TypeDescriptors.strings;
+import static org.junit.Assert.assertNotEquals;
import com.google.cloud.Timestamp;
import java.util.HashSet;
@@ -164,8 +165,10 @@ public class ParquetIOIT {
.withSideInputs(consolidatedHashcode.apply(View.asSingleton())));
PipelineResult result = pipeline.run();
- result.waitUntilFinish();
+ PipelineResult.State pipelineState = result.waitUntilFinish();
collectAndPublishMetrics(result);
+ // Fail the test if pipeline failed.
+ assertNotEquals(pipelineState, PipelineResult.State.FAILED);
}
private void collectAndPublishMetrics(PipelineResult result) {
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
index ecf2f97d94a..e6965d1f20a 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.text;
import static org.apache.beam.sdk.io.FileIO.ReadMatches.DirectoryTreatment;
import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampSuffix;
import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.readFileBasedIOITPipelineOptions;
+import static org.junit.Assert.assertNotEquals;
import com.google.cloud.Timestamp;
import java.util.HashSet;
@@ -150,9 +151,11 @@ public class TextIOIT {
.withSideInputs(consolidatedHashcode.apply(View.asSingleton())));
PipelineResult result = pipeline.run();
- result.waitUntilFinish();
+ PipelineResult.State pipelineState = result.waitUntilFinish();
collectAndPublishMetrics(result);
+ // Fail the test if pipeline failed.
+ assertNotEquals(pipelineState, PipelineResult.State.FAILED);
}
private void collectAndPublishMetrics(PipelineResult result) {
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
index 38e4c8c9c89..cde7b010af7 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.tfrecord;
import static org.apache.beam.sdk.io.Compression.AUTO;
import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampSuffix;
import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.readFileBasedIOITPipelineOptions;
+import static org.junit.Assert.assertNotEquals;
import com.google.cloud.Timestamp;
import java.nio.charset.StandardCharsets;
@@ -139,7 +140,7 @@ public class TFRecordIOIT {
.apply("Write content to files", writeTransform);
final PipelineResult writeResult = writePipeline.run();
- writeResult.waitUntilFinish();
+ PipelineResult.State writeState = writeResult.waitUntilFinish();
String filenamePattern = createFilenamePattern();
PCollection<String> consolidatedHashcode =
@@ -161,8 +162,11 @@ public class TFRecordIOIT {
ParDo.of(new DeleteFileFn())
.withSideInputs(consolidatedHashcode.apply(View.asSingleton())));
final PipelineResult readResult = readPipeline.run();
- readResult.waitUntilFinish();
+ PipelineResult.State readState = readResult.waitUntilFinish();
collectAndPublishMetrics(writeResult, readResult);
+ // Fail the test if pipeline failed.
+ assertNotEquals(writeState, PipelineResult.State.FAILED);
+ assertNotEquals(readState, PipelineResult.State.FAILED);
}
private void collectAndPublishMetrics(
diff --git a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java
index bd4d2549c15..515f9180089 100644
--- a/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java
+++ b/sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/xml/XmlIOIT.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.xml;
import static org.apache.beam.sdk.io.common.FileBasedIOITHelper.appendTimestampSuffix;
import static org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions;
+import static org.junit.Assert.assertNotEquals;
import com.google.cloud.Timestamp;
import java.io.Serializable;
@@ -171,8 +172,10 @@ public class XmlIOIT {
.withSideInputs(consolidatedHashcode.apply(View.asSingleton())));
PipelineResult result = pipeline.run();
- result.waitUntilFinish();
+ PipelineResult.State pipelineState = result.waitUntilFinish();
collectAndPublishResults(result);
+ // Fail the test if pipeline failed.
+ assertNotEquals(pipelineState, PipelineResult.State.FAILED);
}
private void collectAndPublishResults(PipelineResult result) {
diff --git a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOIT.java b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOIT.java
index 8973f586996..3c3c4398b94 100644
--- a/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOIT.java
+++ b/sdks/java/io/hadoop-format/src/test/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOIT.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.hadoop.format;
import static org.apache.beam.sdk.io.common.IOITHelper.executeWithRetry;
import static org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions;
import static org.apache.beam.sdk.io.common.TestRow.getExpectedHashForRowCount;
+import static org.junit.Assert.assertNotEquals;
import com.google.cloud.Timestamp;
import java.sql.SQLException;
@@ -207,7 +208,7 @@ public class HadoopFormatIOIT {
new HDFSSynchronization(tmpFolder.getRoot().getAbsolutePath())));
PipelineResult writeResult = writePipeline.run();
- writeResult.waitUntilFinish();
+ PipelineResult.State writeState = writeResult.waitUntilFinish();
PCollection<String> consolidatedHashcode =
readPipeline
@@ -223,11 +224,14 @@ public class HadoopFormatIOIT {
PAssert.thatSingleton(consolidatedHashcode).isEqualTo(getExpectedHashForRowCount(numberOfRows));
PipelineResult readResult = readPipeline.run();
- readResult.waitUntilFinish();
+ PipelineResult.State readState = readResult.waitUntilFinish();
if (!options.isWithTestcontainers()) {
collectAndPublishMetrics(writeResult, readResult);
}
+ // Fail the test if pipeline failed.
+ assertNotEquals(writeState, PipelineResult.State.FAILED);
+ assertNotEquals(readState, PipelineResult.State.FAILED);
}
private void collectAndPublishMetrics(PipelineResult writeResult, PipelineResult readResult) {
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
index e08f7be5c4f..2ecdde7626e 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
@@ -21,6 +21,7 @@ import static org.apache.beam.sdk.io.common.DatabaseTestHelper.assertRowCount;
import static org.apache.beam.sdk.io.common.DatabaseTestHelper.getTestDataToWrite;
import static org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull;
+import static org.junit.Assert.assertNotEquals;
import com.google.cloud.Timestamp;
import java.sql.SQLException;
@@ -135,10 +136,13 @@ public class JdbcIOIT {
DatabaseTestHelper.createTable(dataSource, tableName);
try {
PipelineResult writeResult = runWrite();
- writeResult.waitUntilFinish();
+ PipelineResult.State writeState = writeResult.waitUntilFinish();
PipelineResult readResult = runRead();
- readResult.waitUntilFinish();
+ PipelineResult.State readState = readResult.waitUntilFinish();
gatherAndPublishMetrics(writeResult, readResult);
+ // Fail the test if pipeline failed.
+ assertNotEquals(writeState, PipelineResult.State.FAILED);
+ assertNotEquals(readState, PipelineResult.State.FAILED);
} finally {
DatabaseTestHelper.deleteTable(dataSource, tableName);
}
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
index 0a0afddde90..787e86d9791 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.kafka;
import static org.apache.beam.sdk.io.synthetic.SyntheticOptions.fromJsonString;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import com.google.cloud.Timestamp;
@@ -32,7 +33,6 @@ import java.util.Set;
import java.util.UUID;
import java.util.function.BiFunction;
import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.io.Read;
@@ -196,7 +196,7 @@ public class KafkaIOIT {
.apply("Counting element", ParDo.of(new CountingFn(NAMESPACE, READ_ELEMENT_METRIC_NAME)));
PipelineResult writeResult = writePipeline.run();
- writeResult.waitUntilFinish();
+ PipelineResult.State writeState = writeResult.waitUntilFinish();
PipelineResult readResult = readPipeline.run();
PipelineResult.State readState =
@@ -212,6 +212,9 @@ public class KafkaIOIT {
Set<NamedTestResult> metrics = readMetrics(writeResult, readResult);
IOITMetrics.publishToInflux(TEST_ID, TIMESTAMP, metrics, settings);
}
+ // Fail the test if pipeline failed.
+ assertNotEquals(writeState, PipelineResult.State.FAILED);
+ assertNotEquals(readState, PipelineResult.State.FAILED);
}
@Test
@@ -247,6 +250,8 @@ public class KafkaIOIT {
readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout()));
cancelIfTimeouted(readResult, readState);
+ // Fail the test if pipeline failed.
+ assertNotEquals(readState, PipelineResult.State.FAILED);
if (!options.isWithTestcontainers()) {
Set<NamedTestResult> metrics = readMetrics(writeResult, readResult);
@@ -459,11 +464,12 @@ public class KafkaIOIT {
PipelineResult readResult = sdfReadPipeline.run();
- State readState =
+ PipelineResult.State readState =
readResult.waitUntilFinish(Duration.standardSeconds(options.getReadTimeout() / 2));
cancelIfTimeouted(readResult, readState);
-
+ // Fail the test if pipeline failed.
+ assertNotEquals(readState, PipelineResult.State.FAILED);
} finally {
client.deleteTopics(ImmutableSet.of(topicName));
}
diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBIOIT.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBIOIT.java
index 6b696a7c332..b585cca7688 100644
--- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBIOIT.java
+++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBIOIT.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.mongodb;
import static org.apache.beam.sdk.io.common.IOITHelper.executeWithRetry;
import static org.apache.beam.sdk.io.common.IOITHelper.getHashForRecordCount;
+import static org.junit.Assert.assertNotEquals;
import com.google.cloud.Timestamp;
import com.mongodb.client.MongoClient;
@@ -167,7 +168,7 @@ public class MongoDBIOIT {
.withDatabase(options.getMongoDBDatabaseName())
.withCollection(collection));
PipelineResult writeResult = writePipeline.run();
- writeResult.waitUntilFinish();
+ PipelineResult.State writeState = writeResult.waitUntilFinish();
finalCollectionSize = getCollectionSizeInBytes(collection);
@@ -187,8 +188,12 @@ public class MongoDBIOIT {
PAssert.thatSingleton(consolidatedHashcode).isEqualTo(expectedHash);
PipelineResult readResult = readPipeline.run();
- readResult.waitUntilFinish();
+ PipelineResult.State readState = readResult.waitUntilFinish();
collectAndPublishMetrics(writeResult, readResult);
+
+ // Fail the test if pipeline failed.
+ assertNotEquals(writeState, PipelineResult.State.FAILED);
+ assertNotEquals(readState, PipelineResult.State.FAILED);
}
private double getCollectionSizeInBytes(final String collectionName) {