You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jo...@apache.org on 2022/09/15 15:02:28 UTC

[beam] branch master updated: Test fix Kafka Performance test batch (#23191)

This is an automated email from the ASF dual-hosted git repository.

johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6911520a516 Test fix Kafka Performance test batch (#23191)
6911520a516 is described below

commit 6911520a5165f26a6966a54dd369e07764e6334c
Author: Yi Hu <ya...@google.com>
AuthorDate: Thu Sep 15 11:02:18 2022 -0400

    Test fix Kafka Performance test batch (#23191)
    
    * Fix topic not exist
    
    * Tune test parameters to avoid time out
---
 .test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy          | 5 +++--
 .test-infra/kubernetes/kafka-cluster/05-kafka/configmap-config.yaml | 4 ++--
 .../kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java | 6 +++---
 3 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy b/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy
index 40d117ca3e7..a39a9097421 100644
--- a/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy
+++ b/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy
@@ -86,8 +86,8 @@ job(jobName) {
     influxHost                   : InfluxDBCredentialsHelper.InfluxDBHostUrl,
     kafkaBootstrapServerAddresses: "\$KAFKA_BROKER_0:\$KAFKA_SERVICE_PORT_0,\$KAFKA_BROKER_1:\$KAFKA_SERVICE_PORT_1," +
     "\$KAFKA_BROKER_2:\$KAFKA_SERVICE_PORT_2", //KAFKA_BROKER_ represents IP and KAFKA_SERVICE_ port of outside services
-    kafkaTopic                   : 'beam',
-    readTimeout                  : '900',
+    kafkaTopic                   : 'beam-batch',
+    readTimeout                  : '1800',
     numWorkers                   : '5',
     autoscalingAlgorithm         : 'NONE'
   ]
@@ -103,6 +103,7 @@ job(jobName) {
                                      }
                                    """.trim().replaceAll("\\s", ""),
     kafkaTopic                   : 'beam-sdf',
+    readTimeout                  : '900',
     bigQueryTable                : 'kafkaioit_results_runner_v2',
     influxMeasurement            : 'kafkaioit_results_runner_v2',
     // TODO(https://github.com/apache/beam/issues/20806) remove shuffle_mode=appliance with runner v2 once issue is resolved.
diff --git a/.test-infra/kubernetes/kafka-cluster/05-kafka/configmap-config.yaml b/.test-infra/kubernetes/kafka-cluster/05-kafka/configmap-config.yaml
index bac8292c834..de19d0b4e3f 100644
--- a/.test-infra/kubernetes/kafka-cluster/05-kafka/configmap-config.yaml
+++ b/.test-infra/kubernetes/kafka-cluster/05-kafka/configmap-config.yaml
@@ -35,7 +35,7 @@ data:
     echo "Applying runtime configuration using confluentinc/cp-kafka:5.0.1"
     kafka-topics --zookeeper zookeeper:2181 --create --if-not-exists --force --topic beam --partitions 1 --replication-factor 3
     kafka-configs --zookeeper zookeeper:2181 --entity-type topics --entity-name beam --describe
-    kafka-topics --zookeeper zookeeper:2181 --create --if-not-exists --force --topic beam-runnerv2 --partitions 1 --replication-factor 3
-    kafka-configs --zookeeper zookeeper:2181 --entity-type topics --entity-name beam-runnerv2 --describe
+    kafka-topics --zookeeper zookeeper:2181 --create --if-not-exists --force --topic beam-batch --partitions 1 --replication-factor 3
+    kafka-configs --zookeeper zookeeper:2181 --entity-type topics --entity-name beam-batch --describe
     kafka-topics --zookeeper zookeeper:2181 --create --if-not-exists --force --topic beam-sdf --partitions 1 --replication-factor 3
     kafka-configs --zookeeper zookeeper:2181 --entity-type topics --entity-name beam-sdf --describe
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
index 787e86d9791..d2a441d8e58 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
@@ -228,13 +228,13 @@ public class KafkaIOIT {
     writePipeline
         .apply("Generate records", Read.from(new SyntheticBoundedSource(sourceOptions)))
         .apply("Measure write time", ParDo.of(new TimeMonitor<>(NAMESPACE, WRITE_TIME_METRIC_NAME)))
-        .apply("Write to Kafka", writeToKafka().withTopic(options.getKafkaTopic() + "-batch"));
+        .apply("Write to Kafka", writeToKafka().withTopic(options.getKafkaTopic()));
 
     PCollection<String> hashcode =
         readPipeline
             .apply(
                 "Read from bounded Kafka",
-                readFromBoundedKafka().withTopic(options.getKafkaTopic() + "-batch"))
+                readFromBoundedKafka().withTopic(options.getKafkaTopic()))
             .apply(
                 "Measure read time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME)))
             .apply("Map records to strings", MapElements.via(new MapKafkaRecordsToStrings()))
@@ -251,7 +251,7 @@ public class KafkaIOIT {
 
     cancelIfTimeouted(readResult, readState);
     // Fail the test if pipeline failed.
-    assertNotEquals(readState, PipelineResult.State.FAILED);
+    assertEquals(readState, PipelineResult.State.DONE);
 
     if (!options.isWithTestcontainers()) {
       Set<NamedTestResult> metrics = readMetrics(writeResult, readResult);