You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/07/07 22:18:54 UTC
[beam] branch master updated: Fix testKafkaIOReadsAndWritesCorrectlyInStreaming failing for kafka performance test
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ed118a168bc Fix testKafkaIOReadsAndWritesCorrectlyInStreaming failing for kafka performance test
new 692bbb1f6d4 Merge pull request #22186 from Fix testKafkaIOReadsAndWritesCorrectlyInStreaming failing for kafka performance test
ed118a168bc is described below
commit ed118a168bc39735aca3e18e6b6cb668db817577
Author: Benjamin Gonzalez <be...@wizeline.com>
AuthorDate: Thu Jul 7 11:41:50 2022 -0500
Fix testKafkaIOReadsAndWritesCorrectlyInStreaming failing for kafka performance test
---
.../kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
index 211fa6f6137..d38560667f3 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
@@ -170,12 +170,12 @@ public class KafkaIOIT {
writePipeline
.apply("Generate records", Read.from(new SyntheticBoundedSource(sourceOptions)))
.apply("Measure write time", ParDo.of(new TimeMonitor<>(NAMESPACE, WRITE_TIME_METRIC_NAME)))
- .apply("Write to Kafka", writeToKafka());
+ .apply("Write to Kafka", writeToKafka().withTopic(options.getKafkaTopic()));
// Use streaming pipeline to read Kafka records.
readPipeline.getOptions().as(Options.class).setStreaming(true);
readPipeline
- .apply("Read from unbounded Kafka", readFromKafka())
+ .apply("Read from unbounded Kafka", readFromKafka().withTopic(options.getKafkaTopic()))
.apply("Measure read time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME)))
.apply("Map records to strings", MapElements.via(new MapKafkaRecordsToStrings()))
.apply("Counting element", ParDo.of(new CountingFn(NAMESPACE, READ_ELEMENT_METRIC_NAME)));