You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/01/01 02:45:13 UTC

[kafka] branch trunk updated: HOTFIX: fix system test race condition (#7836)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3453e9e  HOTFIX: fix system test race condition (#7836)
3453e9e is described below

commit 3453e9e2eee1400901ef8e1965d657b825d5d64a
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Tue Dec 31 21:44:31 2019 -0500

    HOTFIX: fix system test race condition (#7836)
    
    In some system tests a Streams app is started and then prints a message to stdout, which the system test waits for to confirm the node has successfully been brought up. It then greps for certain log messages in a retriable loop.
    
But waiting for the Streams app to start and print to stdout does not guarantee that the log file has been created yet, so the grep may return an error. Although this occurs in a retriable loop, it is assumed that grep will not fail; its result is piped to wc and then blindly converted to an int in the Python function, which fails with a ValueError because the error output is a string rather than a number.
    
We should catch the ValueError and return 0 so the loop can retry, rather than crash immediately.
    
    Reviewers: Bill Bejeck <bb...@gmail.com>, John Roesler <vv...@users.noreply.github.com>, Guozhang Wang <wa...@gmail.com>
---
 .../apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java | 2 +-
 tests/kafkatest/tests/streams/base_streams_test.py                  | 6 +++++-
 .../kafkatest/tests/streams/streams_broker_down_resilience_test.py  | 2 +-
 3 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
index b605f46..ac4e120 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java
@@ -97,7 +97,7 @@ public class StreamsBrokerDownResilienceTest {
                 public void apply(final String key, final String value) {
                     System.out.println("received key " + key + " and value " + value);
                     messagesProcessed++;
-                    System.out.println("processed" + messagesProcessed + "messages");
+                    System.out.println("processed " + messagesProcessed + " messages");
                     System.out.flush();
                 }
             }).to(SINK_TOPIC);
diff --git a/tests/kafkatest/tests/streams/base_streams_test.py b/tests/kafkatest/tests/streams/base_streams_test.py
index 53e4231..256693c 100644
--- a/tests/kafkatest/tests/streams/base_streams_test.py
+++ b/tests/kafkatest/tests/streams/base_streams_test.py
@@ -98,5 +98,9 @@ class BaseStreamsTest(KafkaTest):
     @staticmethod
     def verify_from_file(processor, message, file):
         result = processor.node.account.ssh_output("grep -E '%s' %s | wc -l" % (message, file), allow_fail=False)
-        return int(result)
+        try:
+          return int(result)
+        except ValueError:
+          self.logger.warn("Command failed with ValueError: " + result)
+          return 0
 
diff --git a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
index ee5feae..58f3b18 100644
--- a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
@@ -28,7 +28,7 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
     outputTopic = "streamsResilienceSink"
     client_id = "streams-broker-resilience-verify-consumer"
     num_messages = 10000
-    message = "processed[0-9]*messages"
+    message = "processed [0-9]* messages"
     connected_message = "Discovered group coordinator"
 
     def __init__(self, test_context):