Posted to commits@kafka.apache.org by da...@apache.org on 2017/09/11 08:42:22 UTC

[5/5] kafka git commit: KAFKA-5531; throw concrete exceptions in streams tests

KAFKA-5531; throw concrete exceptions in streams tests

1. Test methods now declare concrete exceptions instead of generic `Exception`, or
declare no thrown exceptions at all when none are needed (see the sketch after this list).
2. `SimpleBenchmark.run()` throws `RuntimeException`
3. `SimpleBenchmark.produce()` throws `IllegalArgumentException`
4. Expect `ProcessorStateException` in
`StandbyTaskTest.testUpdateNonPersistentStore()`
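
A minimal sketch of the pattern, for illustration only: the class name and method bodies
below are hypothetical stand-ins, while `ProcessorStateException` is the real exception
from `org.apache.kafka.streams.errors` referred to in item 4.

    import org.apache.kafka.streams.errors.ProcessorStateException;
    import org.junit.Test;

    public class ExceptionDeclarationSketch {

        // Before: tests declared the broad checked Exception, e.g.
        //     public void testStateChanges() throws Exception { ... }

        // After: declare only the concrete checked exception the body can actually throw...
        @Test
        public void testStateChanges() throws InterruptedException {
            Thread.sleep(1); // stand-in for a call that may throw InterruptedException
        }

        // ...or drop the throws clause entirely when nothing checked is thrown.
        @Test
        public void testCloseIsIdempotent() {
            // body only calls methods that throw unchecked exceptions
        }

        // Item 4: expect the concrete ProcessorStateException rather than a generic one.
        @Test(expected = ProcessorStateException.class)
        public void testUpdateNonPersistentStore() {
            throw new ProcessorStateException("stand-in for the real store update failure");
        }
    }

In the diff below this shows up as `throws Exception` signatures being narrowed or removed
across the test classes listed in the stat section.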

/cc enothereska

Author: Evgeny Veretennikov <ev...@gmail.com>

Reviewers: Damian Guy <da...@gmail.com>

Closes #3485 from evis/5531-throw-concrete-exceptions


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c5464edb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c5464edb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c5464edb

Branch: refs/heads/trunk
Commit: c5464edbb7a6821e0a91a3712b1fe2fd92a22d68
Parents: 3728f4c
Author: Evgeny Veretennikov <ev...@gmail.com>
Authored: Mon Sep 11 09:42:10 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Mon Sep 11 09:42:10 2017 +0100

----------------------------------------------------------------------
 .../apache/kafka/streams/KafkaStreamsTest.java  | 24 ++---
 .../apache/kafka/streams/StreamsConfigTest.java | 36 ++++----
 .../streams/integration/EosIntegrationTest.java |  4 +-
 .../integration/FanoutIntegrationTest.java      |  2 +-
 .../GlobalKTableIntegrationTest.java            |  6 +-
 .../InternalTopicIntegrationTest.java           |  4 +-
 .../integration/JoinIntegrationTest.java        |  8 +-
 .../KStreamAggregationDedupIntegrationTest.java |  4 +-
 .../KStreamAggregationIntegrationTest.java      |  4 +-
 .../KStreamKTableJoinIntegrationTest.java       |  2 +-
 .../integration/KStreamRepartitionJoinTest.java | 24 ++---
 ...eamsFineGrainedAutoResetIntegrationTest.java |  7 +-
 .../KTableKTableJoinIntegrationTest.java        | 43 ++++-----
 .../QueryableStateIntegrationTest.java          | 12 +--
 .../integration/RegexSourceIntegrationTest.java |  7 +-
 .../integration/ResetIntegrationTest.java       |  3 +-
 .../integration/utils/EmbeddedKafkaCluster.java | 16 ++--
 .../streams/kstream/KStreamBuilderTest.java     | 32 +++----
 .../internals/GlobalKTableJoinsTest.java        |  2 +-
 .../internals/KGroupedStreamImplTest.java       | 96 ++++++++++----------
 .../internals/KGroupedTableImplTest.java        | 26 +++---
 .../kstream/internals/KStreamImplTest.java      | 58 ++++++------
 .../internals/KStreamKStreamJoinTest.java       | 10 +-
 .../internals/KStreamKStreamLeftJoinTest.java   |  4 +-
 .../internals/KStreamKTableJoinTest.java        |  2 +-
 .../internals/KStreamKTableLeftJoinTest.java    |  2 +-
 ...reamSessionWindowAggregateProcessorTest.java | 18 ++--
 .../internals/KStreamWindowAggregateTest.java   |  6 +-
 .../internals/KTableKTableLeftJoinTest.java     |  8 +-
 .../internals/KTableKTableOuterJoinTest.java    |  6 +-
 .../kstream/internals/SessionKeySerdeTest.java  | 22 ++---
 .../kafka/streams/perf/SimpleBenchmark.java     | 36 ++++----
 .../kafka/streams/perf/YahooBenchmark.java      |  6 +-
 .../streams/processor/TopologyBuilderTest.java  | 43 ++++-----
 .../internals/AbstractProcessorContextTest.java | 22 ++---
 .../processor/internals/AbstractTaskTest.java   |  6 +-
 .../CopartitionedTopicsValidatorTest.java       | 10 +-
 .../internals/GlobalStateManagerImplTest.java   | 52 +++++------
 .../internals/GlobalStateTaskTest.java          | 15 +--
 .../internals/GlobalStreamThreadTest.java       | 10 +-
 .../internals/InternalTopicConfigTest.java      | 20 ++--
 .../internals/InternalTopicManagerTest.java     | 10 +-
 .../internals/MinTimestampTrackerTest.java      | 14 +--
 .../processor/internals/ProcessorNodeTest.java  |  4 +-
 .../internals/ProcessorStateManagerTest.java    | 20 ++--
 .../internals/ProcessorTopologyTest.java        | 10 +-
 .../internals/RecordCollectorTest.java          | 10 +-
 .../processor/internals/RecordQueueTest.java    |  4 +-
 .../SourceNodeRecordDeserializerTest.java       |  6 +-
 .../processor/internals/StandbyTaskTest.java    | 16 ++--
 .../processor/internals/StateConsumerTest.java  | 21 +++--
 .../processor/internals/StateDirectoryTest.java | 30 +++---
 .../processor/internals/StateRestorerTest.java  | 16 ++--
 .../internals/StoreChangelogReaderTest.java     | 24 ++---
 .../internals/StreamPartitionAssignorTest.java  |  8 +-
 .../processor/internals/StreamTaskTest.java     | 68 +++++++-------
 .../processor/internals/StreamThreadTest.java   | 12 +--
 .../internals/StreamsMetadataStateTest.java     | 40 ++++----
 .../internals/StreamsMetricsImplTest.java       |  2 +-
 .../assignment/AssignmentInfoTest.java          |  2 +-
 .../internals/assignment/ClientStateTest.java   | 34 +++----
 .../assignment/StickyTaskAssignorTest.java      | 56 ++++++------
 .../assignment/SubscriptionInfoTest.java        |  4 +-
 .../apache/kafka/streams/state/StoresTest.java  | 10 +-
 .../internals/AbstractKeyValueStoreTest.java    | 20 ++--
 .../internals/CachingKeyValueStoreTest.java     | 38 ++++----
 .../internals/CachingSessionStoreTest.java      | 48 +++++-----
 .../state/internals/CachingWindowStoreTest.java | 44 ++++-----
 .../ChangeLoggingKeyValueBytesStoreTest.java    | 30 +++---
 .../ChangeLoggingKeyValueStoreTest.java         | 38 ++++----
 .../CompositeReadOnlyKeyValueStoreTest.java     | 32 +++----
 .../CompositeReadOnlySessionStoreTest.java      | 10 +-
 .../CompositeReadOnlyWindowStoreTest.java       | 18 ++--
 .../DelegatingPeekingKeyValueIteratorTest.java  | 12 +--
 .../internals/FilteredCacheIteratorTest.java    | 12 +--
 .../internals/GlobalStateStoreProviderTest.java |  6 +-
 ...tedCacheWrappedSessionStoreIteratorTest.java | 14 +--
 ...rtedCacheWrappedWindowStoreIteratorTest.java |  6 +-
 ...eWrappedWindowStoreKeyValueIteratorTest.java | 14 +--
 .../streams/state/internals/NamedCacheTest.java | 36 ++++----
 .../internals/QueryableStoreProviderTest.java   | 14 +--
 .../RocksDBKeyValueStoreSupplierTest.java       | 14 +--
 .../internals/RocksDBKeyValueStoreTest.java     |  8 +-
 .../RocksDBSegmentedBytesStoreTest.java         |  8 +-
 .../RocksDBSessionStoreSupplierTest.java        | 14 +--
 .../internals/RocksDBSessionStoreTest.java      | 28 +++---
 .../state/internals/RocksDBStoreTest.java       |  6 +-
 .../RocksDBWindowStoreSupplierTest.java         | 14 +--
 .../state/internals/RocksDBWindowStoreTest.java | 18 ++--
 .../state/internals/SegmentIteratorTest.java    |  8 +-
 .../internals/SegmentedCacheFunctionTest.java   |  8 +-
 .../streams/state/internals/SegmentsTest.java   | 22 ++---
 .../SerializedKeyValueIteratorTest.java         | 10 +-
 .../state/internals/SessionKeySchemaTest.java   | 14 +--
 .../state/internals/StoreChangeLoggerTest.java  |  2 +-
 .../StreamThreadStateStoreProviderTest.java     | 14 +--
 .../state/internals/ThreadCacheTest.java        | 46 +++++-----
 .../state/internals/WindowKeySchemaTest.java    | 14 +--
 .../state/internals/WindowStoreUtilsTest.java   |  2 +-
 .../internals/WrappingStoreProviderTest.java    |  6 +-
 .../streams/tests/BrokerCompatibilityTest.java  |  2 +-
 .../kafka/streams/tests/EosTestDriver.java      |  2 +-
 .../kafka/streams/tests/SmokeTestDriver.java    |  4 +-
 .../kafka/streams/tests/SmokeTestUtil.java      |  2 +-
 .../kafka/streams/tests/StreamsEosTest.java     |  2 +-
 .../kafka/streams/tests/StreamsSmokeTest.java   |  2 +-
 106 files changed, 900 insertions(+), 901 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 5e5f6c9..994a3a4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -83,7 +83,7 @@ public class KafkaStreamsTest {
     }
 
     @Test
-    public void testStateChanges() throws Exception {
+    public void testStateChanges() throws InterruptedException {
         final StreamsBuilder builder = new StreamsBuilder();
         final KafkaStreams streams = new KafkaStreams(builder.build(), props);
 
@@ -104,7 +104,7 @@ public class KafkaStreamsTest {
     }
 
     @Test
-    public void testStateCloseAfterCreate() throws Exception {
+    public void testStateCloseAfterCreate() {
         final StreamsBuilder builder = new StreamsBuilder();
         final KafkaStreams streams = new KafkaStreams(builder.build(), props);
 
@@ -208,7 +208,7 @@ public class KafkaStreamsTest {
 
 
     @Test
-    public void testInitializesAndDestroysMetricsReporters() throws Exception {
+    public void testInitializesAndDestroysMetricsReporters() {
         final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
         final StreamsBuilder builder = new StreamsBuilder();
         final KafkaStreams streams = new KafkaStreams(builder.build(), props);
@@ -223,7 +223,7 @@ public class KafkaStreamsTest {
     }
 
     @Test
-    public void testCloseIsIdempotent() throws Exception {
+    public void testCloseIsIdempotent() {
         streams.close();
         final int closeCount = MockMetricsReporter.CLOSE_COUNT.get();
 
@@ -233,7 +233,7 @@ public class KafkaStreamsTest {
     }
 
     @Test
-    public void testCannotStartOnceClosed() throws Exception {
+    public void testCannotStartOnceClosed() {
         streams.start();
         streams.close();
         try {
@@ -247,7 +247,7 @@ public class KafkaStreamsTest {
     }
 
     @Test
-    public void testCannotStartTwice() throws Exception {
+    public void testCannotStartTwice() {
         streams.start();
 
         try {
@@ -336,22 +336,22 @@ public class KafkaStreamsTest {
     }
 
     @Test(expected = IllegalStateException.class)
-    public void shouldNotGetAllTasksWhenNotRunning() throws Exception {
+    public void shouldNotGetAllTasksWhenNotRunning() {
         streams.allMetadata();
     }
 
     @Test(expected = IllegalStateException.class)
-    public void shouldNotGetAllTasksWithStoreWhenNotRunning() throws Exception {
+    public void shouldNotGetAllTasksWithStoreWhenNotRunning() {
         streams.allMetadataForStore("store");
     }
 
     @Test(expected = IllegalStateException.class)
-    public void shouldNotGetTaskWithKeyAndSerializerWhenNotRunning() throws Exception {
+    public void shouldNotGetTaskWithKeyAndSerializerWhenNotRunning() {
         streams.metadataForKey("store", "key", Serdes.String().serializer());
     }
 
     @Test(expected = IllegalStateException.class)
-    public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() throws Exception {
+    public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() {
         streams.metadataForKey("store", "key", new StreamPartitioner<String, Object>() {
             @Override
             public Integer partition(final String key, final Object value, final int numPartitions) {
@@ -438,7 +438,7 @@ public class KafkaStreamsTest {
     }
 
     @Test
-    public void testCleanup() throws Exception {
+    public void testCleanup() {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testLocalCleanup");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
@@ -453,7 +453,7 @@ public class KafkaStreamsTest {
     }
 
     @Test
-    public void testCannotCleanupWhileRunning() throws Exception {
+    public void testCannotCleanupWhileRunning() throws InterruptedException {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotCleanupWhileRunning");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 69a44a6..3774a8e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -82,7 +82,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void testGetProducerConfigs() throws Exception {
+    public void testGetProducerConfigs() {
         final String clientId = "client";
         final Map<String, Object> returnedProps = streamsConfig.getProducerConfigs(clientId);
         assertEquals(returnedProps.get(ProducerConfig.CLIENT_ID_CONFIG), clientId + "-producer");
@@ -91,7 +91,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void testGetConsumerConfigs() throws Exception {
+    public void testGetConsumerConfigs() {
         final String groupId = "example-application";
         final String clientId = "client";
         final Map<String, Object> returnedProps = streamsConfig.getConsumerConfigs(null, groupId, clientId);
@@ -102,7 +102,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void testGetRestoreConsumerConfigs() throws Exception {
+    public void testGetRestoreConsumerConfigs() {
         final String clientId = "client";
         final Map<String, Object> returnedProps = streamsConfig.getRestoreConsumerConfigs(clientId);
         assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), clientId + "-restore-consumer");
@@ -143,7 +143,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void shouldSupportPrefixedConsumerConfigs() throws Exception {
+    public void shouldSupportPrefixedConsumerConfigs() {
         props.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
         props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -153,7 +153,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void shouldSupportPrefixedRestoreConsumerConfigs() throws Exception {
+    public void shouldSupportPrefixedRestoreConsumerConfigs() {
         props.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
         props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -163,7 +163,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig() throws Exception {
+    public void shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig() {
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         props.put(consumerPrefix("interceptor.statsd.host"), "host");
         final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
@@ -171,7 +171,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void shouldSupportPrefixedPropertiesThatAreNotPartOfRestoreConsumerConfig() throws Exception {
+    public void shouldSupportPrefixedPropertiesThatAreNotPartOfRestoreConsumerConfig() {
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         props.put(consumerPrefix("interceptor.statsd.host"), "host");
         final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId");
@@ -179,7 +179,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig() throws Exception {
+    public void shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig() {
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         props.put(producerPrefix("interceptor.statsd.host"), "host");
         final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
@@ -188,7 +188,7 @@ public class StreamsConfigTest {
 
 
     @Test
-    public void shouldSupportPrefixedProducerConfigs() throws Exception {
+    public void shouldSupportPrefixedProducerConfigs() {
         props.put(producerPrefix(ProducerConfig.BUFFER_MEMORY_CONFIG), 10);
         props.put(producerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -198,7 +198,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void shouldBeSupportNonPrefixedConsumerConfigs() throws Exception {
+    public void shouldBeSupportNonPrefixedConsumerConfigs() {
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -208,7 +208,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void shouldBeSupportNonPrefixedRestoreConsumerConfigs() throws Exception {
+    public void shouldBeSupportNonPrefixedRestoreConsumerConfigs() {
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -218,7 +218,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void shouldSupportNonPrefixedProducerConfigs() throws Exception {
+    public void shouldSupportNonPrefixedProducerConfigs() {
         props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 10);
         props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -230,21 +230,21 @@ public class StreamsConfigTest {
 
 
     @Test(expected = StreamsException.class)
-    public void shouldThrowStreamsExceptionIfKeySerdeConfigFails() throws Exception {
+    public void shouldThrowStreamsExceptionIfKeySerdeConfigFails() {
         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         streamsConfig.defaultKeySerde();
     }
 
     @Test(expected = StreamsException.class)
-    public void shouldThrowStreamsExceptionIfValueSerdeConfigFails() throws Exception {
+    public void shouldThrowStreamsExceptionIfValueSerdeConfigFails() {
         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         streamsConfig.defaultValueSerde();
     }
 
     @Test
-    public void shouldOverrideStreamsDefaultConsumerConfigs() throws Exception {
+    public void shouldOverrideStreamsDefaultConsumerConfigs() {
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -254,7 +254,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void shouldOverrideStreamsDefaultProducerConfigs() throws Exception {
+    public void shouldOverrideStreamsDefaultProducerConfigs() {
         props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), "10000");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
@@ -262,7 +262,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() throws Exception {
+    public void shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() {
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -288,7 +288,7 @@ public class StreamsConfigTest {
     }
 
     @Test
-    public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() throws Exception {
+    public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() {
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
         assertThat(consumerConfigs.get("internal.leave.group.on.close"), CoreMatchers.<Object>equalTo(false));

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index fb93783..e50f4d0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -97,7 +97,7 @@ public class EosIntegrationTest {
     private int testNumber = 0;
 
     @Before
-    public void createTopics() throws Exception {
+    public void createTopics() throws InterruptedException {
         applicationId = "appId-" + ++testNumber;
         CLUSTER.deleteTopicsAndWait(
             SINGLE_PARTITION_INPUT_TOPIC, MULTI_PARTITION_INPUT_TOPIC,
@@ -701,7 +701,7 @@ public class EosIntegrationTest {
     }
 
     private List<KeyValue<Long, Long>> readResult(final int numberOfRecords,
-                                                  final String groupId) throws Exception {
+                                                  final String groupId) throws InterruptedException {
         if (groupId != null) {
             return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
                 TestUtils.consumerConfig(

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
index 733ca0b..b4a9320 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
@@ -78,7 +78,7 @@ public class FanoutIntegrationTest {
     private static final String OUTPUT_TOPIC_C = "C";
 
     @BeforeClass
-    public static void startKafkaCluster() throws Exception {
+    public static void startKafkaCluster() throws InterruptedException {
         CLUSTER.createTopics(INPUT_TOPIC_A, OUTPUT_TOPIC_B, OUTPUT_TOPIC_C);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 7aef5c9..cbf2b56 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -232,7 +232,7 @@ public class GlobalKTableIntegrationTest {
         kafkaStreams.start();
     }
 
-    private void produceTopicValues(final String topic) throws java.util.concurrent.ExecutionException, InterruptedException {
+    private void produceTopicValues(final String topic) throws Exception {
         IntegrationTestUtils.produceKeyValuesSynchronously(
                 topic,
                 Arrays.asList(
@@ -249,7 +249,7 @@ public class GlobalKTableIntegrationTest {
                 mockTime);
     }
 
-    private void produceInitialGlobalTableValues() throws java.util.concurrent.ExecutionException, InterruptedException {
+    private void produceInitialGlobalTableValues() throws Exception {
         IntegrationTestUtils.produceKeyValuesSynchronously(
                 globalOne,
                 Arrays.asList(
@@ -265,7 +265,7 @@ public class GlobalKTableIntegrationTest {
                 mockTime);
     }
 
-    private void produceGlobalTableValues() throws java.util.concurrent.ExecutionException, InterruptedException {
+    private void produceGlobalTableValues() throws Exception {
         IntegrationTestUtils.produceKeyValuesSynchronously(
                 globalOne,
                 Arrays.asList(

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 587f478..01cfa5a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -76,7 +76,7 @@ public class InternalTopicIntegrationTest {
     private String applicationId = "compact-topics-integration-test";
 
     @BeforeClass
-    public static void startKafkaCluster() throws Exception {
+    public static void startKafkaCluster() throws InterruptedException {
         CLUSTER.createTopics(DEFAULT_INPUT_TOPIC, DEFAULT_OUTPUT_TOPIC);
     }
 
@@ -173,7 +173,7 @@ public class InternalTopicIntegrationTest {
         assertEquals(LogConfig.Compact(), properties.getProperty(LogConfig.CleanupPolicyProp()));
     }
 
-    private void produceData(final List<String> inputValues) throws java.util.concurrent.ExecutionException, InterruptedException {
+    private void produceData(final List<String> inputValues) throws Exception {
         final Properties producerConfig = new Properties();
         producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
index 490c928..3a771c4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
@@ -100,7 +100,7 @@ public class JoinIntegrationTest {
     };
 
     @BeforeClass
-    public static void setupConfigsAndUtils() throws Exception {
+    public static void setupConfigsAndUtils() {
         PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all");
         PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0);
@@ -124,7 +124,7 @@ public class JoinIntegrationTest {
     }
 
     @Before
-    public void prepareTopology() throws Exception {
+    public void prepareTopology() throws InterruptedException {
         CLUSTER.createTopics(INPUT_TOPIC_1, INPUT_TOPIC_2, OUTPUT_TOPIC);
 
         builder = new StreamsBuilder();
@@ -135,11 +135,11 @@ public class JoinIntegrationTest {
     }
 
     @After
-    public void cleanup() throws Exception {
+    public void cleanup() throws InterruptedException {
         CLUSTER.deleteTopicsAndWait(120000, INPUT_TOPIC_1, INPUT_TOPIC_2, OUTPUT_TOPIC);
     }
 
-    private void checkResult(final String outputTopic, final List<String> expectedResult) throws Exception {
+    private void checkResult(final String outputTopic, final List<String> expectedResult) throws InterruptedException {
         if (expectedResult != null) {
             final List<String> result = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(RESULT_CONSUMER_CONFIG, outputTopic, expectedResult.size(), 30 * 1000L);
             assertThat(result, is(expectedResult));

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index 5740779..cb58849 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -53,7 +53,6 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Properties;
-import java.util.concurrent.ExecutionException;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
@@ -260,8 +259,7 @@ public class KStreamAggregationDedupIntegrationTest {
     }
 
 
-    private void produceMessages(long timestamp)
-        throws ExecutionException, InterruptedException {
+    private void produceMessages(long timestamp) throws Exception {
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
             streamOneInput,
             Arrays.asList(

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index e9927bc..7ff24da 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -66,7 +66,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -645,8 +644,7 @@ public class KStreamAggregationIntegrationTest {
     }
 
 
-    private void produceMessages(final long timestamp)
-        throws ExecutionException, InterruptedException {
+    private void produceMessages(final long timestamp) throws Exception {
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
             streamOneInput,
             Arrays.asList(

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
index 8dc22ce..a433667 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
@@ -144,7 +144,7 @@ public class KStreamKTableJoinIntegrationTest {
         countClicksPerRegion(10 * 1024 * 1024);
     }
 
-    private void countClicksPerRegion(final int cacheSizeBytes) throws java.util.concurrent.ExecutionException, InterruptedException {
+    private void countClicksPerRegion(final int cacheSizeBytes) throws Exception {
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
         // Input 1: Clicks per user (multiple records allowed per user).
         final List<KeyValue<String, Long>> userClicks = Arrays.asList(

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
index 9618033..5f6ff44 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
@@ -53,7 +53,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -156,7 +155,7 @@ public class KStreamRepartitionJoinTest {
         return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, mapOneStreamAndJoinOutput);
     }
 
-    private ExpectedOutputOnTopic mapBothStreamsAndJoin() throws Exception {
+    private ExpectedOutputOnTopic mapBothStreamsAndJoin() throws InterruptedException {
         final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);
         final KStream<Integer, String> map2 = streamTwo.map(MockKeyValueMapper.<Integer, String>NoOpKeyValueMapper());
 
@@ -164,7 +163,7 @@ public class KStreamRepartitionJoinTest {
         return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, "map-both-streams-and-join-" + testNo);
     }
 
-    private ExpectedOutputOnTopic mapMapJoin() throws Exception {
+    private ExpectedOutputOnTopic mapMapJoin() throws InterruptedException {
         final KStream<Integer, Integer> mapMapStream = streamOne.map(
             new KeyValueMapper<Long, Integer, KeyValue<Long, Integer>>() {
                 @Override
@@ -181,7 +180,7 @@ public class KStreamRepartitionJoinTest {
         return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
     }
 
-    private ExpectedOutputOnTopic selectKeyAndJoin() throws ExecutionException, InterruptedException {
+    private ExpectedOutputOnTopic selectKeyAndJoin() throws Exception {
 
         final KStream<Integer, Integer> keySelected =
             streamOne.selectKey(MockKeyValueMapper.<Long, Integer>SelectValueMapper());
@@ -191,7 +190,7 @@ public class KStreamRepartitionJoinTest {
         return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
     }
 
-    private ExpectedOutputOnTopic flatMapJoin() throws Exception {
+    private ExpectedOutputOnTopic flatMapJoin() throws InterruptedException {
         final KStream<Integer, Integer> flatMapped = streamOne.flatMap(
             new KeyValueMapper<Long, Integer, Iterable<KeyValue<Integer, Integer>>>() {
                 @Override
@@ -206,7 +205,7 @@ public class KStreamRepartitionJoinTest {
         return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
     }
 
-    private ExpectedOutputOnTopic joinMappedRhsStream() throws Exception {
+    private ExpectedOutputOnTopic joinMappedRhsStream() throws InterruptedException {
 
         final String output = "join-rhs-stream-mapped-" + testNo;
         CLUSTER.createTopic(output);
@@ -220,7 +219,7 @@ public class KStreamRepartitionJoinTest {
         return new ExpectedOutputOnTopic(Arrays.asList("A:1", "B:2", "C:3", "D:4", "E:5"), output);
     }
 
-    private ExpectedOutputOnTopic mapBothStreamsAndLeftJoin() throws Exception {
+    private ExpectedOutputOnTopic mapBothStreamsAndLeftJoin() throws InterruptedException {
         final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);
 
         final KStream<Integer, String> map2 = streamTwo.map(MockKeyValueMapper.<Integer, String>NoOpKeyValueMapper());
@@ -244,7 +243,7 @@ public class KStreamRepartitionJoinTest {
         return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
     }
 
-    private ExpectedOutputOnTopic joinTwoMappedStreamsOneThatHasBeenPreviouslyJoined() throws Exception {
+    private ExpectedOutputOnTopic joinTwoMappedStreamsOneThatHasBeenPreviouslyJoined() throws InterruptedException {
         final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);
 
         final KeyValueMapper<Integer, String, KeyValue<Integer, String>>
@@ -293,16 +292,14 @@ public class KStreamRepartitionJoinTest {
             is(expectedOutputOnTopic.expectedOutput));
     }
 
-    private void produceMessages()
-        throws ExecutionException, InterruptedException {
+    private void produceMessages() throws Exception {
         produceToStreamOne();
         produceStreamTwoInputTo(streamTwoInput);
         produceStreamTwoInputTo(streamFourInput);
 
     }
 
-    private void produceStreamTwoInputTo(final String streamTwoInput)
-        throws ExecutionException, InterruptedException {
+    private void produceStreamTwoInputTo(final String streamTwoInput) throws Exception {
         IntegrationTestUtils.produceKeyValuesSynchronously(
             streamTwoInput,
             Arrays.asList(
@@ -319,8 +316,7 @@ public class KStreamRepartitionJoinTest {
             mockTime);
     }
 
-    private void produceToStreamOne()
-        throws ExecutionException, InterruptedException {
+    private void produceToStreamOne() throws Exception {
         IntegrationTestUtils.produceKeyValuesSynchronously(
             streamOneInput,
             Arrays.asList(

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
index 92f351b..5415b58 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
@@ -47,6 +47,7 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -107,7 +108,7 @@ public class KStreamsFineGrainedAutoResetIntegrationTest {
 
 
     @BeforeClass
-    public static void startKafkaCluster() throws Exception {
+    public static void startKafkaCluster() throws InterruptedException {
         CLUSTER.createTopics(
             TOPIC_1_0,
             TOPIC_2_0,
@@ -135,7 +136,7 @@ public class KStreamsFineGrainedAutoResetIntegrationTest {
     }
 
     @Before
-    public void setUp() throws Exception {
+    public void setUp() throws IOException {
 
         Properties props = new Properties();
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
@@ -274,7 +275,7 @@ public class KStreamsFineGrainedAutoResetIntegrationTest {
     }
 
     @Test
-    public void shouldThrowStreamsExceptionNoResetSpecified() throws Exception {
+    public void shouldThrowStreamsExceptionNoResetSpecified() throws InterruptedException {
         Properties props = new Properties();
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
index 9df1ef5..949f8be 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
@@ -42,6 +42,7 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -118,12 +119,12 @@ public class KTableKTableJoinIntegrationTest {
     }
 
     @Before
-    public void before() throws Exception {
+    public void before() throws IOException {
         IntegrationTestUtils.purgeLocalStreamsState(streamsConfig);
     }
 
     @After
-    public void after() throws Exception {
+    public void after() throws IOException {
         if (streams != null) {
             streams.close();
             streams = null;
@@ -137,27 +138,27 @@ public class KTableKTableJoinIntegrationTest {
 
 
     @Test
-    public void shouldInnerInnerJoin() throws Exception {
+    public void shouldInnerInnerJoin() throws InterruptedException {
         verifyKTableKTableJoin(JoinType.INNER, JoinType.INNER, Collections.singletonList(new KeyValue<>("b", "B1-B2-B3")), false);
     }
 
     @Test
-    public void shouldInnerInnerJoinQueryable() throws Exception {
+    public void shouldInnerInnerJoinQueryable() throws InterruptedException {
         verifyKTableKTableJoin(JoinType.INNER, JoinType.INNER, Collections.singletonList(new KeyValue<>("b", "B1-B2-B3")), true);
     }
 
     @Test
-    public void shouldInnerLeftJoin() throws Exception {
+    public void shouldInnerLeftJoin() throws InterruptedException {
         verifyKTableKTableJoin(JoinType.INNER, JoinType.LEFT, Collections.singletonList(new KeyValue<>("b", "B1-B2-B3")), false);
     }
 
     @Test
-    public void shouldInnerLeftJoinQueryable() throws Exception {
+    public void shouldInnerLeftJoinQueryable() throws InterruptedException {
         verifyKTableKTableJoin(JoinType.INNER, JoinType.LEFT, Collections.singletonList(new KeyValue<>("b", "B1-B2-B3")), true);
     }
 
     @Test
-    public void shouldInnerOuterJoin() throws Exception {
+    public void shouldInnerOuterJoin() throws InterruptedException {
         verifyKTableKTableJoin(JoinType.INNER, JoinType.OUTER, Arrays.asList(
                 new KeyValue<>("a", "null-A3"),
                 new KeyValue<>("b", "null-B3"),
@@ -166,7 +167,7 @@ public class KTableKTableJoinIntegrationTest {
     }
 
     @Test
-    public void shouldInnerOuterJoinQueryable() throws Exception {
+    public void shouldInnerOuterJoinQueryable() throws InterruptedException {
         verifyKTableKTableJoin(JoinType.INNER, JoinType.OUTER, Arrays.asList(
             new KeyValue<>("a", "null-A3"),
             new KeyValue<>("b", "null-B3"),
@@ -175,7 +176,7 @@ public class KTableKTableJoinIntegrationTest {
     }
 
     @Test
-    public void shouldLeftInnerJoin() throws Exception {
+    public void shouldLeftInnerJoin() throws InterruptedException {
         verifyKTableKTableJoin(JoinType.LEFT, JoinType.INNER, Arrays.asList(
                 new KeyValue<>("a", "A1-null-A3"),
                 new KeyValue<>("b", "B1-null-B3"),
@@ -183,7 +184,7 @@ public class KTableKTableJoinIntegrationTest {
     }
 
     @Test
-    public void shouldLeftInnerJoinQueryable() throws Exception {
+    public void shouldLeftInnerJoinQueryable() throws InterruptedException {
         verifyKTableKTableJoin(JoinType.LEFT, JoinType.INNER, Arrays.asList(
             new KeyValue<>("a", "A1-null-A3"),
             new KeyValue<>("b", "B1-null-B3"),
@@ -191,7 +192,7 @@ public class KTableKTableJoinIntegrationTest {
     }
 
     @Test
-    public void shouldLeftLeftJoin() throws Exception {
+    public void shouldLeftLeftJoin() throws InterruptedException {
         verifyKTableKTableJoin(JoinType.LEFT, JoinType.LEFT, Arrays.asList(
                 new KeyValue<>("a", "A1-null-A3"),
                 new KeyValue<>("b", "B1-null-B3"),
@@ -199,7 +200,7 @@ public class KTableKTableJoinIntegrationTest {
     }
 
     @Test
-    public void shouldLeftLeftJoinQueryable() throws Exception {
+    public void shouldLeftLeftJoinQueryable() throws InterruptedException {
         verifyKTableKTableJoin(JoinType.LEFT, JoinType.LEFT, Arrays.asList(
             new KeyValue<>("a", "A1-null-A3"),
             new KeyValue<>("b", "B1-null-B3"),
@@ -207,7 +208,7 @@ public class KTableKTableJoinIntegrationTest {
     }
 
     @Test
-    public void shouldLeftOuterJoin() throws Exception {
+    public void shouldLeftOuterJoin() throws InterruptedException {
         verifyKTableKTableJoin(JoinType.LEFT, JoinType.OUTER, Arrays.asList(
                 new KeyValue<>("a", "null-A3"),
                 new KeyValue<>("b", "null-B3"),
@@ -218,7 +219,7 @@ public class KTableKTableJoinIntegrationTest {
     }
 
     @Test
-    public void shouldLeftOuterJoinQueryable() throws Exception {
+    public void shouldLeftOuterJoinQueryable() throws InterruptedException {
         verifyKTableKTableJoin(JoinType.LEFT, JoinType.OUTER, Arrays.asList(
             new KeyValue<>("a", "null-A3"),
             new KeyValue<>("b", "null-B3"),
@@ -229,7 +230,7 @@ public class KTableKTableJoinIntegrationTest {
     }
 
     @Test
-    public void shouldOuterInnerJoin() throws Exception {
+    public void shouldOuterInnerJoin() throws InterruptedException {
         verifyKTableKTableJoin(JoinType.OUTER, JoinType.INNER, Arrays.asList(
                 new KeyValue<>("a", "A1-null-A3"),
                 new KeyValue<>("b", "B1-null-B3"),
@@ -238,7 +239,7 @@ public class KTableKTableJoinIntegrationTest {
     }
 
     @Test
-    public void shouldOuterInnerJoinQueryable() throws Exception {
+    public void shouldOuterInnerJoinQueryable() throws InterruptedException {
         verifyKTableKTableJoin(JoinType.OUTER, JoinType.INNER, Arrays.asList(
             new KeyValue<>("a", "A1-null-A3"),
             new KeyValue<>("b", "B1-null-B3"),
@@ -247,7 +248,7 @@ public class KTableKTableJoinIntegrationTest {
     }
 
     @Test
-    public void shouldOuterLeftJoin() throws Exception {
+    public void shouldOuterLeftJoin() throws InterruptedException {
         verifyKTableKTableJoin(JoinType.OUTER, JoinType.LEFT,  Arrays.asList(
                 new KeyValue<>("a", "A1-null-A3"),
                 new KeyValue<>("b", "B1-null-B3"),
@@ -256,7 +257,7 @@ public class KTableKTableJoinIntegrationTest {
     }
 
     @Test
-    public void shouldOuterLeftJoinQueryable() throws Exception {
+    public void shouldOuterLeftJoinQueryable() throws InterruptedException {
         verifyKTableKTableJoin(JoinType.OUTER, JoinType.LEFT,  Arrays.asList(
             new KeyValue<>("a", "A1-null-A3"),
             new KeyValue<>("b", "B1-null-B3"),
@@ -265,7 +266,7 @@ public class KTableKTableJoinIntegrationTest {
     }
 
     @Test
-    public void shouldOuterOuterJoin() throws Exception {
+    public void shouldOuterOuterJoin() throws InterruptedException {
         verifyKTableKTableJoin(JoinType.OUTER, JoinType.OUTER, Arrays.asList(
                 new KeyValue<>("a", "null-A3"),
                 new KeyValue<>("b", "null-B3"),
@@ -277,7 +278,7 @@ public class KTableKTableJoinIntegrationTest {
     }
 
     @Test
-    public void shouldOuterOuterJoinQueryable() throws Exception {
+    public void shouldOuterOuterJoinQueryable() throws InterruptedException {
         verifyKTableKTableJoin(JoinType.OUTER, JoinType.OUTER, Arrays.asList(
             new KeyValue<>("a", "null-A3"),
             new KeyValue<>("b", "null-B3"),
@@ -292,7 +293,7 @@ public class KTableKTableJoinIntegrationTest {
     private void verifyKTableKTableJoin(final JoinType joinType1,
                                         final JoinType joinType2,
                                         final List<KeyValue<String, String>> expectedResult,
-                                        boolean verifyQueryableState) throws Exception {
+                                        boolean verifyQueryableState) throws InterruptedException {
         final String queryableName = verifyQueryableState ? joinType1 + "-" + joinType2 + "-ktable-ktable-join-query" : null;
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, joinType1 + "-" + joinType2 + "-ktable-ktable-join" + queryableName);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index dc59fb4..9c8244a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -127,7 +127,7 @@ public class QueryableStateIntegrationTest {
     }
 
     @Before
-    public void before() throws IOException, InterruptedException {
+    public void before() throws Exception {
         testNo++;
         createTopics();
         streamsConfiguration = new Properties();
@@ -255,7 +255,7 @@ public class QueryableStateIntegrationTest {
 
     private void verifyAllKVKeys(final StreamRunnable[] streamRunnables, final KafkaStreams streams,
                                  final KafkaStreamsTest.StateListenerStub stateListenerStub,
-                                 final Set<String> keys, final String storeName) throws Exception {
+                                 final Set<String> keys, final String storeName) throws InterruptedException {
         for (final String key : keys) {
             TestUtils.waitForCondition(new TestCondition() {
                 @Override
@@ -287,7 +287,7 @@ public class QueryableStateIntegrationTest {
     private void verifyAllWindowedKeys(final StreamRunnable[] streamRunnables, final KafkaStreams streams,
                                        final KafkaStreamsTest.StateListenerStub stateListenerStub,
                                        final Set<String> keys, final String storeName,
-                                       final Long from, final Long to) throws Exception {
+                                       final Long from, final Long to) throws InterruptedException {
         for (final String key : keys) {
             TestUtils.waitForCondition(new TestCondition() {
                 @Override
@@ -317,7 +317,7 @@ public class QueryableStateIntegrationTest {
 
 
     @Test
-    public void queryOnRebalance() throws Exception {
+    public void queryOnRebalance() throws InterruptedException {
         final int numThreads = STREAM_TWO_PARTITIONS;
         final StreamRunnable[] streamRunnables = new StreamRunnable[numThreads];
         final Thread[] streamThreads = new Thread[numThreads];
@@ -369,7 +369,7 @@ public class QueryableStateIntegrationTest {
     }
 
     @Test
-    public void concurrentAccesses() throws Exception {
+    public void concurrentAccesses() throws InterruptedException {
 
         final int numIterations = 500000;
 
@@ -590,7 +590,7 @@ public class QueryableStateIntegrationTest {
         }
     }
 
-    private void verifyCanQueryState(final int cacheSizeBytes) throws java.util.concurrent.ExecutionException, InterruptedException {
+    private void verifyCanQueryState(final int cacheSizeBytes) throws Exception {
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
         final StreamsBuilder builder = new StreamsBuilder();
         final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 13d9f82..1da4c58 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -49,6 +49,7 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -92,7 +93,7 @@ public class RegexSourceIntegrationTest {
 
 
     @BeforeClass
-    public static void startKafkaCluster() throws Exception {
+    public static void startKafkaCluster() throws InterruptedException {
         CLUSTER.createTopics(
             TOPIC_1,
             TOPIC_2,
@@ -119,7 +120,7 @@ public class RegexSourceIntegrationTest {
     }
 
     @After
-    public void tearDown() throws Exception {
+    public void tearDown() throws IOException {
         if (streams != null) {
             streams.close();
         }
@@ -229,7 +230,7 @@ public class RegexSourceIntegrationTest {
 
     @SuppressWarnings("deprecation")
     @Test
-    public void shouldAddStateStoreToRegexDefinedSource() throws Exception {
+    public void shouldAddStateStoreToRegexDefinedSource() throws InterruptedException {
 
         final ProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
         final MockStateStoreSupplier stateStoreSupplier = new MockStateStoreSupplier("testStateStore", false);

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 99a524e..897028d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -50,6 +50,7 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -242,7 +243,7 @@ public class ResetIntegrationTest {
         cleanGlobal(null);
     }
 
-    private Properties prepareTest(final int threads) throws Exception {
+    private Properties prepareTest(final int threads) throws IOException {
         final Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + testNo);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index e738bc6..6a873d2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -215,7 +215,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
      *
      * @param topic the name of the topic
      */
-    public void deleteTopic(final String topic) throws Exception {
+    public void deleteTopic(final String topic) throws InterruptedException {
         deleteTopicsAndWait(-1L, topic);
     }
 
@@ -224,7 +224,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
      *
      * @param topic the name of the topic
      */
-    public void deleteTopicAndWait(final String topic) throws Exception {
+    public void deleteTopicAndWait(final String topic) throws InterruptedException {
         deleteTopicsAndWait(TOPIC_DELETION_TIMEOUT, topic);
     }
 
@@ -234,7 +234,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
      * @param timeoutMs the max time to wait for the topic to be deleted (does not block if {@code <= 0})
      * @param topic the name of the topic
      */
-    public void deleteTopicAndWait(final long timeoutMs, final String topic) throws Exception {
+    public void deleteTopicAndWait(final long timeoutMs, final String topic) throws InterruptedException {
         deleteTopicsAndWait(timeoutMs, topic);
     }
 
@@ -243,7 +243,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
      *
      * @param topics the name of the topics
      */
-    public void deleteTopics(final String... topics) throws Exception {
+    public void deleteTopics(final String... topics) throws InterruptedException {
         deleteTopicsAndWait(-1, topics);
     }
 
@@ -252,7 +252,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
      *
      * @param topics the name of the topics
      */
-    public void deleteTopicsAndWait(final String... topics) throws Exception {
+    public void deleteTopicsAndWait(final String... topics) throws InterruptedException {
         deleteTopicsAndWait(TOPIC_DELETION_TIMEOUT, topics);
     }
 
@@ -262,7 +262,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
      * @param timeoutMs the max time to wait for the topics to be deleted (does not block if {@code <= 0})
      * @param topics the name of the topics
      */
-    public void deleteTopicsAndWait(final long timeoutMs, final String... topics) throws Exception {
+    public void deleteTopicsAndWait(final long timeoutMs, final String... topics) throws InterruptedException {
         for (final String topic : topics) {
             try {
                 brokers[0].deleteTopic(topic);
@@ -274,12 +274,12 @@ public class EmbeddedKafkaCluster extends ExternalResource {
         }
     }
 
-    public void deleteAndRecreateTopics(final String... topics) throws Exception {
+    public void deleteAndRecreateTopics(final String... topics) throws InterruptedException {
         deleteTopicsAndWait(TOPIC_DELETION_TIMEOUT, topics);
         createTopics(topics);
     }
 
-    public void deleteAndRecreateTopics(final long timeoutMs, final String... topics) throws Exception {
+    public void deleteAndRecreateTopics(final long timeoutMs, final String... topics) throws InterruptedException {
         deleteTopicsAndWait(timeoutMs, topics);
         createTopics(topics);
     }

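Because the EmbeddedKafkaCluster deletion helpers above now declare InterruptedException, callers can narrow their own signatures the same way. A usage sketch under that assumption (the test class and topic names are illustrative, not part of this commit):

    import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
    import org.junit.After;
    import org.junit.ClassRule;

    public class TopicCleanupExampleTest {

        // Illustrative single-broker cluster; integration tests typically expose it as a ClassRule.
        @ClassRule
        public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);

        // Before: throws Exception
        @After
        public void deleteTestTopics() throws InterruptedException {
            // Blocks until the brokers report the topics deleted, or the default timeout elapses.
            CLUSTER.deleteTopicsAndWait("example-input", "example-output");
        }
    }
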
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index ab17c9e..6a6d2a4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -159,7 +159,7 @@ public class KStreamBuilderTest {
     }
 
     @Test
-    public void shouldHaveCorrectSourceTopicsForTableFromMergedStream() throws Exception {
+    public void shouldHaveCorrectSourceTopicsForTableFromMergedStream() {
         final String topic1 = "topic-1";
         final String topic2 = "topic-2";
         final String topic3 = "topic-3";
@@ -192,17 +192,17 @@ public class KStreamBuilderTest {
     }
 
     @Test(expected = TopologyBuilderException.class)
-    public void shouldThrowExceptionWhenNoTopicPresent() throws Exception {
+    public void shouldThrowExceptionWhenNoTopicPresent() {
         builder.stream();
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldThrowExceptionWhenTopicNamesAreNull() throws Exception {
+    public void shouldThrowExceptionWhenTopicNamesAreNull() {
         builder.stream(Serdes.String(), Serdes.String(), null, null);
     }
 
     @Test
-    public void shouldStillMaterializeSourceKTableIfStateNameNotSpecified() throws Exception {
+    public void shouldStillMaterializeSourceKTableIfStateNameNotSpecified() {
         KTable table1 = builder.table("topic1", "table1");
         KTable table2 = builder.table("topic2", (String) null);
 
@@ -221,7 +221,7 @@ public class KStreamBuilderTest {
     }
 
     @Test
-    public void shouldBuildSimpleGlobalTableTopology() throws Exception {
+    public void shouldBuildSimpleGlobalTableTopology() {
         builder.globalTable("table", "globalTable");
 
         final ProcessorTopology topology = builder.buildGlobalStateTopology();
@@ -231,7 +231,7 @@ public class KStreamBuilderTest {
         assertEquals("globalTable", stateStores.get(0).name());
     }
 
-    private void doBuildGlobalTopologyWithAllGlobalTables() throws Exception {
+    private void doBuildGlobalTopologyWithAllGlobalTables() {
         final ProcessorTopology topology = builder.buildGlobalStateTopology();
 
         final List<StateStore> stateStores = topology.globalStateStores();
@@ -242,7 +242,7 @@ public class KStreamBuilderTest {
     }
 
     @Test
-    public void shouldBuildGlobalTopologyWithAllGlobalTables() throws Exception {
+    public void shouldBuildGlobalTopologyWithAllGlobalTables() {
         builder.globalTable("table", "globalTable");
         builder.globalTable("table2", "globalTable2");
 
@@ -250,7 +250,7 @@ public class KStreamBuilderTest {
     }
 
     @Test
-    public void shouldBuildGlobalTopologyWithAllGlobalTablesWithInternalStoreName() throws Exception {
+    public void shouldBuildGlobalTopologyWithAllGlobalTablesWithInternalStoreName() {
         builder.globalTable("table");
         builder.globalTable("table2");
 
@@ -258,7 +258,7 @@ public class KStreamBuilderTest {
     }
 
     @Test
-    public void shouldAddGlobalTablesToEachGroup() throws Exception {
+    public void shouldAddGlobalTablesToEachGroup() {
         final String one = "globalTable";
         final String two = "globalTable2";
         final GlobalKTable<String, String> globalTable = builder.globalTable("table", one);
@@ -294,7 +294,7 @@ public class KStreamBuilderTest {
     }
 
     @Test
-    public void shouldMapStateStoresToCorrectSourceTopics() throws Exception {
+    public void shouldMapStateStoresToCorrectSourceTopics() {
         final KStream<String, String> playEvents = builder.stream("events");
 
         final KTable<String, String> table = builder.table("table-topic", "table-store");
@@ -395,14 +395,14 @@ public class KStreamBuilderTest {
     }
 
     @Test
-    public void kStreamTimestampExtractorShouldBeNull() throws Exception {
+    public void kStreamTimestampExtractorShouldBeNull() {
         builder.stream("topic");
         final ProcessorTopology processorTopology = builder.build(null);
         assertNull(processorTopology.source("topic").getTimestampExtractor());
     }
 
     @Test
-    public void shouldAddTimestampExtractorToStreamWithKeyValSerdePerSource() throws Exception {
+    public void shouldAddTimestampExtractorToStreamWithKeyValSerdePerSource() {
         builder.stream(new MockTimestampExtractor(), null, null, "topic");
         final ProcessorTopology processorTopology = builder.build(null);
         for (final SourceNode sourceNode: processorTopology.sources()) {
@@ -411,28 +411,28 @@ public class KStreamBuilderTest {
     }
 
     @Test
-    public void shouldAddTimestampExtractorToStreamWithOffsetResetPerSource() throws Exception {
+    public void shouldAddTimestampExtractorToStreamWithOffsetResetPerSource() {
         builder.stream(null, new MockTimestampExtractor(), null, null, "topic");
         final ProcessorTopology processorTopology = builder.build(null);
         assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
     }
 
     @Test
-    public void shouldAddTimestampExtractorToTablePerSource() throws Exception {
+    public void shouldAddTimestampExtractorToTablePerSource() {
         builder.table("topic", "store");
         final ProcessorTopology processorTopology = builder.build(null);
         assertNull(processorTopology.source("topic").getTimestampExtractor());
     }
 
     @Test
-    public void kTableTimestampExtractorShouldBeNull() throws Exception {
+    public void kTableTimestampExtractorShouldBeNull() {
         builder.table("topic", "store");
         final ProcessorTopology processorTopology = builder.build(null);
         assertNull(processorTopology.source("topic").getTimestampExtractor());
     }
 
     @Test
-    public void shouldAddTimestampExtractorToTableWithKeyValSerdePerSource() throws Exception {
+    public void shouldAddTimestampExtractorToTableWithKeyValSerdePerSource() {
         builder.table(null, new MockTimestampExtractor(), null, null, "topic", "store");
         final ProcessorTopology processorTopology = builder.build(null);
         assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));

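The KStreamBuilderTest changes above (and the KGroupedStreamImplTest changes further down) drop the throws clause entirely where a test only expects an unchecked exception, since JUnit's expected attribute needs no checked-exception declaration. A small illustrative sketch (hypothetical test class, not from this commit):

    import org.junit.Test;

    public class UncheckedExpectationExampleTest {

        // No throws clause needed: NullPointerException is unchecked, and the
        // expected attribute turns the thrown exception into a passing assertion.
        @Test(expected = NullPointerException.class)
        public void shouldRejectNullArgument() {
            new StringBuilder().append((char[]) null);
        }
    }
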
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
index 87c7ece..0ee74b8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
@@ -52,7 +52,7 @@ public class GlobalKTableJoinsTest {
     public final KStreamTestDriver driver = new KStreamTestDriver();
 
     @Before
-    public void setUp() throws Exception {
+    public void setUp() {
         stateDir = TestUtils.tempDirectory();
         global = builder.globalTable(Serdes.String(), Serdes.String(), null, globalTopic, "global-store");
         stream = builder.stream(streamTopic, Consumed.with(Serdes.String(), Serdes.String()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index 78f8dbd..bc65e09 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -74,111 +74,111 @@ public class KGroupedStreamImplTest {
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotHaveNullReducerOnReduce() throws Exception {
+    public void shouldNotHaveNullReducerOnReduce() {
         groupedStream.reduce(null, "store");
     }
 
     @Test
-    public void shouldAllowNullStoreNameOnReduce() throws Exception {
+    public void shouldAllowNullStoreNameOnReduce() {
         groupedStream.reduce(MockReducer.STRING_ADDER, (String) null);
     }
 
     @Test(expected = InvalidTopicException.class)
-    public void shouldNotHaveInvalidStoreNameOnReduce() throws Exception {
+    public void shouldNotHaveInvalidStoreNameOnReduce() {
         groupedStream.reduce(MockReducer.STRING_ADDER, INVALID_STORE_NAME);
     }
 
     @SuppressWarnings("deprecation")
     @Test(expected = NullPointerException.class)
-    public void shouldNotHaveNullStoreSupplierOnReduce() throws Exception {
+    public void shouldNotHaveNullStoreSupplierOnReduce() {
         groupedStream.reduce(MockReducer.STRING_ADDER, (StateStoreSupplier<KeyValueStore>) null);
     }
 
     @SuppressWarnings("deprecation")
     @Test(expected = NullPointerException.class)
-    public void shouldNotHaveNullStoreSupplierOnCount() throws Exception {
+    public void shouldNotHaveNullStoreSupplierOnCount() {
         groupedStream.count((StateStoreSupplier<KeyValueStore>) null);
     }
 
 
     @SuppressWarnings("deprecation")
     @Test(expected = NullPointerException.class)
-    public void shouldNotHaveNullStoreSupplierOnWindowedCount() throws Exception {
+    public void shouldNotHaveNullStoreSupplierOnWindowedCount() {
         groupedStream.count(TimeWindows.of(10), (StateStoreSupplier<WindowStore>) null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotHaveNullReducerWithWindowedReduce() throws Exception {
+    public void shouldNotHaveNullReducerWithWindowedReduce() {
         groupedStream.reduce(null, TimeWindows.of(10), "store");
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotHaveNullWindowsWithWindowedReduce() throws Exception {
+    public void shouldNotHaveNullWindowsWithWindowedReduce() {
         groupedStream.reduce(MockReducer.STRING_ADDER, (Windows) null, "store");
     }
 
     @Test
-    public void shouldAllowNullStoreNameWithWindowedReduce() throws Exception {
+    public void shouldAllowNullStoreNameWithWindowedReduce() {
         groupedStream.reduce(MockReducer.STRING_ADDER, TimeWindows.of(10), (String) null);
     }
 
     @Test(expected = InvalidTopicException.class)
-    public void shouldNotHaveInvalidStoreNameWithWindowedReduce() throws Exception {
+    public void shouldNotHaveInvalidStoreNameWithWindowedReduce() {
         groupedStream.reduce(MockReducer.STRING_ADDER, TimeWindows.of(10), INVALID_STORE_NAME);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotHaveNullInitializerOnAggregate() throws Exception {
+    public void shouldNotHaveNullInitializerOnAggregate() {
         groupedStream.aggregate(null, MockAggregator.TOSTRING_ADDER, Serdes.String(), "store");
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotHaveNullAdderOnAggregate() throws Exception {
+    public void shouldNotHaveNullAdderOnAggregate() {
         groupedStream.aggregate(MockInitializer.STRING_INIT, null, Serdes.String(), "store");
     }
 
     @Test
-    public void shouldAllowNullStoreNameOnAggregate() throws Exception {
+    public void shouldAllowNullStoreNameOnAggregate() {
         groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Serdes.String(), null);
     }
 
     @Test(expected = InvalidTopicException.class)
-    public void shouldNotHaveInvalidStoreNameOnAggregate() throws Exception {
+    public void shouldNotHaveInvalidStoreNameOnAggregate() {
         groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Serdes.String(), INVALID_STORE_NAME);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotHaveNullInitializerOnWindowedAggregate() throws Exception {
+    public void shouldNotHaveNullInitializerOnWindowedAggregate() {
         groupedStream.aggregate(null, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10), Serdes.String(), "store");
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotHaveNullAdderOnWindowedAggregate() throws Exception {
+    public void shouldNotHaveNullAdderOnWindowedAggregate() {
         groupedStream.aggregate(MockInitializer.STRING_INIT, null, TimeWindows.of(10), Serdes.String(), "store");
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotHaveNullWindowsOnWindowedAggregate() throws Exception {
+    public void shouldNotHaveNullWindowsOnWindowedAggregate() {
         groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, null, Serdes.String(), "store");
     }
 
     @Test
-    public void shouldAllowNullStoreNameOnWindowedAggregate() throws Exception {
+    public void shouldAllowNullStoreNameOnWindowedAggregate() {
         groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10), Serdes.String(), null);
     }
 
     @Test(expected = InvalidTopicException.class)
-    public void shouldNotHaveInvalidStoreNameOnWindowedAggregate() throws Exception {
+    public void shouldNotHaveInvalidStoreNameOnWindowedAggregate() {
         groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10), Serdes.String(), INVALID_STORE_NAME);
     }
 
     @SuppressWarnings("deprecation")
     @Test(expected = NullPointerException.class)
-    public void shouldNotHaveNullStoreSupplierOnWindowedAggregate() throws Exception {
+    public void shouldNotHaveNullStoreSupplierOnWindowedAggregate() {
         groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10), (StateStoreSupplier<WindowStore>) null);
     }
 
-    private void doAggregateSessionWindows(final Map<Windowed<String>, Integer> results) throws Exception {
+    private void doAggregateSessionWindows(final Map<Windowed<String>, Integer> results) {
         driver.setUp(builder, TestUtils.tempDirectory());
         driver.setTime(10);
         driver.process(TOPIC, "1", "1");
@@ -199,7 +199,7 @@ public class KGroupedStreamImplTest {
     }
 
     @Test
-    public void shouldAggregateSessionWindows() throws Exception {
+    public void shouldAggregateSessionWindows() {
         final Map<Windowed<String>, Integer> results = new HashMap<>();
         KTable table = groupedStream.aggregate(new Initializer<Integer>() {
             @Override
@@ -229,7 +229,7 @@ public class KGroupedStreamImplTest {
     }
 
     @Test
-    public void shouldAggregateSessionWindowsWithInternalStoreName() throws Exception {
+    public void shouldAggregateSessionWindowsWithInternalStoreName() {
         final Map<Windowed<String>, Integer> results = new HashMap<>();
         KTable table = groupedStream.aggregate(new Initializer<Integer>() {
             @Override
@@ -258,7 +258,7 @@ public class KGroupedStreamImplTest {
         assertNull(table.queryableStoreName());
     }
 
-    private void doCountSessionWindows(final Map<Windowed<String>, Long> results) throws Exception {
+    private void doCountSessionWindows(final Map<Windowed<String>, Long> results) {
         driver.setUp(builder, TestUtils.tempDirectory());
         driver.setTime(10);
         driver.process(TOPIC, "1", "1");
@@ -279,7 +279,7 @@ public class KGroupedStreamImplTest {
     }
 
     @Test
-    public void shouldCountSessionWindows() throws Exception {
+    public void shouldCountSessionWindows() {
         final Map<Windowed<String>, Long> results = new HashMap<>();
         KTable table = groupedStream.count(SessionWindows.with(30), "session-store");
         table.toStream().foreach(new ForeachAction<Windowed<String>, Long>() {
@@ -293,7 +293,7 @@ public class KGroupedStreamImplTest {
     }
 
     @Test
-    public void shouldCountSessionWindowsWithInternalStoreName() throws Exception {
+    public void shouldCountSessionWindowsWithInternalStoreName() {
         final Map<Windowed<String>, Long> results = new HashMap<>();
         KTable table = groupedStream.count(SessionWindows.with(30));
         table.toStream().foreach(new ForeachAction<Windowed<String>, Long>() {
@@ -306,7 +306,7 @@ public class KGroupedStreamImplTest {
         assertNull(table.queryableStoreName());
     }
 
-    private void doReduceSessionWindows(final Map<Windowed<String>, String> results) throws Exception {
+    private void doReduceSessionWindows(final Map<Windowed<String>, String> results) {
         driver.setUp(builder, TestUtils.tempDirectory());
         driver.setTime(10);
         driver.process(TOPIC, "1", "A");
@@ -327,7 +327,7 @@ public class KGroupedStreamImplTest {
     }
 
     @Test
-    public void shouldReduceSessionWindows() throws Exception {
+    public void shouldReduceSessionWindows() {
         final Map<Windowed<String>, String> results = new HashMap<>();
         KTable table = groupedStream.reduce(
                 new Reducer<String>() {
@@ -348,7 +348,7 @@ public class KGroupedStreamImplTest {
     }
 
     @Test
-    public void shouldReduceSessionWindowsWithInternalStoreName() throws Exception {
+    public void shouldReduceSessionWindowsWithInternalStoreName() {
         final Map<Windowed<String>, String> results = new HashMap<>();
         KTable table = groupedStream.reduce(
                 new Reducer<String>() {
@@ -369,33 +369,33 @@ public class KGroupedStreamImplTest {
 
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAcceptNullReducerWhenReducingSessionWindows() throws Exception {
+    public void shouldNotAcceptNullReducerWhenReducingSessionWindows() {
         groupedStream.reduce(null, SessionWindows.with(10), "store");
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAcceptNullSessionWindowsReducingSessionWindows() throws Exception {
+    public void shouldNotAcceptNullSessionWindowsReducingSessionWindows() {
         groupedStream.reduce(MockReducer.STRING_ADDER, (SessionWindows) null, "store");
     }
 
     @Test
-    public void shouldAcceptNullStoreNameWhenReducingSessionWindows() throws Exception {
+    public void shouldAcceptNullStoreNameWhenReducingSessionWindows() {
         groupedStream.reduce(MockReducer.STRING_ADDER, SessionWindows.with(10), (String) null);
     }
 
     @Test(expected = InvalidTopicException.class)
-    public void shouldNotAcceptInvalidStoreNameWhenReducingSessionWindows() throws Exception {
+    public void shouldNotAcceptInvalidStoreNameWhenReducingSessionWindows() {
         groupedStream.reduce(MockReducer.STRING_ADDER, SessionWindows.with(10), INVALID_STORE_NAME);
     }
 
     @SuppressWarnings("deprecation")
     @Test(expected = NullPointerException.class)
-    public void shouldNotAcceptNullStateStoreSupplierWhenReducingSessionWindows() throws Exception {
+    public void shouldNotAcceptNullStateStoreSupplierWhenReducingSessionWindows() {
         groupedStream.reduce(MockReducer.STRING_ADDER, SessionWindows.with(10), (StateStoreSupplier<SessionStore>) null);
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAcceptNullInitializerWhenAggregatingSessionWindows() throws Exception {
+    public void shouldNotAcceptNullInitializerWhenAggregatingSessionWindows() {
         groupedStream.aggregate(null, MockAggregator.TOSTRING_ADDER, new Merger<String, String>() {
             @Override
             public String apply(final String aggKey, final String aggOne, final String aggTwo) {
@@ -405,7 +405,7 @@ public class KGroupedStreamImplTest {
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAcceptNullAggregatorWhenAggregatingSessionWindows() throws Exception {
+    public void shouldNotAcceptNullAggregatorWhenAggregatingSessionWindows() {
         groupedStream.aggregate(MockInitializer.STRING_INIT, null, new Merger<String, String>() {
             @Override
             public String apply(final String aggKey, final String aggOne, final String aggTwo) {
@@ -415,7 +415,7 @@ public class KGroupedStreamImplTest {
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAcceptNullSessionMergerWhenAggregatingSessionWindows() throws Exception {
+    public void shouldNotAcceptNullSessionMergerWhenAggregatingSessionWindows() {
         groupedStream.aggregate(MockInitializer.STRING_INIT,
                 MockAggregator.TOSTRING_ADDER,
                 null,
@@ -425,7 +425,7 @@ public class KGroupedStreamImplTest {
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAcceptNullSessionWindowsWhenAggregatingSessionWindows() throws Exception {
+    public void shouldNotAcceptNullSessionWindowsWhenAggregatingSessionWindows() {
         groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, new Merger<String, String>() {
             @Override
             public String apply(final String aggKey, final String aggOne, final String aggTwo) {
@@ -435,7 +435,7 @@ public class KGroupedStreamImplTest {
     }
 
     @Test
-    public void shouldAcceptNullStoreNameWhenAggregatingSessionWindows() throws Exception {
+    public void shouldAcceptNullStoreNameWhenAggregatingSessionWindows() {
         groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, new Merger<String, String>() {
             @Override
             public String apply(final String aggKey, final String aggOne, final String aggTwo) {
@@ -445,7 +445,7 @@ public class KGroupedStreamImplTest {
     }
 
     @Test(expected = InvalidTopicException.class)
-    public void shouldNotAcceptInvalidStoreNameWhenAggregatingSessionWindows() throws Exception {
+    public void shouldNotAcceptInvalidStoreNameWhenAggregatingSessionWindows() {
         groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, new Merger<String, String>() {
             @Override
             public String apply(final String aggKey, final String aggOne, final String aggTwo) {
@@ -456,7 +456,7 @@ public class KGroupedStreamImplTest {
 
     @SuppressWarnings("deprecation")
     @Test(expected = NullPointerException.class)
-    public void shouldNotAcceptNullStateStoreSupplierNameWhenAggregatingSessionWindows() throws Exception {
+    public void shouldNotAcceptNullStateStoreSupplierNameWhenAggregatingSessionWindows() {
         groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, new Merger<String, String>() {
             @Override
             public String apply(final String aggKey, final String aggOne, final String aggTwo) {
@@ -466,27 +466,27 @@ public class KGroupedStreamImplTest {
     }
 
     @Test(expected = NullPointerException.class)
-    public void shouldNotAcceptNullSessionWindowsWhenCountingSessionWindows() throws Exception {
+    public void shouldNotAcceptNullSessionWindowsWhenCountingSessionWindows() {
         groupedStream.count((SessionWindows) null, "store");
     }
 
     @Test
-    public void shouldAcceptNullStoreNameWhenCountingSessionWindows() throws Exception {
+    public void shouldAcceptNullStoreNameWhenCountingSessionWindows() {
         groupedStream.count(SessionWindows.with(90), (String) null);
     }
 
     @Test(expected = InvalidTopicException.class)
-    public void shouldNotAcceptInvalidStoreNameWhenCountingSessionWindows() throws Exception {
+    public void shouldNotAcceptInvalidStoreNameWhenCountingSessionWindows() {
         groupedStream.count(SessionWindows.with(90), INVALID_STORE_NAME);
     }
 
     @SuppressWarnings("deprecation")
     @Test(expected = NullPointerException.class)
-    public void shouldNotAcceptNullStateStoreSupplierWhenCountingSessionWindows() throws Exception {
+    public void shouldNotAcceptNullStateStoreSupplierWhenCountingSessionWindows() {
         groupedStream.count(SessionWindows.with(90), (StateStoreSupplier<SessionStore>) null);
     }
 
-    private void doCountWindowed(final List<KeyValue<Windowed<String>, Long>> results) throws Exception {
+    private void doCountWindowed(final List<KeyValue<Windowed<String>, Long>> results) {
         driver.setUp(builder, TestUtils.tempDirectory(), 0);
         driver.setTime(0);
         driver.process(TOPIC, "1", "A");
@@ -509,7 +509,7 @@ public class KGroupedStreamImplTest {
     }
 
     @Test
-    public void shouldCountWindowed() throws Exception {
+    public void shouldCountWindowed() {
         final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>();
         groupedStream.count(
                 TimeWindows.of(500L),
@@ -527,7 +527,7 @@ public class KGroupedStreamImplTest {
 
     @SuppressWarnings("deprecation")
     @Test
-    public void shouldCountWindowedWithInternalStoreName() throws Exception {
+    public void shouldCountWindowedWithInternalStoreName() {
         final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>();
         groupedStream.count(
                 TimeWindows.of(500L))