You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2020/08/20 13:19:59 UTC

[kafka] branch 2.5 updated: KAFKA-10308: Fix flaky core/round_trip_fault_test.py (#9079)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 8bde3d4  KAFKA-10308: Fix flaky core/round_trip_fault_test.py (#9079)
8bde3d4 is described below

commit 8bde3d476f23a9d9dd06b3360281a3a3fca2804e
Author: Chia-Ping Tsai <ch...@gmail.com>
AuthorDate: Tue Aug 4 21:53:10 2020 +0800

    KAFKA-10308: Fix flaky core/round_trip_fault_test.py (#9079)
    
    Creating a topic may fail (due to timeout) in running system tests. However, `RoundTripWorker` does not ignore `TopicExistsException` which makes `round_trip_fault_test.py` be a flaky one.
    
    More specifically, a network exception can cause the `CreateTopics` request to reach Kafka but Trogdor retry it
    and hit a `TopicAlreadyExists` exception on the retry, failing the test.
    
    Reviewers: Ismael Juma <is...@juma.me.uk>
---
 tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java   | 3 ++-
 .../main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java   | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
index faf2d96..812129e 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java
@@ -34,6 +34,7 @@ import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.requests.CreateTopicsRequest;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
@@ -259,7 +260,7 @@ public final class WorkerUtils {
             // map will always contain the topic since all topics in 'topicsExists' are in given
             // 'topics' map
             int partitions = topicsInfo.get(desc.name()).numPartitions();
-            if (desc.partitions().size() != partitions) {
+            if (partitions != CreateTopicsRequest.NO_NUM_PARTITIONS && desc.partitions().size() != partitions) {
                 String str = "Topic '" + desc.name() + "' exists, but has "
                              + desc.partitions().size() + " partitions, while requested "
                              + " number of partitions is " + partitions;
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
index 643d22c..643555a 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
@@ -147,7 +147,7 @@ public class RoundTripWorker implements TaskWorker {
                 }
                 status.update(new TextNode("Creating " + newTopics.keySet().size() + " topic(s)"));
                 WorkerUtils.createTopics(log, spec.bootstrapServers(), spec.commonClientConf(),
-                    spec.adminClientConf(), newTopics, true);
+                    spec.adminClientConf(), newTopics, false);
                 status.update(new TextNode("Created " + newTopics.keySet().size() + " topic(s)"));
                 toSendTracker = new ToSendTracker(spec.maxMessages());
                 toReceiveTracker = new ToReceiveTracker();