You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jo...@apache.org on 2022/09/15 15:02:28 UTC
[beam] branch master updated: Test fix Kafka Performance test batch (#23191)
This is an automated email from the ASF dual-hosted git repository.
johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6911520a516 Test fix Kafka Performance test batch (#23191)
6911520a516 is described below
commit 6911520a5165f26a6966a54dd369e07764e6334c
Author: Yi Hu <ya...@google.com>
AuthorDate: Thu Sep 15 11:02:18 2022 -0400
Test fix Kafka Performance test batch (#23191)
* Fix topic not exist
* Tune test parameters to avoid time out
---
.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy | 5 +++--
.test-infra/kubernetes/kafka-cluster/05-kafka/configmap-config.yaml | 4 ++--
.../kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java | 6 +++---
3 files changed, 8 insertions(+), 7 deletions(-)
diff --git a/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy b/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy
index 40d117ca3e7..a39a9097421 100644
--- a/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy
+++ b/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy
@@ -86,8 +86,8 @@ job(jobName) {
influxHost : InfluxDBCredentialsHelper.InfluxDBHostUrl,
kafkaBootstrapServerAddresses: "\$KAFKA_BROKER_0:\$KAFKA_SERVICE_PORT_0,\$KAFKA_BROKER_1:\$KAFKA_SERVICE_PORT_1," +
"\$KAFKA_BROKER_2:\$KAFKA_SERVICE_PORT_2", //KAFKA_BROKER_ represents IP and KAFKA_SERVICE_ port of outside services
- kafkaTopic : 'beam',
- readTimeout : '900',
+ kafkaTopic : 'beam-batch',
+ readTimeout : '1800',
numWorkers : '5',
autoscalingAlgorithm : 'NONE'
]
@@ -103,6 +103,7 @@ job(jobName) {
}
""".trim().replaceAll("\\s", ""),
kafkaTopic : 'beam-sdf',
+ readTimeout : '900',
bigQueryTable : 'kafkaioit_results_runner_v2',
influxMeasurement : 'kafkaioit_results_runner_v2',
// TODO(https://github.com/apache/beam/issues/20806) remove shuffle_mode=appliance with runner v2 once issue is resolved.
diff --git a/.test-infra/kubernetes/kafka-cluster/05-kafka/configmap-config.yaml b/.test-infra/kubernetes/kafka-cluster/05-kafka/configmap-config.yaml
index bac8292c834..de19d0b4e3f 100644
--- a/.test-infra/kubernetes/kafka-cluster/05-kafka/configmap-config.yaml
+++ b/.test-infra/kubernetes/kafka-cluster/05-kafka/configmap-config.yaml
@@ -35,7 +35,7 @@ data:
echo "Applying runtime configuration using confluentinc/cp-kafka:5.0.1"
kafka-topics --zookeeper zookeeper:2181 --create --if-not-exists --force --topic beam --partitions 1 --replication-factor 3
kafka-configs --zookeeper zookeeper:2181 --entity-type topics --entity-name beam --describe
- kafka-topics --zookeeper zookeeper:2181 --create --if-not-exists --force --topic beam-runnerv2 --partitions 1 --replication-factor 3
- kafka-configs --zookeeper zookeeper:2181 --entity-type topics --entity-name beam-runnerv2 --describe
+ kafka-topics --zookeeper zookeeper:2181 --create --if-not-exists --force --topic beam-batch --partitions 1 --replication-factor 3
+ kafka-configs --zookeeper zookeeper:2181 --entity-type topics --entity-name beam-batch --describe
kafka-topics --zookeeper zookeeper:2181 --create --if-not-exists --force --topic beam-sdf --partitions 1 --replication-factor 3
kafka-configs --zookeeper zookeeper:2181 --entity-type topics --entity-name beam-sdf --describe
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
index 787e86d9791..d2a441d8e58 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
@@ -228,13 +228,13 @@ public class KafkaIOIT {
writePipeline
.apply("Generate records", Read.from(new SyntheticBoundedSource(sourceOptions)))
.apply("Measure write time", ParDo.of(new TimeMonitor<>(NAMESPACE, WRITE_TIME_METRIC_NAME)))
- .apply("Write to Kafka", writeToKafka().withTopic(options.getKafkaTopic() + "-batch"));
+ .apply("Write to Kafka", writeToKafka().withTopic(options.getKafkaTopic()));
PCollection<String> hashcode =
readPipeline
.apply(
"Read from bounded Kafka",
- readFromBoundedKafka().withTopic(options.getKafkaTopic() + "-batch"))
+ readFromBoundedKafka().withTopic(options.getKafkaTopic()))
.apply(
"Measure read time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME)))
.apply("Map records to strings", MapElements.via(new MapKafkaRecordsToStrings()))
@@ -251,7 +251,7 @@ public class KafkaIOIT {
cancelIfTimeouted(readResult, readState);
// Fail the test if pipeline failed.
- assertNotEquals(readState, PipelineResult.State.FAILED);
+ assertEquals(readState, PipelineResult.State.DONE);
if (!options.isWithTestcontainers()) {
Set<NamedTestResult> metrics = readMetrics(writeResult, readResult);