Posted to commits@beam.apache.org by pa...@apache.org on 2022/07/07 22:18:54 UTC

[beam] branch master updated: Fix testKafkaIOReadsAndWritesCorrectlyInStreaming failing for kafka performance test

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ed118a168bc Fix testKafkaIOReadsAndWritesCorrectlyInStreaming failing for kafka performance test
     new 692bbb1f6d4 Merge pull request #22186 from Fix testKafkaIOReadsAndWritesCorrectlyInStreaming failing for kafka performance test
ed118a168bc is described below

commit ed118a168bc39735aca3e18e6b6cb668db817577
Author: Benjamin Gonzalez <be...@wizeline.com>
AuthorDate: Thu Jul 7 11:41:50 2022 -0500

    Fix testKafkaIOReadsAndWritesCorrectlyInStreaming failing for kafka performance test
---
 .../kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
index 211fa6f6137..d38560667f3 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
@@ -170,12 +170,12 @@ public class KafkaIOIT {
     writePipeline
         .apply("Generate records", Read.from(new SyntheticBoundedSource(sourceOptions)))
         .apply("Measure write time", ParDo.of(new TimeMonitor<>(NAMESPACE, WRITE_TIME_METRIC_NAME)))
-        .apply("Write to Kafka", writeToKafka());
+        .apply("Write to Kafka", writeToKafka().withTopic(options.getKafkaTopic()));
 
     // Use streaming pipeline to read Kafka records.
     readPipeline.getOptions().as(Options.class).setStreaming(true);
     readPipeline
-        .apply("Read from unbounded Kafka", readFromKafka())
+        .apply("Read from unbounded Kafka", readFromKafka().withTopic(options.getKafkaTopic()))
         .apply("Measure read time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME)))
         .apply("Map records to strings", MapElements.via(new MapKafkaRecordsToStrings()))
         .apply("Counting element", ParDo.of(new CountingFn(NAMESPACE, READ_ELEMENT_METRIC_NAME)));
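
For context, the change routes the test's reads and writes through the topic supplied via pipeline options (`options.getKafkaTopic()`) instead of whatever default the `writeToKafka()`/`readFromKafka()` helpers carry. The builder-style override pattern involved can be sketched with a minimal stand-in; note that `KafkaTransform` and `DEFAULT_TOPIC` below are hypothetical names for illustration only, while the real test configures `KafkaIO.Read`/`KafkaIO.Write` transforms:

```java
// Minimal stand-in for KafkaIO's immutable builder-style configuration.
// KafkaTransform and DEFAULT_TOPIC are hypothetical; the actual test calls
// withTopic(...) on the transforms returned by readFromKafka()/writeToKafka().
public class KafkaTransform {
    static final String DEFAULT_TOPIC = "default-topic"; // hypothetical fallback

    private final String topic;

    private KafkaTransform(String topic) {
        this.topic = topic;
    }

    // Mirrors a helper that returns a transform with a baked-in default topic.
    public static KafkaTransform create() {
        return new KafkaTransform(DEFAULT_TOPIC);
    }

    // Mirrors KafkaIO's withTopic: returns a new transform targeting the given topic.
    public KafkaTransform withTopic(String topic) {
        return new KafkaTransform(topic);
    }

    public String getTopic() {
        return topic;
    }

    public static void main(String[] args) {
        // Without the override, the transform falls back to its default topic.
        KafkaTransform plain = KafkaTransform.create();
        // With the override, the topic comes from (here, simulated) pipeline options.
        KafkaTransform configured = KafkaTransform.create().withTopic("perf-test-topic");

        System.out.println(plain.getTopic());
        System.out.println(configured.getTopic());
    }
}
```

Because the builder returns a new instance rather than mutating state, both the write and the read pipeline in the test can apply the same `.withTopic(options.getKafkaTopic())` call independently and end up pointed at the same externally configured topic.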