You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/10/10 17:03:05 UTC

kafka git commit: MINOR: fix EOS test race condition

Repository: kafka
Updated Branches:
  refs/heads/trunk 29c46ddb9 -> 64930cd71


MINOR: fix EOS test race condition

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Damian Guy <da...@gmail.com>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>

Closes #4048 from mjsax/fix-eos-test-race-condition


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/64930cd7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/64930cd7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/64930cd7

Branch: refs/heads/trunk
Commit: 64930cd71361853cc46ed02232213cc7ba749e77
Parents: 29c46dd
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Tue Oct 10 10:03:02 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Oct 10 10:03:02 2017 -0700

----------------------------------------------------------------------
 .../kafka/streams/tests/EosTestClient.java      | 25 +++++++++++++++++++-
 1 file changed, 24 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/64930cd7/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
index 098b77b..5e85bd2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.kstream.KStream;
 import java.io.File;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 public class EosTestClient extends SmokeTestUtil {
 
@@ -35,6 +36,7 @@ public class EosTestClient extends SmokeTestUtil {
     private final String kafka;
     private final File stateDir;
     private final boolean withRepartitioning;
+    private final AtomicBoolean notRunningCallbackReceived = new AtomicBoolean(false);
 
     private KafkaStreams streams;
     private boolean uncaughtException;
@@ -46,7 +48,7 @@ public class EosTestClient extends SmokeTestUtil {
         this.withRepartitioning = withRepartitioning;
     }
 
-    private boolean isRunning = true;
+    private volatile boolean isRunning = true;
 
     public void start() {
         Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@@ -54,12 +56,18 @@ public class EosTestClient extends SmokeTestUtil {
             public void run() {
                 isRunning = false;
                 streams.close(TimeUnit.SECONDS.toMillis(300), TimeUnit.SECONDS);
+
+                // need to wait for callback to avoid race condition
+                // -> make sure the callback printout to stdout is there as it is expected test output
+                waitForStateTransitionCallback();
+
                 // do not remove these printouts since they are needed for health scripts
                 if (!uncaughtException) {
                     System.out.println(System.currentTimeMillis());
                     System.out.println("EOS-TEST-CLIENT-CLOSED");
                     System.out.flush();
                 }
+
             }
         }));
 
@@ -85,6 +93,9 @@ public class EosTestClient extends SmokeTestUtil {
                         System.out.println(System.currentTimeMillis());
                         System.out.println("StateChange: " + oldState + " -> " + newState);
                         System.out.flush();
+                        if (newState == KafkaStreams.State.NOT_RUNNING) {
+                            notRunningCallbackReceived.set(true);
+                        }
                     }
                 });
                 streams.start();
@@ -195,4 +206,16 @@ public class EosTestClient extends SmokeTestUtil {
         return new KafkaStreams(builder.build(), props);
     }
 
+    private void waitForStateTransitionCallback() {
+        final long maxWaitTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(300);
+        while (!notRunningCallbackReceived.get() && System.currentTimeMillis() < maxWaitTime) {
+            try {
+                Thread.sleep(500);
+            } catch (final InterruptedException ignoreAndSwallow) { /* deliberately ignored: keep polling until the NOT_RUNNING flag is set or the deadline passes */ }
+        }
+        if (!notRunningCallbackReceived.get()) {
+            System.err.println("State transition callback to NOT_RUNNING never received. Timed out after 5 minutes.");
+            System.err.flush();
+        }
+    }
 }