You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/10/10 17:03:05 UTC
kafka git commit: MINOR: fix EOS test race condition
Repository: kafka
Updated Branches:
refs/heads/trunk 29c46ddb9 -> 64930cd71
MINOR: fix EOS test race condition
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Damian Guy <da...@gmail.com>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>
Closes #4048 from mjsax/fix-eos-test-race-condition
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/64930cd7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/64930cd7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/64930cd7
Branch: refs/heads/trunk
Commit: 64930cd71361853cc46ed02232213cc7ba749e77
Parents: 29c46dd
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Tue Oct 10 10:03:02 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Oct 10 10:03:02 2017 -0700
----------------------------------------------------------------------
.../kafka/streams/tests/EosTestClient.java | 25 +++++++++++++++++++-
1 file changed, 24 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/64930cd7/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
index 098b77b..5e85bd2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.kstream.KStream;
import java.io.File;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
public class EosTestClient extends SmokeTestUtil {
@@ -35,6 +36,7 @@ public class EosTestClient extends SmokeTestUtil {
private final String kafka;
private final File stateDir;
private final boolean withRepartitioning;
+ private final AtomicBoolean notRunningCallbackReceived = new AtomicBoolean(false);
private KafkaStreams streams;
private boolean uncaughtException;
@@ -46,7 +48,7 @@ public class EosTestClient extends SmokeTestUtil {
this.withRepartitioning = withRepartitioning;
}
- private boolean isRunning = true;
+ private volatile boolean isRunning = true;
public void start() {
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@@ -54,12 +56,18 @@ public class EosTestClient extends SmokeTestUtil {
public void run() {
isRunning = false;
streams.close(TimeUnit.SECONDS.toMillis(300), TimeUnit.SECONDS);
+
+ // need to wait for callback to avoid race condition
+ // -> make sure the callback printout to stdout is there as it is expected test output
+ waitForStateTransitionCallback();
+
// do not remove these printouts since they are needed for health scripts
if (!uncaughtException) {
System.out.println(System.currentTimeMillis());
System.out.println("EOS-TEST-CLIENT-CLOSED");
System.out.flush();
}
+
}
}));
@@ -85,6 +93,9 @@ public class EosTestClient extends SmokeTestUtil {
System.out.println(System.currentTimeMillis());
System.out.println("StateChange: " + oldState + " -> " + newState);
System.out.flush();
+ if (newState == KafkaStreams.State.NOT_RUNNING) {
+ notRunningCallbackReceived.set(true);
+ }
}
});
streams.start();
@@ -195,4 +206,16 @@ public class EosTestClient extends SmokeTestUtil {
return new KafkaStreams(builder.build(), props);
}
+ private void waitForStateTransitionCallback() { // blocks until notRunningCallbackReceived is set or a 300 s deadline passes
+ final long maxWaitTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(300); // absolute deadline in wall-clock millis
+ while (!notRunningCallbackReceived.get() && System.currentTimeMillis() < maxWaitTime) { // poll until flag is set or deadline is reached
+ try {
+ Thread.sleep(500); // re-check the flag every 500 ms
+ } catch (final InterruptedException ignoreAndSwallow) { /* just keep waiting; the loop re-checks flag and deadline, interrupt status is not restored */ }
+ }
+ if (!notRunningCallbackReceived.get()) { // loop ended on the deadline without the flag being set
+ System.err.println("State transition callback to NOT_RUNNING never received. Timed out after 5 minutes.");
+ System.err.flush();
+ }
+ }
}