You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2017/09/11 08:42:21 UTC

[4/5] kafka git commit: KAFKA-5531; throw concrete exceptions in streams tests

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
index 9204b88..105dd2e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -63,56 +63,56 @@ public class KGroupedTableImplTest {
     }
 
     @Test
-    public void shouldAllowNullStoreNameOnAggregate() throws Exception {
+    public void shouldAllowNullStoreNameOnAggregate() {
         groupedTable.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER, (String) null);
     }
 
     @Test(expected = InvalidTopicException.class)
-    public void shouldNotAllowInvalidStoreNameOnAggregate() throws Exception {
+    public void shouldNotAllowInvalidStoreNameOnAggregate() {
         groupedTable.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER, INVALID_STORE_NAME);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullInitializerOnAggregate() throws Exception {
+    public void shouldNotAllowNullInitializerOnAggregate() {
         groupedTable.aggregate(null, MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER, "store");
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullAdderOnAggregate() throws Exception {
+    public void shouldNotAllowNullAdderOnAggregate() {
         groupedTable.aggregate(MockInitializer.STRING_INIT, null, MockAggregator.TOSTRING_REMOVER, "store");
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullSubtractorOnAggregate() throws Exception {
+    public void shouldNotAllowNullSubtractorOnAggregate() {
         groupedTable.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, null, "store");
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullAdderOnReduce() throws Exception {
+    public void shouldNotAllowNullAdderOnReduce() {
         groupedTable.reduce(null, MockReducer.STRING_REMOVER, "store");
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullSubtractorOnReduce() throws Exception {
+    public void shouldNotAllowNullSubtractorOnReduce() {
         groupedTable.reduce(MockReducer.STRING_ADDER, null, "store");
     }
 
     @Test
-    public void shouldAllowNullStoreNameOnReduce() throws Exception {
+    public void shouldAllowNullStoreNameOnReduce() {
         groupedTable.reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, (String) null);
     }
 
     @Test(expected = InvalidTopicException.class)
-    public void shouldNotAllowInvalidStoreNameOnReduce() throws Exception {
+    public void shouldNotAllowInvalidStoreNameOnReduce() {
         groupedTable.reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, INVALID_STORE_NAME);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullStoreSupplierOnReduce() throws Exception {
+    public void shouldNotAllowNullStoreSupplierOnReduce() {
         groupedTable.reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, (StateStoreSupplier<KeyValueStore>) null);
     }
 
-    private void doShouldReduce(final KTable<String, Integer> reduced, final String topic) throws Exception {
+    private void doShouldReduce(final KTable<String, Integer> reduced, final String topic) {
         final Map<String, Integer> results = new HashMap<>();
         reduced.foreach(new ForeachAction<String, Integer>() {
             @Override
@@ -141,7 +141,7 @@ public class KGroupedTableImplTest {
     }
 
     @Test
-    public void shouldReduce() throws Exception {
+    public void shouldReduce() {
         final String topic = "input";
         final KeyValueMapper<String, Number, KeyValue<String, Integer>> intProjection =
             new KeyValueMapper<String, Number, KeyValue<String, Integer>>() {
@@ -160,7 +160,7 @@ public class KGroupedTableImplTest {
     }
 
     @Test
-    public void shouldReduceWithInternalStoreName() throws Exception {
+    public void shouldReduceWithInternalStoreName() {
         final String topic = "input";
         final KeyValueMapper<String, Number, KeyValue<String, Integer>> intProjection =
             new KeyValueMapper<String, Number, KeyValue<String, Integer>>() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 32c21fe..5e8687f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -260,157 +260,157 @@ public class KStreamImplTest {
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullPredicateOnFilter() throws Exception {
+    public void shouldNotAllowNullPredicateOnFilter() {
         testStream.filter(null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullPredicateOnFilterNot() throws Exception {
+    public void shouldNotAllowNullPredicateOnFilterNot() {
         testStream.filterNot(null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullMapperOnSelectKey() throws Exception {
+    public void shouldNotAllowNullMapperOnSelectKey() {
         testStream.selectKey(null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullMapperOnMap() throws Exception {
+    public void shouldNotAllowNullMapperOnMap() {
         testStream.map(null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullMapperOnMapValues() throws Exception {
+    public void shouldNotAllowNullMapperOnMapValues() {
         testStream.mapValues(null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullFilePathOnWriteAsText() throws Exception {
+    public void shouldNotAllowNullFilePathOnWriteAsText() {
         testStream.writeAsText(null);
     }
 
     @Test(expected = TopologyException.class)
-    public void shouldNotAllowEmptyFilePathOnWriteAsText() throws Exception {
+    public void shouldNotAllowEmptyFilePathOnWriteAsText() {
         testStream.writeAsText("\t    \t");
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullMapperOnFlatMap() throws Exception {
+    public void shouldNotAllowNullMapperOnFlatMap() {
         testStream.flatMap(null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullMapperOnFlatMapValues() throws Exception {
+    public void shouldNotAllowNullMapperOnFlatMapValues() {
         testStream.flatMapValues(null);
     }
 
     @Test(expected = IllegalArgumentException.class)
-    public void shouldHaveAtLeastOnPredicateWhenBranching() throws Exception {
+    public void shouldHaveAtLeastOnPredicateWhenBranching() {
         testStream.branch();
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldCantHaveNullPredicate() throws Exception {
+    public void shouldCantHaveNullPredicate() {
         testStream.branch((Predicate) null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullTopicOnThrough() throws Exception {
+    public void shouldNotAllowNullTopicOnThrough() {
         testStream.through(null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullTopicOnTo() throws Exception {
+    public void shouldNotAllowNullTopicOnTo() {
         testStream.to(null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullTransformSupplierOnTransform() throws Exception {
+    public void shouldNotAllowNullTransformSupplierOnTransform() {
         testStream.transform(null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullTransformSupplierOnTransformValues() throws Exception {
+    public void shouldNotAllowNullTransformSupplierOnTransformValues() {
         testStream.transformValues(null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullProcessSupplier() throws Exception {
+    public void shouldNotAllowNullProcessSupplier() {
         testStream.process(null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullOtherStreamOnJoin() throws Exception {
+    public void shouldNotAllowNullOtherStreamOnJoin() {
         testStream.join(null, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(10));
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullValueJoinerOnJoin() throws Exception {
+    public void shouldNotAllowNullValueJoinerOnJoin() {
         testStream.join(testStream, null, JoinWindows.of(10));
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullJoinWindowsOnJoin() throws Exception {
+    public void shouldNotAllowNullJoinWindowsOnJoin() {
         testStream.join(testStream, MockValueJoiner.TOSTRING_JOINER, null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullTableOnTableJoin() throws Exception {
+    public void shouldNotAllowNullTableOnTableJoin() {
         testStream.leftJoin((KTable) null, MockValueJoiner.TOSTRING_JOINER);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullValueMapperOnTableJoin() throws Exception {
+    public void shouldNotAllowNullValueMapperOnTableJoin() {
         testStream.leftJoin(builder.table(Serdes.String(), Serdes.String(), "topic", "store"), null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullSelectorOnGroupBy() throws Exception {
+    public void shouldNotAllowNullSelectorOnGroupBy() {
         testStream.groupBy(null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullActionOnForEach() throws Exception {
+    public void shouldNotAllowNullActionOnForEach() {
         testStream.foreach(null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullTableOnJoinWithGlobalTable() throws Exception {
+    public void shouldNotAllowNullTableOnJoinWithGlobalTable() {
         testStream.join((GlobalKTable) null,
                         MockKeyValueMapper.<String, String>SelectValueMapper(),
                         MockValueJoiner.TOSTRING_JOINER);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullMapperOnJoinWithGlobalTable() throws Exception {
+    public void shouldNotAllowNullMapperOnJoinWithGlobalTable() {
         testStream.join(builder.globalTable(Serdes.String(), Serdes.String(), null, "global", "global"),
                         null,
                         MockValueJoiner.TOSTRING_JOINER);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullJoinerOnJoinWithGlobalTable() throws Exception {
+    public void shouldNotAllowNullJoinerOnJoinWithGlobalTable() {
         testStream.join(builder.globalTable(Serdes.String(), Serdes.String(), null, "global", "global"),
                         MockKeyValueMapper.<String, String>SelectValueMapper(),
                         null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullTableOnJLeftJoinWithGlobalTable() throws Exception {
+    public void shouldNotAllowNullTableOnJLeftJoinWithGlobalTable() {
         testStream.leftJoin((GlobalKTable) null,
                         MockKeyValueMapper.<String, String>SelectValueMapper(),
                         MockValueJoiner.TOSTRING_JOINER);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullMapperOnLeftJoinWithGlobalTable() throws Exception {
+    public void shouldNotAllowNullMapperOnLeftJoinWithGlobalTable() {
         testStream.leftJoin(builder.globalTable(Serdes.String(), Serdes.String(), null, "global", "global"),
                         null,
                         MockValueJoiner.TOSTRING_JOINER);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullJoinerOnLeftJoinWithGlobalTable() throws Exception {
+    public void shouldNotAllowNullJoinerOnLeftJoinWithGlobalTable() {
         testStream.leftJoin(builder.globalTable(Serdes.String(), Serdes.String(), null, "global", "global"),
                         MockKeyValueMapper.<String, String>SelectValueMapper(),
                         null);

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index efb9f12..572c0b0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -62,7 +62,7 @@ public class KStreamKStreamJoinTest {
     }
 
     @Test
-    public void testJoin() throws Exception {
+    public void testJoin() {
         StreamsBuilder builder = new StreamsBuilder();
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
@@ -163,7 +163,7 @@ public class KStreamKStreamJoinTest {
     }
 
     @Test
-    public void testOuterJoin() throws Exception {
+    public void testOuterJoin() {
         StreamsBuilder builder = new StreamsBuilder();
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
@@ -264,7 +264,7 @@ public class KStreamKStreamJoinTest {
     }
 
     @Test
-    public void testWindowing() throws Exception {
+    public void testWindowing() {
         long time = 0L;
 
         StreamsBuilder builder = new StreamsBuilder();
@@ -494,7 +494,7 @@ public class KStreamKStreamJoinTest {
     }
 
     @Test
-    public void testAsymetricWindowingAfter() throws Exception {
+    public void testAsymetricWindowingAfter() {
         long time = 1000L;
 
         StreamsBuilder builder = new StreamsBuilder();
@@ -608,7 +608,7 @@ public class KStreamKStreamJoinTest {
     }
 
     @Test
-    public void testAsymetricWindowingBefore() throws Exception {
+    public void testAsymetricWindowingBefore() {
         long time = 1000L;
 
         StreamsBuilder builder = new StreamsBuilder();

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index c56956d..745ab4e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -62,7 +62,7 @@ public class KStreamKStreamLeftJoinTest {
     }
 
     @Test
-    public void testLeftJoin() throws Exception {
+    public void testLeftJoin() {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
@@ -153,7 +153,7 @@ public class KStreamKStreamLeftJoinTest {
     }
 
     @Test
-    public void testWindowing() throws Exception {
+    public void testWindowing() {
         final StreamsBuilder builder = new StreamsBuilder();
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
         long time = 0L;

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index 524cf42..d206b02 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -57,7 +57,7 @@ public class KStreamKTableJoinTest {
     }
 
     @Test
-    public void testJoin() throws Exception {
+    public void testJoin() {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index 9e9fddc..a79184e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -58,7 +58,7 @@ public class KStreamKTableLeftJoinTest {
     }
 
     @Test
-    public void testJoin() throws Exception {
+    public void testJoin() {
         StreamsBuilder builder = new StreamsBuilder();
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 0662944..7f0d28a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -118,7 +118,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
     }
 
     @Test
-    public void shouldCreateSingleSessionWhenWithinGap() throws Exception {
+    public void shouldCreateSingleSessionWhenWithinGap() {
         context.setTime(0);
         processor.process("john", "first");
         context.setTime(500);
@@ -131,7 +131,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
 
 
     @Test
-    public void shouldMergeSessions() throws Exception {
+    public void shouldMergeSessions() {
         context.setTime(0);
         final String sessionId = "mel";
         processor.process(sessionId, "first");
@@ -155,7 +155,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
     }
 
     @Test
-    public void shouldUpdateSessionIfTheSameTime() throws Exception {
+    public void shouldUpdateSessionIfTheSameTime() {
         context.setTime(0);
         processor.process("mel", "first");
         processor.process("mel", "second");
@@ -165,7 +165,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
     }
 
     @Test
-    public void shouldHaveMultipleSessionsForSameIdWhenTimestampApartBySessionGap() throws Exception {
+    public void shouldHaveMultipleSessionsForSameIdWhenTimestampApartBySessionGap() {
         final String sessionId = "mel";
         long time = 0;
         context.setTime(time);
@@ -190,7 +190,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
 
 
     @Test
-    public void shouldRemoveMergedSessionsFromStateStore() throws Exception {
+    public void shouldRemoveMergedSessionsFromStateStore() {
         context.setTime(0);
         processor.process("a", "1");
 
@@ -208,7 +208,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
     }
 
     @Test
-    public void shouldHandleMultipleSessionsAndMerging() throws Exception {
+    public void shouldHandleMultipleSessionsAndMerging() {
         context.setTime(0);
         processor.process("a", "1");
         processor.process("b", "1");
@@ -238,7 +238,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
 
 
     @Test
-    public void shouldGetAggregatedValuesFromValueGetter() throws Exception {
+    public void shouldGetAggregatedValuesFromValueGetter() {
         final KTableValueGetter<Windowed<String>, Long> getter = sessionAggregator.view().get();
         getter.init(context);
         context.setTime(0);
@@ -253,7 +253,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
     }
 
     @Test
-    public void shouldImmediatelyForwardNewSessionWhenNonCachedStore() throws Exception {
+    public void shouldImmediatelyForwardNewSessionWhenNonCachedStore() {
         initStore(false);
         processor.init(context);
 
@@ -268,7 +268,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
     }
 
     @Test
-    public void shouldImmediatelyForwardRemovedSessionsWhenMerging() throws Exception {
+    public void shouldImmediatelyForwardRemovedSessionsWhenMerging() {
         initStore(false);
         processor.init(context);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index b39ac36..aa660e0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -49,14 +49,14 @@ public class KStreamWindowAggregateTest {
     private File stateDir = null;
     @Rule
     public final KStreamTestDriver driver = new KStreamTestDriver();
-    
+
     @Before
     public void setUp() throws IOException {
         stateDir = TestUtils.tempDirectory("kafka-test");
     }
 
     @Test
-    public void testAggBasic() throws Exception {
+    public void testAggBasic() {
         final StreamsBuilder builder = new StreamsBuilder();
         String topic1 = "topic1";
 
@@ -147,7 +147,7 @@ public class KStreamWindowAggregateTest {
     }
 
     @Test
-    public void testJoin() throws Exception {
+    public void testJoin() {
         final StreamsBuilder builder = new StreamsBuilder();
         String topic1 = "topic1";
         String topic2 = "topic2";

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index 781eb61..9cdc782 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -67,7 +67,7 @@ public class KTableKTableLeftJoinTest {
     }
 
     @Test
-    public void testJoin() throws Exception {
+    public void testJoin() {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
@@ -161,7 +161,7 @@ public class KTableKTableLeftJoinTest {
     }
 
     @Test
-    public void testNotSendingOldValue() throws Exception {
+    public void testNotSendingOldValue() {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
@@ -242,7 +242,7 @@ public class KTableKTableLeftJoinTest {
     }
 
     @Test
-    public void testSendingOldValue() throws Exception {
+    public void testSendingOldValue() {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
@@ -330,7 +330,7 @@ public class KTableKTableLeftJoinTest {
      * Before the fix this would trigger an IllegalStateException.
      */
     @Test
-    public void shouldNotThrowIllegalStateExceptionWhenMultiCacheEvictions() throws Exception {
+    public void shouldNotThrowIllegalStateExceptionWhenMultiCacheEvictions() {
         final String agg = "agg";
         final String tableOne = "tableOne";
         final String tableTwo = "tableTwo";

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index 7ab6d87..368a3ea 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -61,7 +61,7 @@ public class KTableKTableOuterJoinTest {
     }
 
     @Test
-    public void testJoin() throws Exception {
+    public void testJoin() {
         StreamsBuilder builder = new StreamsBuilder();
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
@@ -166,7 +166,7 @@ public class KTableKTableOuterJoinTest {
     }
 
     @Test
-    public void testNotSendingOldValue() throws Exception {
+    public void testNotSendingOldValue() {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
@@ -254,7 +254,7 @@ public class KTableKTableOuterJoinTest {
     }
 
     @Test
-    public void testSendingOldValue() throws Exception {
+    public void testSendingOldValue() {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java
index aca3352..59371d5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java
@@ -39,66 +39,66 @@ public class SessionKeySerdeTest {
     final private SessionKeySerde<String> sessionKeySerde = new SessionKeySerde<>(serde);
 
     @Test
-    public void shouldSerializeDeserialize() throws Exception {
+    public void shouldSerializeDeserialize() {
         final byte[] bytes = sessionKeySerde.serializer().serialize(topic, windowedKey);
         final Windowed<String> result = sessionKeySerde.deserializer().deserialize(topic, bytes);
         assertEquals(windowedKey, result);
     }
 
     @Test
-    public void shouldSerializeNullToNull() throws Exception {
+    public void shouldSerializeNullToNull() {
         assertNull(sessionKeySerde.serializer().serialize(topic, null));
     }
 
     @Test
-    public void shouldDeSerializeEmtpyByteArrayToNull() throws Exception {
+    public void shouldDeSerializeEmtpyByteArrayToNull() {
         assertNull(sessionKeySerde.deserializer().deserialize(topic, new byte[0]));
     }
 
     @Test
-    public void shouldDeSerializeNullToNull() throws Exception {
+    public void shouldDeSerializeNullToNull() {
         assertNull(sessionKeySerde.deserializer().deserialize(topic, null));
     }
 
     @Test
-    public void shouldConvertToBinaryAndBack() throws Exception {
+    public void shouldConvertToBinaryAndBack() {
         final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer(), "dummy");
         final Windowed<String> result = SessionKeySerde.from(serialized.get(), Serdes.String().deserializer(), "dummy");
         assertEquals(windowedKey, result);
     }
 
     @Test
-    public void shouldExtractEndTimeFromBinary() throws Exception {
+    public void shouldExtractEndTimeFromBinary() {
         final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer(), "dummy");
         assertEquals(endTime, SessionKeySerde.extractEnd(serialized.get()));
     }
 
     @Test
-    public void shouldExtractStartTimeFromBinary() throws Exception {
+    public void shouldExtractStartTimeFromBinary() {
         final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer(), "dummy");
         assertEquals(startTime, SessionKeySerde.extractStart(serialized.get()));
     }
 
     @Test
-    public void shouldExtractWindowFromBindary() throws Exception {
+    public void shouldExtractWindowFromBindary() {
         final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer(), "dummy");
         assertEquals(window, SessionKeySerde.extractWindow(serialized.get()));
     }
 
     @Test
-    public void shouldExtractKeyBytesFromBinary() throws Exception {
+    public void shouldExtractKeyBytesFromBinary() {
         final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer(), "dummy");
         assertArrayEquals(key.getBytes(), SessionKeySerde.extractKeyBytes(serialized.get()));
     }
 
     @Test
-    public void shouldExtractKeyFromBinary() throws Exception {
+    public void shouldExtractKeyFromBinary() {
         final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer(), "dummy");
         assertEquals(windowedKey, SessionKeySerde.from(serialized.get(), serde.deserializer(), "dummy"));
     }
 
     @Test
-    public void shouldExtractBytesKeyFromBinary() throws Exception {
+    public void shouldExtractBytesKeyFromBinary() {
         final Bytes bytesKey = Bytes.wrap(key.getBytes());
         final Windowed<Bytes> windowedBytesKey = new Windowed<>(bytesKey, window);
         final Bytes serialized = SessionKeySerde.bytesToBinary(windowedBytesKey);

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index 9fb8480..3cbceeb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -129,7 +129,7 @@ public class SimpleBenchmark {
         this.numThreads = numThreads;
     }
 
-    private void run() throws Exception {
+    private void run() {
         switch (testName) {
             case ALL_TESTS:
                 // producer performance
@@ -187,12 +187,12 @@ public class SimpleBenchmark {
                 yahooBenchmark(YAHOO_CAMPAIGNS_TOPIC, YAHOO_EVENTS_TOPIC);
                 break;
             default:
-                throw new Exception("Unknown test name " + testName);
+                throw new RuntimeException("Unknown test name " + testName);
 
         }
     }
 
-    public static void main(String[] args) throws Exception {
+    public static void main(String[] args) {
         String kafka = args.length > 0 ? args[0] : "localhost:9092";
         String stateDirStr = args.length > 1 ? args[1] : TestUtils.tempDirectory().getAbsolutePath();
         int numRecords = args.length > 2 ? Integer.parseInt(args[2]) : 10000000;
@@ -254,7 +254,7 @@ public class SimpleBenchmark {
     }
 
     private boolean maybeSetupPhase(final String topic, final String clientId,
-                                    final boolean skipIfAllTests) throws Exception {
+                                    final boolean skipIfAllTests) {
         resetStats();
         // initialize topics
         if (loadPhase) {
@@ -291,7 +291,7 @@ public class SimpleBenchmark {
     }
 
 
-    private void yahooBenchmark(final String campaignsTopic, final String eventsTopic) throws Exception {
+    private void yahooBenchmark(final String campaignsTopic, final String eventsTopic) {
         YahooBenchmark benchmark = new YahooBenchmark(this, campaignsTopic, eventsTopic);
 
         benchmark.run();
@@ -304,7 +304,7 @@ public class SimpleBenchmark {
      * @param countTopic Topic where numbers are stored
      * @throws Exception
      */
-    public void count(String countTopic) throws Exception {
+    public void count(String countTopic) {
         if (maybeSetupPhase(countTopic, "simple-benchmark-produce-count", false)) {
             return;
         }
@@ -319,7 +319,7 @@ public class SimpleBenchmark {
      * Measure the performance of a KStream-KTable left join. The setup is such that each
      * KStream record joins to exactly one element in the KTable
      */
-    public void kStreamKTableJoin(String kStreamTopic, String kTableTopic) throws Exception {
+    public void kStreamKTableJoin(String kStreamTopic, String kTableTopic) {
         if (maybeSetupPhase(kStreamTopic, "simple-benchmark-produce-kstream", false)) {
             maybeSetupPhase(kTableTopic, "simple-benchmark-produce-ktable", false);
             return;
@@ -339,7 +339,7 @@ public class SimpleBenchmark {
      * Measure the performance of a KStream-KStream left join. The setup is such that each
      * KStream record joins to exactly one element in the other KStream
      */
-    public void kStreamKStreamJoin(String kStreamTopic1, String kStreamTopic2) throws Exception {
+    public void kStreamKStreamJoin(String kStreamTopic1, String kStreamTopic2) {
         if (maybeSetupPhase(kStreamTopic1, "simple-benchmark-produce-kstream-topic1", false)) {
             maybeSetupPhase(kStreamTopic2, "simple-benchmark-produce-kstream-topic2", false);
             return;
@@ -359,7 +359,7 @@ public class SimpleBenchmark {
      * Measure the performance of a KTable-KTable left join. The setup is such that each
      * KTable record joins to exactly one element in the other KTable
      */
-    public void kTableKTableJoin(String kTableTopic1, String kTableTopic2) throws Exception {
+    public void kTableKTableJoin(String kTableTopic1, String kTableTopic2) {
         if (maybeSetupPhase(kTableTopic1, "simple-benchmark-produce-ktable-topic1", false)) {
             maybeSetupPhase(kTableTopic2, "simple-benchmark-produce-ktable-topic2", false);
             return;
@@ -400,7 +400,7 @@ public class SimpleBenchmark {
         streams.close();
     }
 
-    private long startStreamsThread(final KafkaStreams streams, final CountDownLatch latch) throws Exception {
+    private long startStreamsThread(final KafkaStreams streams, final CountDownLatch latch) {
         Thread thread = new Thread() {
             public void run() {
                 streams.start();
@@ -430,7 +430,7 @@ public class SimpleBenchmark {
         return endTime - startTime;
     }
 
-    public void processStream(final String topic) throws Exception {
+    public void processStream(final String topic) {
         if (maybeSetupPhase(topic, "simple-benchmark-process-stream-load", true)) {
             return;
         }
@@ -443,7 +443,7 @@ public class SimpleBenchmark {
         printResults("Streams Performance [records/latency/rec-sec/MB-sec source]: ", latency);
     }
 
-    public void processStreamWithSink(String topic) throws Exception {
+    public void processStreamWithSink(String topic) {
         if (maybeSetupPhase(topic, "simple-benchmark-process-stream-with-sink-load", true)) {
             return;
         }
@@ -456,7 +456,7 @@ public class SimpleBenchmark {
 
     }
 
-    public void processStreamWithStateStore(String topic) throws Exception {
+    public void processStreamWithStateStore(String topic) {
         if (maybeSetupPhase(topic, "simple-benchmark-process-stream-with-state-store-load", true)) {
             return;
         }
@@ -468,7 +468,7 @@ public class SimpleBenchmark {
 
     }
 
-    public void processStreamWithCachedStateStore(String topic) throws Exception {
+    public void processStreamWithCachedStateStore(String topic) {
         if (maybeSetupPhase(topic, "simple-benchmark-process-stream-with-cached-state-store-load", true)) {
             return;
         }
@@ -479,7 +479,7 @@ public class SimpleBenchmark {
         printResults("Streams Performance [records/latency/rec-sec/MB-sec source+cache+store]: ", latency);
     }
 
-    public void produce(String topic) throws Exception {
+    public void produce(String topic) {
         // loading phase does not make sense for producer
         if (loadPhase) {
             resetStats();
@@ -499,11 +499,11 @@ public class SimpleBenchmark {
      *                   when this produce step is part of another benchmark that produces its own stats
      */
     private void produce(String topic, int valueSizeBytes, String clientId, int numRecords, boolean sequential,
-                         int upperRange, boolean printStats) throws Exception {
+                         int upperRange, boolean printStats) {
 
 
         if (sequential) {
-            if (upperRange < numRecords) throw new Exception("UpperRange must be >= numRecords");
+            if (upperRange < numRecords) throw new IllegalArgumentException("UpperRange must be >= numRecords");
         }
         if (!sequential) {
             System.out.println("WARNING: You are using non-sequential keys. If your tests' exit logic expects to see a final key, random keys may not work.");
@@ -539,7 +539,7 @@ public class SimpleBenchmark {
         }
     }
 
-    public void consume(String topic) throws Exception {
+    public void consume(String topic) {
         if (maybeSetupPhase(topic, "simple-benchmark-consumer-load", true)) {
             return;
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
index d2d0681..0a10f9f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
@@ -89,7 +89,7 @@ public class YahooBenchmark {
     private boolean maybeSetupPhaseCampaigns(final String topic, final String clientId,
                                              final boolean skipIfAllTests,
                                              final int numCampaigns, final int adsPerCampaign,
-                                             final List<String> ads) throws Exception {
+                                             final List<String> ads) {
         parent.resetStats();
         // initialize topics
         if (parent.loadPhase) {
@@ -128,7 +128,7 @@ public class YahooBenchmark {
     // just for Yahoo benchmark
     private boolean maybeSetupPhaseEvents(final String topic, final String clientId,
                                           final boolean skipIfAllTests, final int numRecords,
-                                          final List<String> ads) throws Exception {
+                                          final List<String> ads) {
         parent.resetStats();
         String[] eventTypes = new String[]{"view", "click", "purchase"};
         Random rand = new Random();
@@ -181,7 +181,7 @@ public class YahooBenchmark {
     }
 
 
-    public void run() throws Exception {
+    public void run() {
         int numCampaigns = 100;
         int adsPerCampaign = 10;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index 0cd2f2a..ded2732 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -457,19 +457,19 @@ public class TopologyBuilderTest {
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullNameWhenAddingSink() throws Exception {
+    public void shouldNotAllowNullNameWhenAddingSink() {
         final TopologyBuilder builder = new TopologyBuilder();
         builder.addSink(null, "topic");
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullTopicWhenAddingSink() throws Exception {
+    public void shouldNotAllowNullTopicWhenAddingSink() {
         final TopologyBuilder builder = new TopologyBuilder();
         builder.addSink("name", null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullNameWhenAddingProcessor() throws Exception {
+    public void shouldNotAllowNullNameWhenAddingProcessor() {
         final TopologyBuilder builder = new TopologyBuilder();
         builder.addProcessor(null, new ProcessorSupplier() {
             @Override
@@ -480,37 +480,37 @@ public class TopologyBuilderTest {
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullProcessorSupplier() throws Exception {
+    public void shouldNotAllowNullProcessorSupplier() {
         final TopologyBuilder builder = new TopologyBuilder();
         builder.addProcessor("name", null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullNameWhenAddingSource() throws Exception {
+    public void shouldNotAllowNullNameWhenAddingSource() {
         final TopologyBuilder builder = new TopologyBuilder();
         builder.addSource(null, Pattern.compile(".*"));
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAllowNullProcessorNameWhenConnectingProcessorAndStateStores() throws Exception {
+    public void shouldNotAllowNullProcessorNameWhenConnectingProcessorAndStateStores() {
         final TopologyBuilder builder = new TopologyBuilder();
         builder.connectProcessorAndStateStores(null, "store");
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAddNullInternalTopic() throws Exception {
+    public void shouldNotAddNullInternalTopic() {
         final TopologyBuilder builder = new TopologyBuilder();
         builder.addInternalTopic(null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotSetApplicationIdToNull() throws Exception {
+    public void shouldNotSetApplicationIdToNull() {
         final TopologyBuilder builder = new TopologyBuilder();
         builder.setApplicationId(null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAddNullStateStoreSupplier() throws Exception {
+    public void shouldNotAddNullStateStoreSupplier() {
         final TopologyBuilder builder = new TopologyBuilder();
         builder.addStateStore(null);
     }
@@ -524,7 +524,7 @@ public class TopologyBuilderTest {
     }
 
     @Test
-    public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsInternal() throws Exception {
+    public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsInternal() {
         final TopologyBuilder builder = new TopologyBuilder();
         builder.addSource("source", "topic");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source");
@@ -535,7 +535,7 @@ public class TopologyBuilderTest {
     }
 
     @Test
-    public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsExternal() throws Exception {
+    public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsExternal() {
         final TopologyBuilder builder = new TopologyBuilder();
         builder.addSource("source", "topic");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source");
@@ -546,7 +546,7 @@ public class TopologyBuilderTest {
     }
 
     @Test
-    public void shouldCorrectlyMapStateStoreToInternalTopics() throws Exception {
+    public void shouldCorrectlyMapStateStoreToInternalTopics() {
         final TopologyBuilder builder = new TopologyBuilder();
         builder.setApplicationId("appId");
         builder.addInternalTopic("internal-topic");
@@ -560,7 +560,7 @@ public class TopologyBuilderTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void shouldAddInternalTopicConfigWithCompactAndDeleteSetForWindowStores() throws Exception {
+    public void shouldAddInternalTopicConfigWithCompactAndDeleteSetForWindowStores() {
         final TopologyBuilder builder = new TopologyBuilder();
         builder.setApplicationId("appId");
         builder.addSource("source", "topic");
@@ -580,7 +580,7 @@ public class TopologyBuilderTest {
     }
 
     @Test
-    public void shouldAddInternalTopicConfigWithCompactForNonWindowStores() throws Exception {
+    public void shouldAddInternalTopicConfigWithCompactForNonWindowStores() {
         final TopologyBuilder builder = new TopologyBuilder();
         builder.setApplicationId("appId");
         builder.addSource("source", "topic");
@@ -596,7 +596,7 @@ public class TopologyBuilderTest {
     }
 
     @Test
-    public void shouldAddInternalTopicConfigWithCleanupPolicyDeleteForInternalTopics() throws Exception {
+    public void shouldAddInternalTopicConfigWithCleanupPolicyDeleteForInternalTopics() {
         final TopologyBuilder builder = new TopologyBuilder();
         builder.setApplicationId("appId");
         builder.addInternalTopic("foo");
@@ -701,7 +701,7 @@ public class TopologyBuilderTest {
     }
 
     @Test
-    public void shouldAddTimestampExtractorPerSource() throws Exception {
+    public void shouldAddTimestampExtractorPerSource() {
         final TopologyBuilder builder = new TopologyBuilder();
         builder.addSource(new MockTimestampExtractor(), "source", "topic");
         final ProcessorTopology processorTopology = builder.build(null);
@@ -709,7 +709,7 @@ public class TopologyBuilderTest {
     }
 
     @Test
-    public void shouldAddTimestampExtractorWithOffsetResetPerSource() throws Exception {
+    public void shouldAddTimestampExtractorWithOffsetResetPerSource() {
         final TopologyBuilder builder = new TopologyBuilder();
         builder.addSource(null, new MockTimestampExtractor(), "source", "topic");
         final ProcessorTopology processorTopology = builder.build(null);
@@ -717,7 +717,7 @@ public class TopologyBuilderTest {
     }
 
     @Test
-    public void shouldAddTimestampExtractorWithPatternPerSource() throws Exception {
+    public void shouldAddTimestampExtractorWithPatternPerSource() {
         final TopologyBuilder builder = new TopologyBuilder();
         final Pattern pattern = Pattern.compile("t.*");
         builder.addSource(new MockTimestampExtractor(), "source", pattern);
@@ -726,7 +726,7 @@ public class TopologyBuilderTest {
     }
 
     @Test
-    public void shouldAddTimestampExtractorWithOffsetResetAndPatternPerSource() throws Exception {
+    public void shouldAddTimestampExtractorWithOffsetResetAndPatternPerSource() {
         final TopologyBuilder builder = new TopologyBuilder();
         final Pattern pattern = Pattern.compile("t.*");
         builder.addSource(null, new MockTimestampExtractor(), "source", pattern);
@@ -735,7 +735,7 @@ public class TopologyBuilderTest {
     }
 
     @Test
-    public void shouldAddTimestampExtractorWithOffsetResetAndKeyValSerdesPerSource() throws Exception {
+    public void shouldAddTimestampExtractorWithOffsetResetAndKeyValSerdesPerSource() {
         final TopologyBuilder builder = new TopologyBuilder();
         builder.addSource(null, "source", new MockTimestampExtractor(), null, null, "topic");
         final ProcessorTopology processorTopology = builder.build(null);
@@ -743,7 +743,7 @@ public class TopologyBuilderTest {
     }
 
     @Test
-    public void shouldAddTimestampExtractorWithOffsetResetAndKeyValSerdesAndPatternPerSource() throws Exception {
+    public void shouldAddTimestampExtractorWithOffsetResetAndKeyValSerdesAndPatternPerSource() {
         final TopologyBuilder builder = new TopologyBuilder();
         final Pattern pattern = Pattern.compile("t.*");
         builder.addSource(null, "source", new MockTimestampExtractor(), null, null, pattern);
@@ -751,6 +751,7 @@ public class TopologyBuilderTest {
         assertThat(processorTopology.source(pattern.pattern()).getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
     }
 
+    @Test
     public void shouldConnectRegexMatchedTopicsToStateStore() throws Exception {
 
         final TopologyBuilder topologyBuilder = new TopologyBuilder()

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index 72082da..16e5b39 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -50,7 +50,7 @@ public class AbstractProcessorContextTest {
     }
 
     @Test
-    public void shouldThrowIllegalStateExceptionOnRegisterWhenContextIsInitialized() throws Exception {
+    public void shouldThrowIllegalStateExceptionOnRegisterWhenContextIsInitialized() {
         context.initialized();
         try {
             context.register(stateStore, false, null);
@@ -61,7 +61,7 @@ public class AbstractProcessorContextTest {
     }
 
     @Test
-    public void shouldNotThrowIllegalStateExceptionOnRegisterWhenContextIsNotInitialized() throws Exception {
+    public void shouldNotThrowIllegalStateExceptionOnRegisterWhenContextIsNotInitialized() {
         context.register(stateStore, false, null);
     }
 
@@ -71,7 +71,7 @@ public class AbstractProcessorContextTest {
     }
 
     @Test
-    public void shouldThrowIllegalStateExceptionOnTopicIfNoRecordContext() throws Exception {
+    public void shouldThrowIllegalStateExceptionOnTopicIfNoRecordContext() {
         context.setRecordContext(null);
         try {
             context.topic();
@@ -82,18 +82,18 @@ public class AbstractProcessorContextTest {
     }
 
     @Test
-    public void shouldReturnTopicFromRecordContext() throws Exception {
+    public void shouldReturnTopicFromRecordContext() {
         assertThat(context.topic(), equalTo(recordContext.topic()));
     }
 
     @Test
-    public void shouldReturnNullIfTopicEqualsNonExistTopic() throws Exception {
+    public void shouldReturnNullIfTopicEqualsNonExistTopic() {
         context.setRecordContext(new RecordContextStub(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC));
         assertThat(context.topic(), nullValue());
     }
 
     @Test
-    public void shouldThrowIllegalStateExceptionOnPartitionIfNoRecordContext() throws Exception {
+    public void shouldThrowIllegalStateExceptionOnPartitionIfNoRecordContext() {
         context.setRecordContext(null);
         try {
             context.partition();
@@ -104,12 +104,12 @@ public class AbstractProcessorContextTest {
     }
 
     @Test
-    public void shouldReturnPartitionFromRecordContext() throws Exception {
+    public void shouldReturnPartitionFromRecordContext() {
         assertThat(context.partition(), equalTo(recordContext.partition()));
     }
 
     @Test
-    public void shouldThrowIllegalStateExceptionOnOffsetIfNoRecordContext() throws Exception {
+    public void shouldThrowIllegalStateExceptionOnOffsetIfNoRecordContext() {
         context.setRecordContext(null);
         try {
             context.offset();
@@ -119,12 +119,12 @@ public class AbstractProcessorContextTest {
     }
 
     @Test
-    public void shouldReturnOffsetFromRecordContext() throws Exception {
+    public void shouldReturnOffsetFromRecordContext() {
         assertThat(context.offset(), equalTo(recordContext.offset()));
     }
 
     @Test
-    public void shouldThrowIllegalStateExceptionOnTimestampIfNoRecordContext() throws Exception {
+    public void shouldThrowIllegalStateExceptionOnTimestampIfNoRecordContext() {
         context.setRecordContext(null);
         try {
             context.timestamp();
@@ -135,7 +135,7 @@ public class AbstractProcessorContextTest {
     }
 
     @Test
-    public void shouldReturnTimestampFromRecordContext() throws Exception {
+    public void shouldReturnTimestampFromRecordContext() {
         assertThat(context.timestamp(), equalTo(recordContext.timestamp()));
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index 54726d8..d2d439c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -55,21 +55,21 @@ public class AbstractTaskTest {
     }
 
     @Test(expected = ProcessorStateException.class)
-    public void shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenAuthorizationException() throws Exception {
+    public void shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenAuthorizationException() {
         final Consumer consumer = mockConsumer(new AuthorizationException("blah"));
         final AbstractTask task = createTask(consumer, Collections.<StateStore>emptyList());
         task.updateOffsetLimits();
     }
 
     @Test(expected = ProcessorStateException.class)
-    public void shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenKafkaException() throws Exception {
+    public void shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenKafkaException() {
         final Consumer consumer = mockConsumer(new KafkaException("blah"));
         final AbstractTask task = createTask(consumer, Collections.<StateStore>emptyList());
         task.updateOffsetLimits();
     }
 
     @Test(expected = WakeupException.class)
-    public void shouldThrowWakeupExceptionOnInitializeOffsetsWhenWakeupException() throws Exception {
+    public void shouldThrowWakeupExceptionOnInitializeOffsetsWhenWakeupException() {
         final Consumer consumer = mockConsumer(new WakeupException());
         final AbstractTask task = createTask(consumer, Collections.<StateStore>emptyList());
         task.updateOffsetLimits();

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java
index 77001ce..6966d67 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java
@@ -47,14 +47,14 @@ public class CopartitionedTopicsValidatorTest {
     }
 
     @Test(expected = TopologyBuilderException.class)
-    public void shouldThrowTopologyBuilderExceptionIfNoPartitionsFoundForCoPartitionedTopic() throws Exception {
+    public void shouldThrowTopologyBuilderExceptionIfNoPartitionsFoundForCoPartitionedTopic() {
         validator.validate(Collections.singleton("topic"),
                            Collections.<String, StreamPartitionAssignor.InternalTopicMetadata>emptyMap(),
                            cluster);
     }
 
     @Test(expected = TopologyBuilderException.class)
-    public void shouldThrowTopologyBuilderExceptionIfPartitionCountsForCoPartitionedTopicsDontMatch() throws Exception {
+    public void shouldThrowTopologyBuilderExceptionIfPartitionCountsForCoPartitionedTopicsDontMatch() {
         partitions.remove(new TopicPartition("second", 0));
         validator.validate(Utils.mkSet("first", "second"),
                            Collections.<String, StreamPartitionAssignor.InternalTopicMetadata>emptyMap(),
@@ -63,7 +63,7 @@ public class CopartitionedTopicsValidatorTest {
 
 
     @Test
-    public void shouldEnforceCopartitioningOnRepartitionTopics() throws Exception {
+    public void shouldEnforceCopartitioningOnRepartitionTopics() {
         final StreamPartitionAssignor.InternalTopicMetadata metadata = createTopicMetadata("repartitioned", 10);
 
         validator.validate(Utils.mkSet("first", "second", metadata.config.name()),
@@ -76,7 +76,7 @@ public class CopartitionedTopicsValidatorTest {
 
 
     @Test
-    public void shouldSetNumPartitionsToMaximumPartitionsWhenAllTopicsAreRepartitionTopics() throws Exception {
+    public void shouldSetNumPartitionsToMaximumPartitionsWhenAllTopicsAreRepartitionTopics() {
         final StreamPartitionAssignor.InternalTopicMetadata one = createTopicMetadata("one", 1);
         final StreamPartitionAssignor.InternalTopicMetadata two = createTopicMetadata("two", 15);
         final StreamPartitionAssignor.InternalTopicMetadata three = createTopicMetadata("three", 5);
@@ -99,7 +99,7 @@ public class CopartitionedTopicsValidatorTest {
     }
 
     @Test
-    public void shouldSetRepartitionTopicsPartitionCountToNotAvailableIfAnyNotAvaliable() throws Exception {
+    public void shouldSetRepartitionTopicsPartitionCountToNotAvailableIfAnyNotAvaliable() {
         final StreamPartitionAssignor.InternalTopicMetadata one = createTopicMetadata("one", 1);
         final StreamPartitionAssignor.InternalTopicMetadata two = createTopicMetadata("two", StreamPartitionAssignor.NOT_AVAILABLE);
         final Map<String, StreamPartitionAssignor.InternalTopicMetadata> repartitionTopicConfig = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 98ef8f6..e530d60 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -105,13 +105,13 @@ public class GlobalStateManagerImplTest {
     }
 
     @Test
-    public void shouldLockGlobalStateDirectory() throws Exception {
+    public void shouldLockGlobalStateDirectory() {
         stateManager.initialize(context);
         assertTrue(new File(stateDirectory.globalStateDir(), ".lock").exists());
     }
 
     @Test(expected = LockException.class)
-    public void shouldThrowLockExceptionIfCantGetLock() throws Exception {
+    public void shouldThrowLockExceptionIfCantGetLock() throws IOException {
         final StateDirectory stateDir = new StateDirectory("appId", stateDirPath, time);
         try {
             stateDir.lockGlobalState(1);
@@ -122,7 +122,7 @@ public class GlobalStateManagerImplTest {
     }
 
     @Test
-    public void shouldReadCheckpointOffsets() throws Exception {
+    public void shouldReadCheckpointOffsets() throws IOException {
         final Map<TopicPartition, Long> expected = writeCheckpoint();
 
         stateManager.initialize(context);
@@ -131,33 +131,33 @@ public class GlobalStateManagerImplTest {
     }
 
     @Test
-    public void shouldNotDeleteCheckpointFileAfterLoaded() throws Exception {
+    public void shouldNotDeleteCheckpointFileAfterLoaded() throws IOException {
         writeCheckpoint();
         stateManager.initialize(context);
         assertTrue(checkpointFile.exists());
     }
 
     @Test(expected = StreamsException.class)
-    public void shouldThrowStreamsExceptionIfFailedToReadCheckpointedOffsets() throws Exception {
+    public void shouldThrowStreamsExceptionIfFailedToReadCheckpointedOffsets() throws IOException {
         writeCorruptCheckpoint();
         stateManager.initialize(context);
     }
 
     @Test
-    public void shouldInitializeStateStores() throws Exception {
+    public void shouldInitializeStateStores() {
         stateManager.initialize(context);
         assertTrue(store1.initialized);
         assertTrue(store2.initialized);
     }
 
     @Test
-    public void shouldReturnInitializedStoreNames() throws Exception {
+    public void shouldReturnInitializedStoreNames() {
         final Set<String> storeNames = stateManager.initialize(context);
         assertEquals(Utils.mkSet(store1.name(), store2.name()), storeNames);
     }
 
     @Test
-    public void shouldThrowIllegalArgumentIfTryingToRegisterStoreThatIsNotGlobal() throws Exception {
+    public void shouldThrowIllegalArgumentIfTryingToRegisterStoreThatIsNotGlobal() {
         stateManager.initialize(context);
 
         try {
@@ -169,7 +169,7 @@ public class GlobalStateManagerImplTest {
     }
 
     @Test
-    public void shouldThrowIllegalArgumentExceptionIfAttemptingToRegisterStoreTwice() throws Exception {
+    public void shouldThrowIllegalArgumentExceptionIfAttemptingToRegisterStoreTwice() {
         stateManager.initialize(context);
         initializeConsumer(2, 1, t1);
         stateManager.register(store1, false, new TheStateRestoreCallback());
@@ -182,7 +182,7 @@ public class GlobalStateManagerImplTest {
     }
 
     @Test
-    public void shouldThrowStreamsExceptionIfNoPartitionsFoundForStore() throws Exception {
+    public void shouldThrowStreamsExceptionIfNoPartitionsFoundForStore() {
         stateManager.initialize(context);
         try {
             stateManager.register(store1, false, new TheStateRestoreCallback());
@@ -193,7 +193,7 @@ public class GlobalStateManagerImplTest {
     }
 
     @Test
-    public void shouldRestoreRecordsUpToHighwatermark() throws Exception {
+    public void shouldRestoreRecordsUpToHighwatermark() {
         initializeConsumer(2, 1, t1);
 
         stateManager.initialize(context);
@@ -204,7 +204,7 @@ public class GlobalStateManagerImplTest {
     }
 
     @Test
-    public void shouldRestoreRecordsFromCheckpointToHighwatermark() throws Exception {
+    public void shouldRestoreRecordsFromCheckpointToHighwatermark() throws IOException {
         initializeConsumer(5, 6, t1);
 
         final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new File(stateManager.baseDir(),
@@ -219,7 +219,7 @@ public class GlobalStateManagerImplTest {
 
 
     @Test
-    public void shouldFlushStateStores() throws Exception {
+    public void shouldFlushStateStores() {
         stateManager.initialize(context);
         final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
         // register the stores
@@ -234,7 +234,7 @@ public class GlobalStateManagerImplTest {
     }
 
     @Test(expected = ProcessorStateException.class)
-    public void shouldThrowProcessorStateStoreExceptionIfStoreFlushFailed() throws Exception {
+    public void shouldThrowProcessorStateStoreExceptionIfStoreFlushFailed() {
         stateManager.initialize(context);
         final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
         // register the stores
@@ -250,7 +250,7 @@ public class GlobalStateManagerImplTest {
     }
 
     @Test
-    public void shouldCloseStateStores() throws Exception {
+    public void shouldCloseStateStores() throws IOException {
         stateManager.initialize(context);
         final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
         // register the stores
@@ -265,7 +265,7 @@ public class GlobalStateManagerImplTest {
     }
 
     @Test
-    public void shouldWriteCheckpointsOnClose() throws Exception {
+    public void shouldWriteCheckpointsOnClose() throws IOException {
         stateManager.initialize(context);
         final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
         initializeConsumer(1, 1, t1);
@@ -277,7 +277,7 @@ public class GlobalStateManagerImplTest {
     }
 
     @Test(expected = ProcessorStateException.class)
-    public void shouldThrowProcessorStateStoreExceptionIfStoreCloseFailed() throws Exception {
+    public void shouldThrowProcessorStateStoreExceptionIfStoreCloseFailed() throws IOException {
         stateManager.initialize(context);
         initializeConsumer(1, 1, t1);
         stateManager.register(new NoOpReadOnlyStore(store1.name()) {
@@ -291,7 +291,7 @@ public class GlobalStateManagerImplTest {
     }
 
     @Test
-    public void shouldThrowIllegalArgumentExceptionIfCallbackIsNull() throws Exception {
+    public void shouldThrowIllegalArgumentExceptionIfCallbackIsNull() {
         stateManager.initialize(context);
         try {
             stateManager.register(store1, false, null);
@@ -302,7 +302,7 @@ public class GlobalStateManagerImplTest {
     }
 
     @Test
-    public void shouldUnlockGlobalStateDirectoryOnClose() throws Exception {
+    public void shouldUnlockGlobalStateDirectoryOnClose() throws IOException {
         stateManager.initialize(context);
         stateManager.close(Collections.<TopicPartition, Long>emptyMap());
         final StateDirectory stateDir = new StateDirectory("appId", stateDirPath, new MockTime());
@@ -315,7 +315,7 @@ public class GlobalStateManagerImplTest {
     }
 
     @Test
-    public void shouldNotCloseStoresIfCloseAlreadyCalled() throws Exception {
+    public void shouldNotCloseStoresIfCloseAlreadyCalled() throws IOException {
         stateManager.initialize(context);
         initializeConsumer(1, 1, t1);
         stateManager.register(new NoOpReadOnlyStore("t1-store") {
@@ -334,7 +334,7 @@ public class GlobalStateManagerImplTest {
     }
 
     @Test
-    public void shouldAttemptToCloseAllStoresEvenWhenSomeException() throws Exception {
+    public void shouldAttemptToCloseAllStoresEvenWhenSomeException() throws IOException {
         stateManager.initialize(context);
         initializeConsumer(1, 1, t1);
         initializeConsumer(1, 1, t2);
@@ -359,7 +359,7 @@ public class GlobalStateManagerImplTest {
     }
 
     @Test
-    public void shouldReleaseLockIfExceptionWhenLoadingCheckpoints() throws Exception {
+    public void shouldReleaseLockIfExceptionWhenLoadingCheckpoints() throws IOException {
         writeCorruptCheckpoint();
         try {
             stateManager.initialize(context);
@@ -376,7 +376,7 @@ public class GlobalStateManagerImplTest {
     }
 
     @Test
-    public void shouldCheckpointOffsets() throws Exception {
+    public void shouldCheckpointOffsets() throws IOException {
         final Map<TopicPartition, Long> offsets = Collections.singletonMap(t1, 25L);
         stateManager.initialize(context);
 
@@ -388,7 +388,7 @@ public class GlobalStateManagerImplTest {
     }
 
     @Test
-    public void shouldNotRemoveOffsetsOfUnUpdatedTablesDuringCheckpoint() throws Exception {
+    public void shouldNotRemoveOffsetsOfUnUpdatedTablesDuringCheckpoint() {
         stateManager.initialize(context);
         final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
         initializeConsumer(10, 1, t1);
@@ -405,7 +405,7 @@ public class GlobalStateManagerImplTest {
     }
 
     @Test
-    public void shouldSkipNullKeysWhenRestoring() throws Exception {
+    public void shouldSkipNullKeysWhenRestoring() {
         final HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
         startOffsets.put(t1, 1L);
         final HashMap<TopicPartition, Long> endOffsets = new HashMap<>();
@@ -446,7 +446,7 @@ public class GlobalStateManagerImplTest {
     }
 
     @Test
-    public void shouldThrowLockExceptionIfIOExceptionCaughtWhenTryingToLockStateDir() throws Exception {
+    public void shouldThrowLockExceptionIfIOExceptionCaughtWhenTryingToLockStateDir() {
         stateManager = new GlobalStateManagerImpl(topology, consumer, new StateDirectory("appId", stateDirPath, time) {
             @Override
             public boolean lockGlobalState(final int retry) throws IOException {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
index 7859a06..4ece443 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
@@ -35,6 +35,7 @@ import org.apache.kafka.test.NoOpProcessorContext;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -95,20 +96,20 @@ public class GlobalStateTaskTest {
     }
 
     @Test
-    public void shouldInitializeStateManager() throws Exception {
+    public void shouldInitializeStateManager() {
         final Map<TopicPartition, Long> startingOffsets = globalStateTask.initialize();
         assertTrue(stateMgr.initialized);
         assertEquals(offsets, startingOffsets);
     }
 
     @Test
-    public void shouldInitializeContext() throws Exception {
+    public void shouldInitializeContext() {
         globalStateTask.initialize();
         assertTrue(context.initialized);
     }
 
     @Test
-    public void shouldInitializeProcessorTopology() throws Exception {
+    public void shouldInitializeProcessorTopology() {
         globalStateTask.initialize();
         for (ProcessorNode processorNode : processorNodes) {
             if (processorNode instanceof  MockProcessorNode) {
@@ -120,7 +121,7 @@ public class GlobalStateTaskTest {
     }
 
     @Test
-    public void shouldProcessRecordsForTopic() throws Exception {
+    public void shouldProcessRecordsForTopic() {
         globalStateTask.initialize();
         globalStateTask.update(new ConsumerRecord<>("t1", 1, 1, "foo".getBytes(), "bar".getBytes()));
         assertEquals(1, sourceOne.numReceived);
@@ -128,7 +129,7 @@ public class GlobalStateTaskTest {
     }
 
     @Test
-    public void shouldProcessRecordsForOtherTopic() throws Exception {
+    public void shouldProcessRecordsForOtherTopic() {
         final byte[] integerBytes = new IntegerSerializer().serialize("foo", 1);
         globalStateTask.initialize();
         globalStateTask.update(new ConsumerRecord<>("t2", 1, 1, integerBytes, integerBytes));
@@ -194,7 +195,7 @@ public class GlobalStateTaskTest {
 
 
     @Test
-    public void shouldCloseStateManagerWithOffsets() throws Exception {
+    public void shouldCloseStateManagerWithOffsets() throws IOException {
         final Map<TopicPartition, Long> expectedOffsets = new HashMap<>();
         expectedOffsets.put(t1, 52L);
         expectedOffsets.put(t2, 100L);
@@ -206,7 +207,7 @@ public class GlobalStateTaskTest {
     }
 
     @Test
-    public void shouldCheckpointOffsetsWhenStateIsFlushed() throws Exception {
+    public void shouldCheckpointOffsetsWhenStateIsFlushed() {
         final Map<TopicPartition, Long> expectedOffsets = new HashMap<>();
         expectedOffsets.put(t1, 102L);
         expectedOffsets.put(t2, 100L);

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index 9e1526b..9d0b637 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -69,7 +69,7 @@ public class GlobalStreamThreadTest {
     }
 
     @Test
-    public void shouldThrowStreamsExceptionOnStartupIfThereIsAStreamsException() throws Exception {
+    public void shouldThrowStreamsExceptionOnStartupIfThereIsAStreamsException() {
         // should throw as the MockConsumer hasn't been configured and there are no
         // partitions available
         try {
@@ -83,7 +83,7 @@ public class GlobalStreamThreadTest {
 
     @SuppressWarnings("unchecked")
     @Test
-    public void shouldThrowStreamsExceptionOnStartupIfExceptionOccurred() throws Exception {
+    public void shouldThrowStreamsExceptionOnStartupIfExceptionOccurred() {
         final MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer(OffsetResetStrategy.EARLIEST) {
             @Override
             public List<PartitionInfo> partitionsFor(final String topic) {
@@ -110,14 +110,14 @@ public class GlobalStreamThreadTest {
 
 
     @Test
-    public void shouldBeRunningAfterSuccessfulStart() throws Exception {
+    public void shouldBeRunningAfterSuccessfulStart() {
         initializeConsumer();
         globalStreamThread.start();
         assertTrue(globalStreamThread.stillRunning());
     }
 
     @Test(timeout = 30000)
-    public void shouldStopRunningWhenClosedByUser() throws Exception {
+    public void shouldStopRunningWhenClosedByUser() throws InterruptedException {
         initializeConsumer();
         globalStreamThread.start();
         globalStreamThread.shutdown();
@@ -126,7 +126,7 @@ public class GlobalStreamThreadTest {
     }
 
     @Test
-    public void shouldCloseStateStoresOnClose() throws Exception {
+    public void shouldCloseStateStoresOnClose() throws InterruptedException {
         initializeConsumer();
         globalStreamThread.start();
         final StateStore globalStore = builder.globalStateStores().get("bar");

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java
index 55534b7..044e82a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java
@@ -33,7 +33,7 @@ import static org.junit.Assert.assertTrue;
 public class InternalTopicConfigTest {
 
     @Test
-    public void shouldHaveCompactionPropSetIfSupplied() throws Exception {
+    public void shouldHaveCompactionPropSetIfSupplied() {
         final Properties properties = new InternalTopicConfig("name",
                                                               Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
                                                               Collections.<String, String>emptyMap()).toProperties(0);
@@ -42,17 +42,17 @@ public class InternalTopicConfigTest {
 
 
     @Test(expected = NullPointerException.class)
-    public void shouldThrowIfNameIsNull() throws Exception {
+    public void shouldThrowIfNameIsNull() {
         new InternalTopicConfig(null, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), Collections.<String, String>emptyMap());
     }
 
     @Test(expected = InvalidTopicException.class)
-    public void shouldThrowIfNameIsInvalid() throws Exception {
+    public void shouldThrowIfNameIsInvalid() {
         new InternalTopicConfig("foo bar baz", Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), Collections.<String, String>emptyMap());
     }
 
     @Test
-    public void shouldConfigureRetentionMsWithAdditionalRetentionWhenCompactAndDelete() throws Exception {
+    public void shouldConfigureRetentionMsWithAdditionalRetentionWhenCompactAndDelete() {
         final InternalTopicConfig topicConfig = new InternalTopicConfig("name",
                                                                         Utils.mkSet(InternalTopicConfig.CleanupPolicy.compact, InternalTopicConfig.CleanupPolicy.delete),
                                                                         Collections.<String, String>emptyMap());
@@ -63,7 +63,7 @@ public class InternalTopicConfigTest {
     }
 
     @Test
-    public void shouldNotConfigureRetentionMsWhenCompact() throws Exception {
+    public void shouldNotConfigureRetentionMsWhenCompact() {
         final InternalTopicConfig topicConfig = new InternalTopicConfig("name",
                                                                         Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
                                                                         Collections.<String, String>emptyMap());
@@ -73,7 +73,7 @@ public class InternalTopicConfigTest {
     }
 
     @Test
-    public void shouldNotConfigureRetentionMsWhenDelete() throws Exception {
+    public void shouldNotConfigureRetentionMsWhenDelete() {
         final InternalTopicConfig topicConfig = new InternalTopicConfig("name",
                                                                         Collections.singleton(InternalTopicConfig.CleanupPolicy.delete),
                                                                         Collections.<String, String>emptyMap());
@@ -84,7 +84,7 @@ public class InternalTopicConfigTest {
 
 
     @Test
-    public void shouldBeCompactedIfCleanupPolicyCompactOrCompactAndDelete() throws Exception {
+    public void shouldBeCompactedIfCleanupPolicyCompactOrCompactAndDelete() {
         assertTrue(new InternalTopicConfig("name",
                                            Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
                                            Collections.<String, String>emptyMap()).isCompacted());
@@ -94,14 +94,14 @@ public class InternalTopicConfigTest {
     }
 
     @Test
-    public void shouldNotBeCompactedWhenCleanupPolicyIsDelete() throws Exception {
+    public void shouldNotBeCompactedWhenCleanupPolicyIsDelete() {
         assertFalse(new InternalTopicConfig("name",
                                             Collections.singleton(InternalTopicConfig.CleanupPolicy.delete),
                                             Collections.<String, String>emptyMap()).isCompacted());
     }
 
     @Test
-    public void shouldUseCleanupPolicyFromConfigIfSupplied() throws Exception {
+    public void shouldUseCleanupPolicyFromConfigIfSupplied() {
         final InternalTopicConfig config = new InternalTopicConfig("name",
                                                                    Collections.singleton(InternalTopicConfig.CleanupPolicy.delete),
                                                                    Collections.singletonMap("cleanup.policy", "compact"));
@@ -111,7 +111,7 @@ public class InternalTopicConfigTest {
     }
 
     @Test
-    public void shouldHavePropertiesSuppliedByUser() throws Exception {
+    public void shouldHavePropertiesSuppliedByUser() {
         final Map<String, String> configs = new HashMap<>();
         configs.put("retention.ms", "1000");
         configs.put("retention.bytes", "10000");

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
index 469f9cb..1cd6bee 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
@@ -56,21 +56,21 @@ public class InternalTopicManagerTest {
     }
 
     @Test
-    public void shouldReturnCorrectPartitionCounts() throws Exception {
+    public void shouldReturnCorrectPartitionCounts() {
         InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1,
             WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time);
         Assert.assertEquals(Collections.singletonMap(topic, 1), internalTopicManager.getNumPartitions(Collections.singleton(topic)));
     }
 
     @Test
-    public void shouldCreateRequiredTopics() throws Exception {
+    public void shouldCreateRequiredTopics() {
         InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1,
             WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time);
         internalTopicManager.makeReady(Collections.singletonMap(new InternalTopicConfig(topic, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), null), 1));
     }
 
     @Test
-    public void shouldNotCreateTopicIfExistsWithDifferentPartitions() throws Exception {
+    public void shouldNotCreateTopicIfExistsWithDifferentPartitions() {
         InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1,
             WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time);
         boolean exceptionWasThrown = false;
@@ -83,7 +83,7 @@ public class InternalTopicManagerTest {
     }
 
     @Test
-    public void shouldNotThrowExceptionIfExistsWithDifferentReplication() throws Exception {
+    public void shouldNotThrowExceptionIfExistsWithDifferentReplication() {
 
         // create topic the first time with replication 2
         InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 2,
@@ -101,7 +101,7 @@ public class InternalTopicManagerTest {
     }
 
     @Test
-    public void shouldNotThrowExceptionForEmptyTopicMap() throws Exception {
+    public void shouldNotThrowExceptionForEmptyTopicMap() {
         InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1,
             WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time);
         internalTopicManager.makeReady(Collections.EMPTY_MAP);

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java
index f6a1518..24653e6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java
@@ -26,18 +26,18 @@ public class MinTimestampTrackerTest {
     private MinTimestampTracker<String> tracker = new MinTimestampTracker<>();
 
     @Test
-    public void shouldReturnNotKnownTimestampWhenNoRecordsEverAdded() throws Exception {
+    public void shouldReturnNotKnownTimestampWhenNoRecordsEverAdded() {
         assertThat(tracker.get(), equalTo(TimestampTracker.NOT_KNOWN));
     }
 
     @Test
-    public void shouldReturnTimestampOfOnlyRecord() throws Exception {
+    public void shouldReturnTimestampOfOnlyRecord() {
         tracker.addElement(elem(100));
         assertThat(tracker.get(), equalTo(100L));
     }
 
     @Test
-    public void shouldReturnLowestAvailableTimestampFromAllInputs() throws Exception {
+    public void shouldReturnLowestAvailableTimestampFromAllInputs() {
         tracker.addElement(elem(100));
         tracker.addElement(elem(99));
         tracker.addElement(elem(102));
@@ -45,7 +45,7 @@ public class MinTimestampTrackerTest {
     }
 
     @Test
-    public void shouldReturnLowestAvailableTimestampAfterPreviousLowestRemoved() throws Exception {
+    public void shouldReturnLowestAvailableTimestampAfterPreviousLowestRemoved() {
         final Stamped<String> lowest = elem(88);
         tracker.addElement(lowest);
         tracker.addElement(elem(101));
@@ -55,7 +55,7 @@ public class MinTimestampTrackerTest {
     }
 
     @Test
-    public void shouldReturnLastKnownTimestampWhenAllElementsHaveBeenRemoved() throws Exception {
+    public void shouldReturnLastKnownTimestampWhenAllElementsHaveBeenRemoved() {
         final Stamped<String> record = elem(98);
         tracker.addElement(record);
         tracker.removeElement(record);
@@ -63,12 +63,12 @@ public class MinTimestampTrackerTest {
     }
 
     @Test
-    public void shouldIgnoreNullRecordOnRemove() throws Exception {
+    public void shouldIgnoreNullRecordOnRemove() {
         tracker.removeElement(null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldThrowNullPointerExceptionWhenTryingToAddNullElement() throws Exception {
+    public void shouldThrowNullPointerExceptionWhenTryingToAddNullElement() {
         tracker.addElement(null);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index f37a674..c56f609 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -35,14 +35,14 @@ public class ProcessorNodeTest {
 
     @SuppressWarnings("unchecked")
     @Test (expected = StreamsException.class)
-    public void shouldThrowStreamsExceptionIfExceptionCaughtDuringInit() throws Exception {
+    public void shouldThrowStreamsExceptionIfExceptionCaughtDuringInit() {
         final ProcessorNode node = new ProcessorNode("name", new ExceptionalProcessor(), Collections.emptySet());
         node.init(null);
     }
 
     @SuppressWarnings("unchecked")
     @Test (expected = StreamsException.class)
-    public void shouldThrowStreamsExceptionIfExceptionCaughtDuringClose() throws Exception {
+    public void shouldThrowStreamsExceptionIfExceptionCaughtDuringClose() {
         final ProcessorNode node = new ProcessorNode("name", new ExceptionalProcessor(), Collections.emptySet());
         node.close();
     }