You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/03/17 05:33:53 UTC
kafka git commit: KAFKA-4885: Add client.close as exception handler in streams system tests
Repository: kafka
Updated Branches:
refs/heads/trunk 1659ca177 -> b8fe2bb56
KAFKA-4885: Add client.close as exception handler in streams system tests
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Eno Thereska, Damian Guy, Jason Gustafson
Closes #2693 from guozhangwang/K4885-system-test-unexpected-exception-handler
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b8fe2bb5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b8fe2bb5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b8fe2bb5
Branch: refs/heads/trunk
Commit: b8fe2bb56c25715de1602c1f00ab720af085f2e5
Parents: 1659ca1
Author: Guozhang Wang <wa...@gmail.com>
Authored: Thu Mar 16 22:33:50 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Mar 16 22:33:50 2017 -0700
----------------------------------------------------------------------
.../kafka/streams/perf/SimpleBenchmark.java | 27 +++++++++++++++-----
.../streams/tests/BrokerCompatibilityTest.java | 9 +++++++
.../kafka/streams/tests/SmokeTestClient.java | 12 ++++++++-
.../kafka/streams/tests/StreamsSmokeTest.java | 7 -----
4 files changed, 41 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b8fe2bb5/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index 7a36d70..9947870 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -54,6 +54,7 @@ import java.util.Locale;
import java.util.concurrent.CountDownLatch;
import java.util.Properties;
import java.util.Random;
+import java.util.concurrent.TimeUnit;
/**
* Class that provides support for a series of benchmarks. It is usually driven by
@@ -592,7 +593,7 @@ public class SimpleBenchmark {
}
});
- return new KafkaStreams(builder, props);
+ return createKafkaStreamsWithExceptionHandler(builder, props);
}
private KafkaStreams createKafkaStreamsWithSink(String topic, final CountDownLatch latch) {
@@ -631,7 +632,7 @@ public class SimpleBenchmark {
}
});
- return new KafkaStreams(builder, props);
+ return createKafkaStreamsWithExceptionHandler(builder, props);
}
private class CountDownAction<V> implements ForeachAction<Integer, V> {
@@ -664,7 +665,7 @@ public class SimpleBenchmark {
input1.leftJoin(input2, VALUE_JOINER).foreach(new CountDownAction(latch));
- return new KafkaStreams(builder, streamConfig);
+ return createKafkaStreamsWithExceptionHandler(builder, streamConfig);
}
private KafkaStreams createKafkaStreamsKTableKTableJoin(Properties streamConfig, String kTableTopic1,
@@ -676,7 +677,7 @@ public class SimpleBenchmark {
input1.leftJoin(input2, VALUE_JOINER).foreach(new CountDownAction(latch));
- return new KafkaStreams(builder, streamConfig);
+ return createKafkaStreamsWithExceptionHandler(builder, streamConfig);
}
private KafkaStreams createKafkaStreamsKStreamKStreamJoin(Properties streamConfig, String kStreamTopic1,
@@ -689,7 +690,7 @@ public class SimpleBenchmark {
input1.leftJoin(input2, VALUE_JOINER, JoinWindows.of(timeDifferenceMs)).foreach(new CountDownAction(latch));
- return new KafkaStreams(builder, streamConfig);
+ return createKafkaStreamsWithExceptionHandler(builder, streamConfig);
}
private KafkaStreams createKafkaStreamsWithStateStore(String topic,
@@ -739,9 +740,23 @@ public class SimpleBenchmark {
}
}, "store");
- return new KafkaStreams(builder, props);
+ return createKafkaStreamsWithExceptionHandler(builder, props);
}
+ private KafkaStreams createKafkaStreamsWithExceptionHandler(final KStreamBuilder builder, final Properties props) {
+ final KafkaStreams streamsClient = new KafkaStreams(builder, props);
+ streamsClient.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ System.out.println("FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
+
+ streamsClient.close(30, TimeUnit.SECONDS);
+ }
+ });
+
+ return streamsClient;
+ }
+
private double megabytesPerSec(long time, long processedBytes) {
return (processedBytes / 1024.0 / 1024.0) / (time / 1000.0);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b8fe2bb5/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
index 6d99e52..64d3f1a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.test.TestUtils;
import java.io.File;
import java.util.Collections;
import java.util.Properties;
+import java.util.concurrent.TimeUnit;
public class BrokerCompatibilityTest {
@@ -63,6 +64,14 @@ public class BrokerCompatibilityTest {
builder.stream(SOURCE_TOPIC).to(SINK_TOPIC);
final KafkaStreams streams = new KafkaStreams(builder, streamsProperties);
+ streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ System.out.println("FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
+
+ streams.close(30, TimeUnit.SECONDS);
+ }
+ });
System.out.println("start Kafka Streams");
streams.start();
http://git-wip-us.apache.org/repos/asf/kafka/blob/b8fe2bb5/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index 5e6a8b3..7691948 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -210,7 +210,17 @@ public class SmokeTestClient extends SmokeTestUtil {
"cntByCnt"
).to(stringSerde, longSerde, "tagg");
- return new KafkaStreams(builder, props);
+ final KafkaStreams streamsClient = new KafkaStreams(builder, props);
+ streamsClient.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ System.out.println("FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
+
+ streamsClient.close(30, TimeUnit.SECONDS);
+ }
+ });
+
+ return streamsClient;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b8fe2bb5/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
index d24ad4f..244aa8e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
@@ -53,13 +53,6 @@ public class StreamsSmokeTest {
// this starts a KafkaStreams client
final SmokeTestClient client = new SmokeTestClient(new File(stateDir), kafka);
client.start();
-
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- client.close();
- }
- });
break;
case "close-deadlock-test":
final ShutdownDeadlockTest test = new ShutdownDeadlockTest(kafka);