You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/03/17 05:33:53 UTC

kafka git commit: KAFKA-4885: Add client.close as exception handler in streams system tests

Repository: kafka
Updated Branches:
  refs/heads/trunk 1659ca177 -> b8fe2bb56


KAFKA-4885: Add client.close as exception handler in streams system tests

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Eno Thereska, Damian Guy, Jason Gustafson

Closes #2693 from guozhangwang/K4885-system-test-unexpected-exception-handler


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b8fe2bb5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b8fe2bb5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b8fe2bb5

Branch: refs/heads/trunk
Commit: b8fe2bb56c25715de1602c1f00ab720af085f2e5
Parents: 1659ca1
Author: Guozhang Wang <wa...@gmail.com>
Authored: Thu Mar 16 22:33:50 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Mar 16 22:33:50 2017 -0700

----------------------------------------------------------------------
 .../kafka/streams/perf/SimpleBenchmark.java     | 27 +++++++++++++++-----
 .../streams/tests/BrokerCompatibilityTest.java  |  9 +++++++
 .../kafka/streams/tests/SmokeTestClient.java    | 12 ++++++++-
 .../kafka/streams/tests/StreamsSmokeTest.java   |  7 -----
 4 files changed, 41 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b8fe2bb5/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index 7a36d70..9947870 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -54,6 +54,7 @@ import java.util.Locale;
 import java.util.concurrent.CountDownLatch;
 import java.util.Properties;
 import java.util.Random;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Class that provides support for a series of benchmarks. It is usually driven by
@@ -592,7 +593,7 @@ public class SimpleBenchmark {
             }
         });
 
-        return new KafkaStreams(builder, props);
+        return createKafkaStreamsWithExceptionHandler(builder, props);
     }
 
     private KafkaStreams createKafkaStreamsWithSink(String topic, final CountDownLatch latch) {
@@ -631,7 +632,7 @@ public class SimpleBenchmark {
             }
         });
 
-        return new KafkaStreams(builder, props);
+        return createKafkaStreamsWithExceptionHandler(builder, props);
     }
 
     private class CountDownAction<V> implements ForeachAction<Integer, V> {
@@ -664,7 +665,7 @@ public class SimpleBenchmark {
 
         input1.leftJoin(input2, VALUE_JOINER).foreach(new CountDownAction(latch));
 
-        return new KafkaStreams(builder, streamConfig);
+        return createKafkaStreamsWithExceptionHandler(builder, streamConfig);
     }
 
     private KafkaStreams createKafkaStreamsKTableKTableJoin(Properties streamConfig, String kTableTopic1,
@@ -676,7 +677,7 @@ public class SimpleBenchmark {
 
         input1.leftJoin(input2, VALUE_JOINER).foreach(new CountDownAction(latch));
 
-        return new KafkaStreams(builder, streamConfig);
+        return createKafkaStreamsWithExceptionHandler(builder, streamConfig);
     }
 
     private KafkaStreams createKafkaStreamsKStreamKStreamJoin(Properties streamConfig, String kStreamTopic1,
@@ -689,7 +690,7 @@ public class SimpleBenchmark {
 
         input1.leftJoin(input2, VALUE_JOINER, JoinWindows.of(timeDifferenceMs)).foreach(new CountDownAction(latch));
 
-        return new KafkaStreams(builder, streamConfig);
+        return createKafkaStreamsWithExceptionHandler(builder, streamConfig);
     }
 
     private KafkaStreams createKafkaStreamsWithStateStore(String topic,
@@ -739,9 +740,23 @@ public class SimpleBenchmark {
             }
         }, "store");
 
-        return new KafkaStreams(builder, props);
+        return createKafkaStreamsWithExceptionHandler(builder, props);
     }
 
+    private KafkaStreams createKafkaStreamsWithExceptionHandler(final KStreamBuilder builder, final Properties props) {
+        final KafkaStreams streamsClient = new KafkaStreams(builder, props);
+        streamsClient.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+            @Override
+            public void uncaughtException(Thread t, Throwable e) {
+                System.out.println("FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
+
+                streamsClient.close(30, TimeUnit.SECONDS);
+            }
+        });
+
+        return streamsClient;
+    }
+    
     private double megabytesPerSec(long time, long processedBytes) {
         return  (processedBytes / 1024.0 / 1024.0) / (time / 1000.0);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8fe2bb5/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
index 6d99e52..64d3f1a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.test.TestUtils;
 import java.io.File;
 import java.util.Collections;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 public class BrokerCompatibilityTest {
 
@@ -63,6 +64,14 @@ public class BrokerCompatibilityTest {
         builder.stream(SOURCE_TOPIC).to(SINK_TOPIC);
 
         final KafkaStreams streams = new KafkaStreams(builder, streamsProperties);
+        streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+            @Override
+            public void uncaughtException(Thread t, Throwable e) {
+                System.out.println("FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
+
+                streams.close(30, TimeUnit.SECONDS);
+            }
+        });
         System.out.println("start Kafka Streams");
         streams.start();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8fe2bb5/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index 5e6a8b3..7691948 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -210,7 +210,17 @@ public class SmokeTestClient extends SmokeTestUtil {
                     "cntByCnt"
         ).to(stringSerde, longSerde, "tagg");
 
-        return new KafkaStreams(builder, props);
+        final KafkaStreams streamsClient = new KafkaStreams(builder, props);
+        streamsClient.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+            @Override
+            public void uncaughtException(Thread t, Throwable e) {
+                System.out.println("FATAL: An unexpected exception is encountered on thread " + t + ": " + e);
+                
+                streamsClient.close(30, TimeUnit.SECONDS);
+            }
+        });
+
+        return streamsClient;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8fe2bb5/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
index d24ad4f..244aa8e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
@@ -53,13 +53,6 @@ public class StreamsSmokeTest {
                 // this starts a KafkaStreams client
                 final SmokeTestClient client = new SmokeTestClient(new File(stateDir), kafka);
                 client.start();
-
-                Runtime.getRuntime().addShutdownHook(new Thread() {
-                    @Override
-                    public void run() {
-                        client.close();
-                    }
-                });
                 break;
             case "close-deadlock-test":
                 final ShutdownDeadlockTest test = new ShutdownDeadlockTest(kafka);