You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2017/09/11 08:42:21 UTC
[4/5] kafka git commit: KAFKA-5531; throw concrete exceptions in streams tests
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
index 9204b88..105dd2e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -63,56 +63,56 @@ public class KGroupedTableImplTest {
}
@Test
- public void shouldAllowNullStoreNameOnAggregate() throws Exception {
+ public void shouldAllowNullStoreNameOnAggregate() {
groupedTable.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER, (String) null);
}
@Test(expected = InvalidTopicException.class)
- public void shouldNotAllowInvalidStoreNameOnAggregate() throws Exception {
+ public void shouldNotAllowInvalidStoreNameOnAggregate() {
groupedTable.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER, INVALID_STORE_NAME);
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullInitializerOnAggregate() throws Exception {
+ public void shouldNotAllowNullInitializerOnAggregate() {
groupedTable.aggregate(null, MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER, "store");
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullAdderOnAggregate() throws Exception {
+ public void shouldNotAllowNullAdderOnAggregate() {
groupedTable.aggregate(MockInitializer.STRING_INIT, null, MockAggregator.TOSTRING_REMOVER, "store");
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullSubtractorOnAggregate() throws Exception {
+ public void shouldNotAllowNullSubtractorOnAggregate() {
groupedTable.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, null, "store");
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullAdderOnReduce() throws Exception {
+ public void shouldNotAllowNullAdderOnReduce() {
groupedTable.reduce(null, MockReducer.STRING_REMOVER, "store");
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullSubtractorOnReduce() throws Exception {
+ public void shouldNotAllowNullSubtractorOnReduce() {
groupedTable.reduce(MockReducer.STRING_ADDER, null, "store");
}
@Test
- public void shouldAllowNullStoreNameOnReduce() throws Exception {
+ public void shouldAllowNullStoreNameOnReduce() {
groupedTable.reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, (String) null);
}
@Test(expected = InvalidTopicException.class)
- public void shouldNotAllowInvalidStoreNameOnReduce() throws Exception {
+ public void shouldNotAllowInvalidStoreNameOnReduce() {
groupedTable.reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, INVALID_STORE_NAME);
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullStoreSupplierOnReduce() throws Exception {
+ public void shouldNotAllowNullStoreSupplierOnReduce() {
groupedTable.reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, (StateStoreSupplier<KeyValueStore>) null);
}
- private void doShouldReduce(final KTable<String, Integer> reduced, final String topic) throws Exception {
+ private void doShouldReduce(final KTable<String, Integer> reduced, final String topic) {
final Map<String, Integer> results = new HashMap<>();
reduced.foreach(new ForeachAction<String, Integer>() {
@Override
@@ -141,7 +141,7 @@ public class KGroupedTableImplTest {
}
@Test
- public void shouldReduce() throws Exception {
+ public void shouldReduce() {
final String topic = "input";
final KeyValueMapper<String, Number, KeyValue<String, Integer>> intProjection =
new KeyValueMapper<String, Number, KeyValue<String, Integer>>() {
@@ -160,7 +160,7 @@ public class KGroupedTableImplTest {
}
@Test
- public void shouldReduceWithInternalStoreName() throws Exception {
+ public void shouldReduceWithInternalStoreName() {
final String topic = "input";
final KeyValueMapper<String, Number, KeyValue<String, Integer>> intProjection =
new KeyValueMapper<String, Number, KeyValue<String, Integer>>() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 32c21fe..5e8687f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -260,157 +260,157 @@ public class KStreamImplTest {
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullPredicateOnFilter() throws Exception {
+ public void shouldNotAllowNullPredicateOnFilter() {
testStream.filter(null);
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullPredicateOnFilterNot() throws Exception {
+ public void shouldNotAllowNullPredicateOnFilterNot() {
testStream.filterNot(null);
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullMapperOnSelectKey() throws Exception {
+ public void shouldNotAllowNullMapperOnSelectKey() {
testStream.selectKey(null);
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullMapperOnMap() throws Exception {
+ public void shouldNotAllowNullMapperOnMap() {
testStream.map(null);
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullMapperOnMapValues() throws Exception {
+ public void shouldNotAllowNullMapperOnMapValues() {
testStream.mapValues(null);
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullFilePathOnWriteAsText() throws Exception {
+ public void shouldNotAllowNullFilePathOnWriteAsText() {
testStream.writeAsText(null);
}
@Test(expected = TopologyException.class)
- public void shouldNotAllowEmptyFilePathOnWriteAsText() throws Exception {
+ public void shouldNotAllowEmptyFilePathOnWriteAsText() {
testStream.writeAsText("\t \t");
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullMapperOnFlatMap() throws Exception {
+ public void shouldNotAllowNullMapperOnFlatMap() {
testStream.flatMap(null);
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullMapperOnFlatMapValues() throws Exception {
+ public void shouldNotAllowNullMapperOnFlatMapValues() {
testStream.flatMapValues(null);
}
@Test(expected = IllegalArgumentException.class)
- public void shouldHaveAtLeastOnPredicateWhenBranching() throws Exception {
+ public void shouldHaveAtLeastOnPredicateWhenBranching() {
testStream.branch();
}
@Test(expected = NullPointerException.class)
- public void shouldCantHaveNullPredicate() throws Exception {
+ public void shouldCantHaveNullPredicate() {
testStream.branch((Predicate) null);
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullTopicOnThrough() throws Exception {
+ public void shouldNotAllowNullTopicOnThrough() {
testStream.through(null);
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullTopicOnTo() throws Exception {
+ public void shouldNotAllowNullTopicOnTo() {
testStream.to(null);
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullTransformSupplierOnTransform() throws Exception {
+ public void shouldNotAllowNullTransformSupplierOnTransform() {
testStream.transform(null);
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullTransformSupplierOnTransformValues() throws Exception {
+ public void shouldNotAllowNullTransformSupplierOnTransformValues() {
testStream.transformValues(null);
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullProcessSupplier() throws Exception {
+ public void shouldNotAllowNullProcessSupplier() {
testStream.process(null);
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullOtherStreamOnJoin() throws Exception {
+ public void shouldNotAllowNullOtherStreamOnJoin() {
testStream.join(null, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(10));
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullValueJoinerOnJoin() throws Exception {
+ public void shouldNotAllowNullValueJoinerOnJoin() {
testStream.join(testStream, null, JoinWindows.of(10));
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullJoinWindowsOnJoin() throws Exception {
+ public void shouldNotAllowNullJoinWindowsOnJoin() {
testStream.join(testStream, MockValueJoiner.TOSTRING_JOINER, null);
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullTableOnTableJoin() throws Exception {
+ public void shouldNotAllowNullTableOnTableJoin() {
testStream.leftJoin((KTable) null, MockValueJoiner.TOSTRING_JOINER);
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullValueMapperOnTableJoin() throws Exception {
+ public void shouldNotAllowNullValueMapperOnTableJoin() {
testStream.leftJoin(builder.table(Serdes.String(), Serdes.String(), "topic", "store"), null);
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullSelectorOnGroupBy() throws Exception {
+ public void shouldNotAllowNullSelectorOnGroupBy() {
testStream.groupBy(null);
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullActionOnForEach() throws Exception {
+ public void shouldNotAllowNullActionOnForEach() {
testStream.foreach(null);
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullTableOnJoinWithGlobalTable() throws Exception {
+ public void shouldNotAllowNullTableOnJoinWithGlobalTable() {
testStream.join((GlobalKTable) null,
MockKeyValueMapper.<String, String>SelectValueMapper(),
MockValueJoiner.TOSTRING_JOINER);
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullMapperOnJoinWithGlobalTable() throws Exception {
+ public void shouldNotAllowNullMapperOnJoinWithGlobalTable() {
testStream.join(builder.globalTable(Serdes.String(), Serdes.String(), null, "global", "global"),
null,
MockValueJoiner.TOSTRING_JOINER);
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullJoinerOnJoinWithGlobalTable() throws Exception {
+ public void shouldNotAllowNullJoinerOnJoinWithGlobalTable() {
testStream.join(builder.globalTable(Serdes.String(), Serdes.String(), null, "global", "global"),
MockKeyValueMapper.<String, String>SelectValueMapper(),
null);
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullTableOnJLeftJoinWithGlobalTable() throws Exception {
+ public void shouldNotAllowNullTableOnJLeftJoinWithGlobalTable() {
testStream.leftJoin((GlobalKTable) null,
MockKeyValueMapper.<String, String>SelectValueMapper(),
MockValueJoiner.TOSTRING_JOINER);
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullMapperOnLeftJoinWithGlobalTable() throws Exception {
+ public void shouldNotAllowNullMapperOnLeftJoinWithGlobalTable() {
testStream.leftJoin(builder.globalTable(Serdes.String(), Serdes.String(), null, "global", "global"),
null,
MockValueJoiner.TOSTRING_JOINER);
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullJoinerOnLeftJoinWithGlobalTable() throws Exception {
+ public void shouldNotAllowNullJoinerOnLeftJoinWithGlobalTable() {
testStream.leftJoin(builder.globalTable(Serdes.String(), Serdes.String(), null, "global", "global"),
MockKeyValueMapper.<String, String>SelectValueMapper(),
null);
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index efb9f12..572c0b0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -62,7 +62,7 @@ public class KStreamKStreamJoinTest {
}
@Test
- public void testJoin() throws Exception {
+ public void testJoin() {
StreamsBuilder builder = new StreamsBuilder();
final int[] expectedKeys = new int[]{0, 1, 2, 3};
@@ -163,7 +163,7 @@ public class KStreamKStreamJoinTest {
}
@Test
- public void testOuterJoin() throws Exception {
+ public void testOuterJoin() {
StreamsBuilder builder = new StreamsBuilder();
final int[] expectedKeys = new int[]{0, 1, 2, 3};
@@ -264,7 +264,7 @@ public class KStreamKStreamJoinTest {
}
@Test
- public void testWindowing() throws Exception {
+ public void testWindowing() {
long time = 0L;
StreamsBuilder builder = new StreamsBuilder();
@@ -494,7 +494,7 @@ public class KStreamKStreamJoinTest {
}
@Test
- public void testAsymetricWindowingAfter() throws Exception {
+ public void testAsymetricWindowingAfter() {
long time = 1000L;
StreamsBuilder builder = new StreamsBuilder();
@@ -608,7 +608,7 @@ public class KStreamKStreamJoinTest {
}
@Test
- public void testAsymetricWindowingBefore() throws Exception {
+ public void testAsymetricWindowingBefore() {
long time = 1000L;
StreamsBuilder builder = new StreamsBuilder();
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index c56956d..745ab4e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -62,7 +62,7 @@ public class KStreamKStreamLeftJoinTest {
}
@Test
- public void testLeftJoin() throws Exception {
+ public void testLeftJoin() {
final StreamsBuilder builder = new StreamsBuilder();
final int[] expectedKeys = new int[]{0, 1, 2, 3};
@@ -153,7 +153,7 @@ public class KStreamKStreamLeftJoinTest {
}
@Test
- public void testWindowing() throws Exception {
+ public void testWindowing() {
final StreamsBuilder builder = new StreamsBuilder();
final int[] expectedKeys = new int[]{0, 1, 2, 3};
long time = 0L;
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index 524cf42..d206b02 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -57,7 +57,7 @@ public class KStreamKTableJoinTest {
}
@Test
- public void testJoin() throws Exception {
+ public void testJoin() {
final StreamsBuilder builder = new StreamsBuilder();
final int[] expectedKeys = new int[]{0, 1, 2, 3};
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index 9e9fddc..a79184e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -58,7 +58,7 @@ public class KStreamKTableLeftJoinTest {
}
@Test
- public void testJoin() throws Exception {
+ public void testJoin() {
StreamsBuilder builder = new StreamsBuilder();
final int[] expectedKeys = new int[]{0, 1, 2, 3};
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 0662944..7f0d28a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -118,7 +118,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
}
@Test
- public void shouldCreateSingleSessionWhenWithinGap() throws Exception {
+ public void shouldCreateSingleSessionWhenWithinGap() {
context.setTime(0);
processor.process("john", "first");
context.setTime(500);
@@ -131,7 +131,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
@Test
- public void shouldMergeSessions() throws Exception {
+ public void shouldMergeSessions() {
context.setTime(0);
final String sessionId = "mel";
processor.process(sessionId, "first");
@@ -155,7 +155,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
}
@Test
- public void shouldUpdateSessionIfTheSameTime() throws Exception {
+ public void shouldUpdateSessionIfTheSameTime() {
context.setTime(0);
processor.process("mel", "first");
processor.process("mel", "second");
@@ -165,7 +165,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
}
@Test
- public void shouldHaveMultipleSessionsForSameIdWhenTimestampApartBySessionGap() throws Exception {
+ public void shouldHaveMultipleSessionsForSameIdWhenTimestampApartBySessionGap() {
final String sessionId = "mel";
long time = 0;
context.setTime(time);
@@ -190,7 +190,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
@Test
- public void shouldRemoveMergedSessionsFromStateStore() throws Exception {
+ public void shouldRemoveMergedSessionsFromStateStore() {
context.setTime(0);
processor.process("a", "1");
@@ -208,7 +208,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
}
@Test
- public void shouldHandleMultipleSessionsAndMerging() throws Exception {
+ public void shouldHandleMultipleSessionsAndMerging() {
context.setTime(0);
processor.process("a", "1");
processor.process("b", "1");
@@ -238,7 +238,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
@Test
- public void shouldGetAggregatedValuesFromValueGetter() throws Exception {
+ public void shouldGetAggregatedValuesFromValueGetter() {
final KTableValueGetter<Windowed<String>, Long> getter = sessionAggregator.view().get();
getter.init(context);
context.setTime(0);
@@ -253,7 +253,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
}
@Test
- public void shouldImmediatelyForwardNewSessionWhenNonCachedStore() throws Exception {
+ public void shouldImmediatelyForwardNewSessionWhenNonCachedStore() {
initStore(false);
processor.init(context);
@@ -268,7 +268,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
}
@Test
- public void shouldImmediatelyForwardRemovedSessionsWhenMerging() throws Exception {
+ public void shouldImmediatelyForwardRemovedSessionsWhenMerging() {
initStore(false);
processor.init(context);
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index b39ac36..aa660e0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -49,14 +49,14 @@ public class KStreamWindowAggregateTest {
private File stateDir = null;
@Rule
public final KStreamTestDriver driver = new KStreamTestDriver();
-
+
@Before
public void setUp() throws IOException {
stateDir = TestUtils.tempDirectory("kafka-test");
}
@Test
- public void testAggBasic() throws Exception {
+ public void testAggBasic() {
final StreamsBuilder builder = new StreamsBuilder();
String topic1 = "topic1";
@@ -147,7 +147,7 @@ public class KStreamWindowAggregateTest {
}
@Test
- public void testJoin() throws Exception {
+ public void testJoin() {
final StreamsBuilder builder = new StreamsBuilder();
String topic1 = "topic1";
String topic2 = "topic2";
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index 781eb61..9cdc782 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -67,7 +67,7 @@ public class KTableKTableLeftJoinTest {
}
@Test
- public void testJoin() throws Exception {
+ public void testJoin() {
final StreamsBuilder builder = new StreamsBuilder();
final int[] expectedKeys = new int[]{0, 1, 2, 3};
@@ -161,7 +161,7 @@ public class KTableKTableLeftJoinTest {
}
@Test
- public void testNotSendingOldValue() throws Exception {
+ public void testNotSendingOldValue() {
final StreamsBuilder builder = new StreamsBuilder();
final int[] expectedKeys = new int[]{0, 1, 2, 3};
@@ -242,7 +242,7 @@ public class KTableKTableLeftJoinTest {
}
@Test
- public void testSendingOldValue() throws Exception {
+ public void testSendingOldValue() {
final StreamsBuilder builder = new StreamsBuilder();
final int[] expectedKeys = new int[]{0, 1, 2, 3};
@@ -330,7 +330,7 @@ public class KTableKTableLeftJoinTest {
* Before the fix this would trigger an IllegalStateException.
*/
@Test
- public void shouldNotThrowIllegalStateExceptionWhenMultiCacheEvictions() throws Exception {
+ public void shouldNotThrowIllegalStateExceptionWhenMultiCacheEvictions() {
final String agg = "agg";
final String tableOne = "tableOne";
final String tableTwo = "tableTwo";
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index 7ab6d87..368a3ea 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -61,7 +61,7 @@ public class KTableKTableOuterJoinTest {
}
@Test
- public void testJoin() throws Exception {
+ public void testJoin() {
StreamsBuilder builder = new StreamsBuilder();
final int[] expectedKeys = new int[]{0, 1, 2, 3};
@@ -166,7 +166,7 @@ public class KTableKTableOuterJoinTest {
}
@Test
- public void testNotSendingOldValue() throws Exception {
+ public void testNotSendingOldValue() {
final StreamsBuilder builder = new StreamsBuilder();
final int[] expectedKeys = new int[]{0, 1, 2, 3};
@@ -254,7 +254,7 @@ public class KTableKTableOuterJoinTest {
}
@Test
- public void testSendingOldValue() throws Exception {
+ public void testSendingOldValue() {
final StreamsBuilder builder = new StreamsBuilder();
final int[] expectedKeys = new int[]{0, 1, 2, 3};
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java
index aca3352..59371d5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionKeySerdeTest.java
@@ -39,66 +39,66 @@ public class SessionKeySerdeTest {
final private SessionKeySerde<String> sessionKeySerde = new SessionKeySerde<>(serde);
@Test
- public void shouldSerializeDeserialize() throws Exception {
+ public void shouldSerializeDeserialize() {
final byte[] bytes = sessionKeySerde.serializer().serialize(topic, windowedKey);
final Windowed<String> result = sessionKeySerde.deserializer().deserialize(topic, bytes);
assertEquals(windowedKey, result);
}
@Test
- public void shouldSerializeNullToNull() throws Exception {
+ public void shouldSerializeNullToNull() {
assertNull(sessionKeySerde.serializer().serialize(topic, null));
}
@Test
- public void shouldDeSerializeEmtpyByteArrayToNull() throws Exception {
+ public void shouldDeSerializeEmtpyByteArrayToNull() {
assertNull(sessionKeySerde.deserializer().deserialize(topic, new byte[0]));
}
@Test
- public void shouldDeSerializeNullToNull() throws Exception {
+ public void shouldDeSerializeNullToNull() {
assertNull(sessionKeySerde.deserializer().deserialize(topic, null));
}
@Test
- public void shouldConvertToBinaryAndBack() throws Exception {
+ public void shouldConvertToBinaryAndBack() {
final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer(), "dummy");
final Windowed<String> result = SessionKeySerde.from(serialized.get(), Serdes.String().deserializer(), "dummy");
assertEquals(windowedKey, result);
}
@Test
- public void shouldExtractEndTimeFromBinary() throws Exception {
+ public void shouldExtractEndTimeFromBinary() {
final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer(), "dummy");
assertEquals(endTime, SessionKeySerde.extractEnd(serialized.get()));
}
@Test
- public void shouldExtractStartTimeFromBinary() throws Exception {
+ public void shouldExtractStartTimeFromBinary() {
final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer(), "dummy");
assertEquals(startTime, SessionKeySerde.extractStart(serialized.get()));
}
@Test
- public void shouldExtractWindowFromBindary() throws Exception {
+ public void shouldExtractWindowFromBindary() {
final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer(), "dummy");
assertEquals(window, SessionKeySerde.extractWindow(serialized.get()));
}
@Test
- public void shouldExtractKeyBytesFromBinary() throws Exception {
+ public void shouldExtractKeyBytesFromBinary() {
final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer(), "dummy");
assertArrayEquals(key.getBytes(), SessionKeySerde.extractKeyBytes(serialized.get()));
}
@Test
- public void shouldExtractKeyFromBinary() throws Exception {
+ public void shouldExtractKeyFromBinary() {
final Bytes serialized = SessionKeySerde.toBinary(windowedKey, serde.serializer(), "dummy");
assertEquals(windowedKey, SessionKeySerde.from(serialized.get(), serde.deserializer(), "dummy"));
}
@Test
- public void shouldExtractBytesKeyFromBinary() throws Exception {
+ public void shouldExtractBytesKeyFromBinary() {
final Bytes bytesKey = Bytes.wrap(key.getBytes());
final Windowed<Bytes> windowedBytesKey = new Windowed<>(bytesKey, window);
final Bytes serialized = SessionKeySerde.bytesToBinary(windowedBytesKey);
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index 9fb8480..3cbceeb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -129,7 +129,7 @@ public class SimpleBenchmark {
this.numThreads = numThreads;
}
- private void run() throws Exception {
+ private void run() {
switch (testName) {
case ALL_TESTS:
// producer performance
@@ -187,12 +187,12 @@ public class SimpleBenchmark {
yahooBenchmark(YAHOO_CAMPAIGNS_TOPIC, YAHOO_EVENTS_TOPIC);
break;
default:
- throw new Exception("Unknown test name " + testName);
+ throw new RuntimeException("Unknown test name " + testName);
}
}
- public static void main(String[] args) throws Exception {
+ public static void main(String[] args) {
String kafka = args.length > 0 ? args[0] : "localhost:9092";
String stateDirStr = args.length > 1 ? args[1] : TestUtils.tempDirectory().getAbsolutePath();
int numRecords = args.length > 2 ? Integer.parseInt(args[2]) : 10000000;
@@ -254,7 +254,7 @@ public class SimpleBenchmark {
}
private boolean maybeSetupPhase(final String topic, final String clientId,
- final boolean skipIfAllTests) throws Exception {
+ final boolean skipIfAllTests) {
resetStats();
// initialize topics
if (loadPhase) {
@@ -291,7 +291,7 @@ public class SimpleBenchmark {
}
- private void yahooBenchmark(final String campaignsTopic, final String eventsTopic) throws Exception {
+ private void yahooBenchmark(final String campaignsTopic, final String eventsTopic) {
YahooBenchmark benchmark = new YahooBenchmark(this, campaignsTopic, eventsTopic);
benchmark.run();
@@ -304,7 +304,7 @@ public class SimpleBenchmark {
* @param countTopic Topic where numbers are stored
* @throws Exception
*/
- public void count(String countTopic) throws Exception {
+ public void count(String countTopic) {
if (maybeSetupPhase(countTopic, "simple-benchmark-produce-count", false)) {
return;
}
@@ -319,7 +319,7 @@ public class SimpleBenchmark {
* Measure the performance of a KStream-KTable left join. The setup is such that each
* KStream record joins to exactly one element in the KTable
*/
- public void kStreamKTableJoin(String kStreamTopic, String kTableTopic) throws Exception {
+ public void kStreamKTableJoin(String kStreamTopic, String kTableTopic) {
if (maybeSetupPhase(kStreamTopic, "simple-benchmark-produce-kstream", false)) {
maybeSetupPhase(kTableTopic, "simple-benchmark-produce-ktable", false);
return;
@@ -339,7 +339,7 @@ public class SimpleBenchmark {
* Measure the performance of a KStream-KStream left join. The setup is such that each
* KStream record joins to exactly one element in the other KStream
*/
- public void kStreamKStreamJoin(String kStreamTopic1, String kStreamTopic2) throws Exception {
+ public void kStreamKStreamJoin(String kStreamTopic1, String kStreamTopic2) {
if (maybeSetupPhase(kStreamTopic1, "simple-benchmark-produce-kstream-topic1", false)) {
maybeSetupPhase(kStreamTopic2, "simple-benchmark-produce-kstream-topic2", false);
return;
@@ -359,7 +359,7 @@ public class SimpleBenchmark {
* Measure the performance of a KTable-KTable left join. The setup is such that each
* KTable record joins to exactly one element in the other KTable
*/
- public void kTableKTableJoin(String kTableTopic1, String kTableTopic2) throws Exception {
+ public void kTableKTableJoin(String kTableTopic1, String kTableTopic2) {
if (maybeSetupPhase(kTableTopic1, "simple-benchmark-produce-ktable-topic1", false)) {
maybeSetupPhase(kTableTopic2, "simple-benchmark-produce-ktable-topic2", false);
return;
@@ -400,7 +400,7 @@ public class SimpleBenchmark {
streams.close();
}
- private long startStreamsThread(final KafkaStreams streams, final CountDownLatch latch) throws Exception {
+ private long startStreamsThread(final KafkaStreams streams, final CountDownLatch latch) {
Thread thread = new Thread() {
public void run() {
streams.start();
@@ -430,7 +430,7 @@ public class SimpleBenchmark {
return endTime - startTime;
}
- public void processStream(final String topic) throws Exception {
+ public void processStream(final String topic) {
if (maybeSetupPhase(topic, "simple-benchmark-process-stream-load", true)) {
return;
}
@@ -443,7 +443,7 @@ public class SimpleBenchmark {
printResults("Streams Performance [records/latency/rec-sec/MB-sec source]: ", latency);
}
- public void processStreamWithSink(String topic) throws Exception {
+ public void processStreamWithSink(String topic) {
if (maybeSetupPhase(topic, "simple-benchmark-process-stream-with-sink-load", true)) {
return;
}
@@ -456,7 +456,7 @@ public class SimpleBenchmark {
}
- public void processStreamWithStateStore(String topic) throws Exception {
+ public void processStreamWithStateStore(String topic) {
if (maybeSetupPhase(topic, "simple-benchmark-process-stream-with-state-store-load", true)) {
return;
}
@@ -468,7 +468,7 @@ public class SimpleBenchmark {
}
- public void processStreamWithCachedStateStore(String topic) throws Exception {
+ public void processStreamWithCachedStateStore(String topic) {
if (maybeSetupPhase(topic, "simple-benchmark-process-stream-with-cached-state-store-load", true)) {
return;
}
@@ -479,7 +479,7 @@ public class SimpleBenchmark {
printResults("Streams Performance [records/latency/rec-sec/MB-sec source+cache+store]: ", latency);
}
- public void produce(String topic) throws Exception {
+ public void produce(String topic) {
// loading phase does not make sense for producer
if (loadPhase) {
resetStats();
@@ -499,11 +499,11 @@ public class SimpleBenchmark {
* when this produce step is part of another benchmark that produces its own stats
*/
private void produce(String topic, int valueSizeBytes, String clientId, int numRecords, boolean sequential,
- int upperRange, boolean printStats) throws Exception {
+ int upperRange, boolean printStats) {
if (sequential) {
- if (upperRange < numRecords) throw new Exception("UpperRange must be >= numRecords");
+ if (upperRange < numRecords) throw new IllegalArgumentException("UpperRange must be >= numRecords");
}
if (!sequential) {
System.out.println("WARNING: You are using non-sequential keys. If your tests' exit logic expects to see a final key, random keys may not work.");
@@ -539,7 +539,7 @@ public class SimpleBenchmark {
}
}
- public void consume(String topic) throws Exception {
+ public void consume(String topic) {
if (maybeSetupPhase(topic, "simple-benchmark-consumer-load", true)) {
return;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
index d2d0681..0a10f9f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
@@ -89,7 +89,7 @@ public class YahooBenchmark {
private boolean maybeSetupPhaseCampaigns(final String topic, final String clientId,
final boolean skipIfAllTests,
final int numCampaigns, final int adsPerCampaign,
- final List<String> ads) throws Exception {
+ final List<String> ads) {
parent.resetStats();
// initialize topics
if (parent.loadPhase) {
@@ -128,7 +128,7 @@ public class YahooBenchmark {
// just for Yahoo benchmark
private boolean maybeSetupPhaseEvents(final String topic, final String clientId,
final boolean skipIfAllTests, final int numRecords,
- final List<String> ads) throws Exception {
+ final List<String> ads) {
parent.resetStats();
String[] eventTypes = new String[]{"view", "click", "purchase"};
Random rand = new Random();
@@ -181,7 +181,7 @@ public class YahooBenchmark {
}
- public void run() throws Exception {
+ public void run() {
int numCampaigns = 100;
int adsPerCampaign = 10;
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index 0cd2f2a..ded2732 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -457,19 +457,19 @@ public class TopologyBuilderTest {
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullNameWhenAddingSink() throws Exception {
+ public void shouldNotAllowNullNameWhenAddingSink() {
final TopologyBuilder builder = new TopologyBuilder();
builder.addSink(null, "topic");
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullTopicWhenAddingSink() throws Exception {
+ public void shouldNotAllowNullTopicWhenAddingSink() {
final TopologyBuilder builder = new TopologyBuilder();
builder.addSink("name", null);
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullNameWhenAddingProcessor() throws Exception {
+ public void shouldNotAllowNullNameWhenAddingProcessor() {
final TopologyBuilder builder = new TopologyBuilder();
builder.addProcessor(null, new ProcessorSupplier() {
@Override
@@ -480,37 +480,37 @@ public class TopologyBuilderTest {
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullProcessorSupplier() throws Exception {
+ public void shouldNotAllowNullProcessorSupplier() {
final TopologyBuilder builder = new TopologyBuilder();
builder.addProcessor("name", null);
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullNameWhenAddingSource() throws Exception {
+ public void shouldNotAllowNullNameWhenAddingSource() {
final TopologyBuilder builder = new TopologyBuilder();
builder.addSource(null, Pattern.compile(".*"));
}
@Test(expected = NullPointerException.class)
- public void shouldNotAllowNullProcessorNameWhenConnectingProcessorAndStateStores() throws Exception {
+ public void shouldNotAllowNullProcessorNameWhenConnectingProcessorAndStateStores() {
final TopologyBuilder builder = new TopologyBuilder();
builder.connectProcessorAndStateStores(null, "store");
}
@Test(expected = NullPointerException.class)
- public void shouldNotAddNullInternalTopic() throws Exception {
+ public void shouldNotAddNullInternalTopic() {
final TopologyBuilder builder = new TopologyBuilder();
builder.addInternalTopic(null);
}
@Test(expected = NullPointerException.class)
- public void shouldNotSetApplicationIdToNull() throws Exception {
+ public void shouldNotSetApplicationIdToNull() {
final TopologyBuilder builder = new TopologyBuilder();
builder.setApplicationId(null);
}
@Test(expected = NullPointerException.class)
- public void shouldNotAddNullStateStoreSupplier() throws Exception {
+ public void shouldNotAddNullStateStoreSupplier() {
final TopologyBuilder builder = new TopologyBuilder();
builder.addStateStore(null);
}
@@ -524,7 +524,7 @@ public class TopologyBuilderTest {
}
@Test
- public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsInternal() throws Exception {
+ public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsInternal() {
final TopologyBuilder builder = new TopologyBuilder();
builder.addSource("source", "topic");
builder.addProcessor("processor", new MockProcessorSupplier(), "source");
@@ -535,7 +535,7 @@ public class TopologyBuilderTest {
}
@Test
- public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsExternal() throws Exception {
+ public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsExternal() {
final TopologyBuilder builder = new TopologyBuilder();
builder.addSource("source", "topic");
builder.addProcessor("processor", new MockProcessorSupplier(), "source");
@@ -546,7 +546,7 @@ public class TopologyBuilderTest {
}
@Test
- public void shouldCorrectlyMapStateStoreToInternalTopics() throws Exception {
+ public void shouldCorrectlyMapStateStoreToInternalTopics() {
final TopologyBuilder builder = new TopologyBuilder();
builder.setApplicationId("appId");
builder.addInternalTopic("internal-topic");
@@ -560,7 +560,7 @@ public class TopologyBuilderTest {
@SuppressWarnings("unchecked")
@Test
- public void shouldAddInternalTopicConfigWithCompactAndDeleteSetForWindowStores() throws Exception {
+ public void shouldAddInternalTopicConfigWithCompactAndDeleteSetForWindowStores() {
final TopologyBuilder builder = new TopologyBuilder();
builder.setApplicationId("appId");
builder.addSource("source", "topic");
@@ -580,7 +580,7 @@ public class TopologyBuilderTest {
}
@Test
- public void shouldAddInternalTopicConfigWithCompactForNonWindowStores() throws Exception {
+ public void shouldAddInternalTopicConfigWithCompactForNonWindowStores() {
final TopologyBuilder builder = new TopologyBuilder();
builder.setApplicationId("appId");
builder.addSource("source", "topic");
@@ -596,7 +596,7 @@ public class TopologyBuilderTest {
}
@Test
- public void shouldAddInternalTopicConfigWithCleanupPolicyDeleteForInternalTopics() throws Exception {
+ public void shouldAddInternalTopicConfigWithCleanupPolicyDeleteForInternalTopics() {
final TopologyBuilder builder = new TopologyBuilder();
builder.setApplicationId("appId");
builder.addInternalTopic("foo");
@@ -701,7 +701,7 @@ public class TopologyBuilderTest {
}
@Test
- public void shouldAddTimestampExtractorPerSource() throws Exception {
+ public void shouldAddTimestampExtractorPerSource() {
final TopologyBuilder builder = new TopologyBuilder();
builder.addSource(new MockTimestampExtractor(), "source", "topic");
final ProcessorTopology processorTopology = builder.build(null);
@@ -709,7 +709,7 @@ public class TopologyBuilderTest {
}
@Test
- public void shouldAddTimestampExtractorWithOffsetResetPerSource() throws Exception {
+ public void shouldAddTimestampExtractorWithOffsetResetPerSource() {
final TopologyBuilder builder = new TopologyBuilder();
builder.addSource(null, new MockTimestampExtractor(), "source", "topic");
final ProcessorTopology processorTopology = builder.build(null);
@@ -717,7 +717,7 @@ public class TopologyBuilderTest {
}
@Test
- public void shouldAddTimestampExtractorWithPatternPerSource() throws Exception {
+ public void shouldAddTimestampExtractorWithPatternPerSource() {
final TopologyBuilder builder = new TopologyBuilder();
final Pattern pattern = Pattern.compile("t.*");
builder.addSource(new MockTimestampExtractor(), "source", pattern);
@@ -726,7 +726,7 @@ public class TopologyBuilderTest {
}
@Test
- public void shouldAddTimestampExtractorWithOffsetResetAndPatternPerSource() throws Exception {
+ public void shouldAddTimestampExtractorWithOffsetResetAndPatternPerSource() {
final TopologyBuilder builder = new TopologyBuilder();
final Pattern pattern = Pattern.compile("t.*");
builder.addSource(null, new MockTimestampExtractor(), "source", pattern);
@@ -735,7 +735,7 @@ public class TopologyBuilderTest {
}
@Test
- public void shouldAddTimestampExtractorWithOffsetResetAndKeyValSerdesPerSource() throws Exception {
+ public void shouldAddTimestampExtractorWithOffsetResetAndKeyValSerdesPerSource() {
final TopologyBuilder builder = new TopologyBuilder();
builder.addSource(null, "source", new MockTimestampExtractor(), null, null, "topic");
final ProcessorTopology processorTopology = builder.build(null);
@@ -743,7 +743,7 @@ public class TopologyBuilderTest {
}
@Test
- public void shouldAddTimestampExtractorWithOffsetResetAndKeyValSerdesAndPatternPerSource() throws Exception {
+ public void shouldAddTimestampExtractorWithOffsetResetAndKeyValSerdesAndPatternPerSource() {
final TopologyBuilder builder = new TopologyBuilder();
final Pattern pattern = Pattern.compile("t.*");
builder.addSource(null, "source", new MockTimestampExtractor(), null, null, pattern);
@@ -751,6 +751,7 @@ public class TopologyBuilderTest {
assertThat(processorTopology.source(pattern.pattern()).getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
}
+ @Test
public void shouldConnectRegexMatchedTopicsToStateStore() throws Exception {
final TopologyBuilder topologyBuilder = new TopologyBuilder()
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index 72082da..16e5b39 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -50,7 +50,7 @@ public class AbstractProcessorContextTest {
}
@Test
- public void shouldThrowIllegalStateExceptionOnRegisterWhenContextIsInitialized() throws Exception {
+ public void shouldThrowIllegalStateExceptionOnRegisterWhenContextIsInitialized() {
context.initialized();
try {
context.register(stateStore, false, null);
@@ -61,7 +61,7 @@ public class AbstractProcessorContextTest {
}
@Test
- public void shouldNotThrowIllegalStateExceptionOnRegisterWhenContextIsNotInitialized() throws Exception {
+ public void shouldNotThrowIllegalStateExceptionOnRegisterWhenContextIsNotInitialized() {
context.register(stateStore, false, null);
}
@@ -71,7 +71,7 @@ public class AbstractProcessorContextTest {
}
@Test
- public void shouldThrowIllegalStateExceptionOnTopicIfNoRecordContext() throws Exception {
+ public void shouldThrowIllegalStateExceptionOnTopicIfNoRecordContext() {
context.setRecordContext(null);
try {
context.topic();
@@ -82,18 +82,18 @@ public class AbstractProcessorContextTest {
}
@Test
- public void shouldReturnTopicFromRecordContext() throws Exception {
+ public void shouldReturnTopicFromRecordContext() {
assertThat(context.topic(), equalTo(recordContext.topic()));
}
@Test
- public void shouldReturnNullIfTopicEqualsNonExistTopic() throws Exception {
+ public void shouldReturnNullIfTopicEqualsNonExistTopic() {
context.setRecordContext(new RecordContextStub(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC));
assertThat(context.topic(), nullValue());
}
@Test
- public void shouldThrowIllegalStateExceptionOnPartitionIfNoRecordContext() throws Exception {
+ public void shouldThrowIllegalStateExceptionOnPartitionIfNoRecordContext() {
context.setRecordContext(null);
try {
context.partition();
@@ -104,12 +104,12 @@ public class AbstractProcessorContextTest {
}
@Test
- public void shouldReturnPartitionFromRecordContext() throws Exception {
+ public void shouldReturnPartitionFromRecordContext() {
assertThat(context.partition(), equalTo(recordContext.partition()));
}
@Test
- public void shouldThrowIllegalStateExceptionOnOffsetIfNoRecordContext() throws Exception {
+ public void shouldThrowIllegalStateExceptionOnOffsetIfNoRecordContext() {
context.setRecordContext(null);
try {
context.offset();
@@ -119,12 +119,12 @@ public class AbstractProcessorContextTest {
}
@Test
- public void shouldReturnOffsetFromRecordContext() throws Exception {
+ public void shouldReturnOffsetFromRecordContext() {
assertThat(context.offset(), equalTo(recordContext.offset()));
}
@Test
- public void shouldThrowIllegalStateExceptionOnTimestampIfNoRecordContext() throws Exception {
+ public void shouldThrowIllegalStateExceptionOnTimestampIfNoRecordContext() {
context.setRecordContext(null);
try {
context.timestamp();
@@ -135,7 +135,7 @@ public class AbstractProcessorContextTest {
}
@Test
- public void shouldReturnTimestampFromRecordContext() throws Exception {
+ public void shouldReturnTimestampFromRecordContext() {
assertThat(context.timestamp(), equalTo(recordContext.timestamp()));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index 54726d8..d2d439c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -55,21 +55,21 @@ public class AbstractTaskTest {
}
@Test(expected = ProcessorStateException.class)
- public void shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenAuthorizationException() throws Exception {
+ public void shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenAuthorizationException() {
final Consumer consumer = mockConsumer(new AuthorizationException("blah"));
final AbstractTask task = createTask(consumer, Collections.<StateStore>emptyList());
task.updateOffsetLimits();
}
@Test(expected = ProcessorStateException.class)
- public void shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenKafkaException() throws Exception {
+ public void shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenKafkaException() {
final Consumer consumer = mockConsumer(new KafkaException("blah"));
final AbstractTask task = createTask(consumer, Collections.<StateStore>emptyList());
task.updateOffsetLimits();
}
@Test(expected = WakeupException.class)
- public void shouldThrowWakeupExceptionOnInitializeOffsetsWhenWakeupException() throws Exception {
+ public void shouldThrowWakeupExceptionOnInitializeOffsetsWhenWakeupException() {
final Consumer consumer = mockConsumer(new WakeupException());
final AbstractTask task = createTask(consumer, Collections.<StateStore>emptyList());
task.updateOffsetLimits();
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java
index 77001ce..6966d67 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java
@@ -47,14 +47,14 @@ public class CopartitionedTopicsValidatorTest {
}
@Test(expected = TopologyBuilderException.class)
- public void shouldThrowTopologyBuilderExceptionIfNoPartitionsFoundForCoPartitionedTopic() throws Exception {
+ public void shouldThrowTopologyBuilderExceptionIfNoPartitionsFoundForCoPartitionedTopic() {
validator.validate(Collections.singleton("topic"),
Collections.<String, StreamPartitionAssignor.InternalTopicMetadata>emptyMap(),
cluster);
}
@Test(expected = TopologyBuilderException.class)
- public void shouldThrowTopologyBuilderExceptionIfPartitionCountsForCoPartitionedTopicsDontMatch() throws Exception {
+ public void shouldThrowTopologyBuilderExceptionIfPartitionCountsForCoPartitionedTopicsDontMatch() {
partitions.remove(new TopicPartition("second", 0));
validator.validate(Utils.mkSet("first", "second"),
Collections.<String, StreamPartitionAssignor.InternalTopicMetadata>emptyMap(),
@@ -63,7 +63,7 @@ public class CopartitionedTopicsValidatorTest {
@Test
- public void shouldEnforceCopartitioningOnRepartitionTopics() throws Exception {
+ public void shouldEnforceCopartitioningOnRepartitionTopics() {
final StreamPartitionAssignor.InternalTopicMetadata metadata = createTopicMetadata("repartitioned", 10);
validator.validate(Utils.mkSet("first", "second", metadata.config.name()),
@@ -76,7 +76,7 @@ public class CopartitionedTopicsValidatorTest {
@Test
- public void shouldSetNumPartitionsToMaximumPartitionsWhenAllTopicsAreRepartitionTopics() throws Exception {
+ public void shouldSetNumPartitionsToMaximumPartitionsWhenAllTopicsAreRepartitionTopics() {
final StreamPartitionAssignor.InternalTopicMetadata one = createTopicMetadata("one", 1);
final StreamPartitionAssignor.InternalTopicMetadata two = createTopicMetadata("two", 15);
final StreamPartitionAssignor.InternalTopicMetadata three = createTopicMetadata("three", 5);
@@ -99,7 +99,7 @@ public class CopartitionedTopicsValidatorTest {
}
@Test
- public void shouldSetRepartitionTopicsPartitionCountToNotAvailableIfAnyNotAvaliable() throws Exception {
+ public void shouldSetRepartitionTopicsPartitionCountToNotAvailableIfAnyNotAvaliable() {
final StreamPartitionAssignor.InternalTopicMetadata one = createTopicMetadata("one", 1);
final StreamPartitionAssignor.InternalTopicMetadata two = createTopicMetadata("two", StreamPartitionAssignor.NOT_AVAILABLE);
final Map<String, StreamPartitionAssignor.InternalTopicMetadata> repartitionTopicConfig = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 98ef8f6..e530d60 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -105,13 +105,13 @@ public class GlobalStateManagerImplTest {
}
@Test
- public void shouldLockGlobalStateDirectory() throws Exception {
+ public void shouldLockGlobalStateDirectory() {
stateManager.initialize(context);
assertTrue(new File(stateDirectory.globalStateDir(), ".lock").exists());
}
@Test(expected = LockException.class)
- public void shouldThrowLockExceptionIfCantGetLock() throws Exception {
+ public void shouldThrowLockExceptionIfCantGetLock() throws IOException {
final StateDirectory stateDir = new StateDirectory("appId", stateDirPath, time);
try {
stateDir.lockGlobalState(1);
@@ -122,7 +122,7 @@ public class GlobalStateManagerImplTest {
}
@Test
- public void shouldReadCheckpointOffsets() throws Exception {
+ public void shouldReadCheckpointOffsets() throws IOException {
final Map<TopicPartition, Long> expected = writeCheckpoint();
stateManager.initialize(context);
@@ -131,33 +131,33 @@ public class GlobalStateManagerImplTest {
}
@Test
- public void shouldNotDeleteCheckpointFileAfterLoaded() throws Exception {
+ public void shouldNotDeleteCheckpointFileAfterLoaded() throws IOException {
writeCheckpoint();
stateManager.initialize(context);
assertTrue(checkpointFile.exists());
}
@Test(expected = StreamsException.class)
- public void shouldThrowStreamsExceptionIfFailedToReadCheckpointedOffsets() throws Exception {
+ public void shouldThrowStreamsExceptionIfFailedToReadCheckpointedOffsets() throws IOException {
writeCorruptCheckpoint();
stateManager.initialize(context);
}
@Test
- public void shouldInitializeStateStores() throws Exception {
+ public void shouldInitializeStateStores() {
stateManager.initialize(context);
assertTrue(store1.initialized);
assertTrue(store2.initialized);
}
@Test
- public void shouldReturnInitializedStoreNames() throws Exception {
+ public void shouldReturnInitializedStoreNames() {
final Set<String> storeNames = stateManager.initialize(context);
assertEquals(Utils.mkSet(store1.name(), store2.name()), storeNames);
}
@Test
- public void shouldThrowIllegalArgumentIfTryingToRegisterStoreThatIsNotGlobal() throws Exception {
+ public void shouldThrowIllegalArgumentIfTryingToRegisterStoreThatIsNotGlobal() {
stateManager.initialize(context);
try {
@@ -169,7 +169,7 @@ public class GlobalStateManagerImplTest {
}
@Test
- public void shouldThrowIllegalArgumentExceptionIfAttemptingToRegisterStoreTwice() throws Exception {
+ public void shouldThrowIllegalArgumentExceptionIfAttemptingToRegisterStoreTwice() {
stateManager.initialize(context);
initializeConsumer(2, 1, t1);
stateManager.register(store1, false, new TheStateRestoreCallback());
@@ -182,7 +182,7 @@ public class GlobalStateManagerImplTest {
}
@Test
- public void shouldThrowStreamsExceptionIfNoPartitionsFoundForStore() throws Exception {
+ public void shouldThrowStreamsExceptionIfNoPartitionsFoundForStore() {
stateManager.initialize(context);
try {
stateManager.register(store1, false, new TheStateRestoreCallback());
@@ -193,7 +193,7 @@ public class GlobalStateManagerImplTest {
}
@Test
- public void shouldRestoreRecordsUpToHighwatermark() throws Exception {
+ public void shouldRestoreRecordsUpToHighwatermark() {
initializeConsumer(2, 1, t1);
stateManager.initialize(context);
@@ -204,7 +204,7 @@ public class GlobalStateManagerImplTest {
}
@Test
- public void shouldRestoreRecordsFromCheckpointToHighwatermark() throws Exception {
+ public void shouldRestoreRecordsFromCheckpointToHighwatermark() throws IOException {
initializeConsumer(5, 6, t1);
final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new File(stateManager.baseDir(),
@@ -219,7 +219,7 @@ public class GlobalStateManagerImplTest {
@Test
- public void shouldFlushStateStores() throws Exception {
+ public void shouldFlushStateStores() {
stateManager.initialize(context);
final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
// register the stores
@@ -234,7 +234,7 @@ public class GlobalStateManagerImplTest {
}
@Test(expected = ProcessorStateException.class)
- public void shouldThrowProcessorStateStoreExceptionIfStoreFlushFailed() throws Exception {
+ public void shouldThrowProcessorStateStoreExceptionIfStoreFlushFailed() {
stateManager.initialize(context);
final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
// register the stores
@@ -250,7 +250,7 @@ public class GlobalStateManagerImplTest {
}
@Test
- public void shouldCloseStateStores() throws Exception {
+ public void shouldCloseStateStores() throws IOException {
stateManager.initialize(context);
final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
// register the stores
@@ -265,7 +265,7 @@ public class GlobalStateManagerImplTest {
}
@Test
- public void shouldWriteCheckpointsOnClose() throws Exception {
+ public void shouldWriteCheckpointsOnClose() throws IOException {
stateManager.initialize(context);
final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
initializeConsumer(1, 1, t1);
@@ -277,7 +277,7 @@ public class GlobalStateManagerImplTest {
}
@Test(expected = ProcessorStateException.class)
- public void shouldThrowProcessorStateStoreExceptionIfStoreCloseFailed() throws Exception {
+ public void shouldThrowProcessorStateStoreExceptionIfStoreCloseFailed() throws IOException {
stateManager.initialize(context);
initializeConsumer(1, 1, t1);
stateManager.register(new NoOpReadOnlyStore(store1.name()) {
@@ -291,7 +291,7 @@ public class GlobalStateManagerImplTest {
}
@Test
- public void shouldThrowIllegalArgumentExceptionIfCallbackIsNull() throws Exception {
+ public void shouldThrowIllegalArgumentExceptionIfCallbackIsNull() {
stateManager.initialize(context);
try {
stateManager.register(store1, false, null);
@@ -302,7 +302,7 @@ public class GlobalStateManagerImplTest {
}
@Test
- public void shouldUnlockGlobalStateDirectoryOnClose() throws Exception {
+ public void shouldUnlockGlobalStateDirectoryOnClose() throws IOException {
stateManager.initialize(context);
stateManager.close(Collections.<TopicPartition, Long>emptyMap());
final StateDirectory stateDir = new StateDirectory("appId", stateDirPath, new MockTime());
@@ -315,7 +315,7 @@ public class GlobalStateManagerImplTest {
}
@Test
- public void shouldNotCloseStoresIfCloseAlreadyCalled() throws Exception {
+ public void shouldNotCloseStoresIfCloseAlreadyCalled() throws IOException {
stateManager.initialize(context);
initializeConsumer(1, 1, t1);
stateManager.register(new NoOpReadOnlyStore("t1-store") {
@@ -334,7 +334,7 @@ public class GlobalStateManagerImplTest {
}
@Test
- public void shouldAttemptToCloseAllStoresEvenWhenSomeException() throws Exception {
+ public void shouldAttemptToCloseAllStoresEvenWhenSomeException() throws IOException {
stateManager.initialize(context);
initializeConsumer(1, 1, t1);
initializeConsumer(1, 1, t2);
@@ -359,7 +359,7 @@ public class GlobalStateManagerImplTest {
}
@Test
- public void shouldReleaseLockIfExceptionWhenLoadingCheckpoints() throws Exception {
+ public void shouldReleaseLockIfExceptionWhenLoadingCheckpoints() throws IOException {
writeCorruptCheckpoint();
try {
stateManager.initialize(context);
@@ -376,7 +376,7 @@ public class GlobalStateManagerImplTest {
}
@Test
- public void shouldCheckpointOffsets() throws Exception {
+ public void shouldCheckpointOffsets() throws IOException {
final Map<TopicPartition, Long> offsets = Collections.singletonMap(t1, 25L);
stateManager.initialize(context);
@@ -388,7 +388,7 @@ public class GlobalStateManagerImplTest {
}
@Test
- public void shouldNotRemoveOffsetsOfUnUpdatedTablesDuringCheckpoint() throws Exception {
+ public void shouldNotRemoveOffsetsOfUnUpdatedTablesDuringCheckpoint() {
stateManager.initialize(context);
final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
initializeConsumer(10, 1, t1);
@@ -405,7 +405,7 @@ public class GlobalStateManagerImplTest {
}
@Test
- public void shouldSkipNullKeysWhenRestoring() throws Exception {
+ public void shouldSkipNullKeysWhenRestoring() {
final HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
startOffsets.put(t1, 1L);
final HashMap<TopicPartition, Long> endOffsets = new HashMap<>();
@@ -446,7 +446,7 @@ public class GlobalStateManagerImplTest {
}
@Test
- public void shouldThrowLockExceptionIfIOExceptionCaughtWhenTryingToLockStateDir() throws Exception {
+ public void shouldThrowLockExceptionIfIOExceptionCaughtWhenTryingToLockStateDir() {
stateManager = new GlobalStateManagerImpl(topology, consumer, new StateDirectory("appId", stateDirPath, time) {
@Override
public boolean lockGlobalState(final int retry) throws IOException {
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
index 7859a06..4ece443 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
@@ -35,6 +35,7 @@ import org.apache.kafka.test.NoOpProcessorContext;
import org.junit.Before;
import org.junit.Test;
+import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -95,20 +96,20 @@ public class GlobalStateTaskTest {
}
@Test
- public void shouldInitializeStateManager() throws Exception {
+ public void shouldInitializeStateManager() {
final Map<TopicPartition, Long> startingOffsets = globalStateTask.initialize();
assertTrue(stateMgr.initialized);
assertEquals(offsets, startingOffsets);
}
@Test
- public void shouldInitializeContext() throws Exception {
+ public void shouldInitializeContext() {
globalStateTask.initialize();
assertTrue(context.initialized);
}
@Test
- public void shouldInitializeProcessorTopology() throws Exception {
+ public void shouldInitializeProcessorTopology() {
globalStateTask.initialize();
for (ProcessorNode processorNode : processorNodes) {
if (processorNode instanceof MockProcessorNode) {
@@ -120,7 +121,7 @@ public class GlobalStateTaskTest {
}
@Test
- public void shouldProcessRecordsForTopic() throws Exception {
+ public void shouldProcessRecordsForTopic() {
globalStateTask.initialize();
globalStateTask.update(new ConsumerRecord<>("t1", 1, 1, "foo".getBytes(), "bar".getBytes()));
assertEquals(1, sourceOne.numReceived);
@@ -128,7 +129,7 @@ public class GlobalStateTaskTest {
}
@Test
- public void shouldProcessRecordsForOtherTopic() throws Exception {
+ public void shouldProcessRecordsForOtherTopic() {
final byte[] integerBytes = new IntegerSerializer().serialize("foo", 1);
globalStateTask.initialize();
globalStateTask.update(new ConsumerRecord<>("t2", 1, 1, integerBytes, integerBytes));
@@ -194,7 +195,7 @@ public class GlobalStateTaskTest {
@Test
- public void shouldCloseStateManagerWithOffsets() throws Exception {
+ public void shouldCloseStateManagerWithOffsets() throws IOException {
final Map<TopicPartition, Long> expectedOffsets = new HashMap<>();
expectedOffsets.put(t1, 52L);
expectedOffsets.put(t2, 100L);
@@ -206,7 +207,7 @@ public class GlobalStateTaskTest {
}
@Test
- public void shouldCheckpointOffsetsWhenStateIsFlushed() throws Exception {
+ public void shouldCheckpointOffsetsWhenStateIsFlushed() {
final Map<TopicPartition, Long> expectedOffsets = new HashMap<>();
expectedOffsets.put(t1, 102L);
expectedOffsets.put(t2, 100L);
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index 9e1526b..9d0b637 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -69,7 +69,7 @@ public class GlobalStreamThreadTest {
}
@Test
- public void shouldThrowStreamsExceptionOnStartupIfThereIsAStreamsException() throws Exception {
+ public void shouldThrowStreamsExceptionOnStartupIfThereIsAStreamsException() {
// should throw as the MockConsumer hasn't been configured and there are no
// partitions available
try {
@@ -83,7 +83,7 @@ public class GlobalStreamThreadTest {
@SuppressWarnings("unchecked")
@Test
- public void shouldThrowStreamsExceptionOnStartupIfExceptionOccurred() throws Exception {
+ public void shouldThrowStreamsExceptionOnStartupIfExceptionOccurred() {
final MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer(OffsetResetStrategy.EARLIEST) {
@Override
public List<PartitionInfo> partitionsFor(final String topic) {
@@ -110,14 +110,14 @@ public class GlobalStreamThreadTest {
@Test
- public void shouldBeRunningAfterSuccessfulStart() throws Exception {
+ public void shouldBeRunningAfterSuccessfulStart() {
initializeConsumer();
globalStreamThread.start();
assertTrue(globalStreamThread.stillRunning());
}
@Test(timeout = 30000)
- public void shouldStopRunningWhenClosedByUser() throws Exception {
+ public void shouldStopRunningWhenClosedByUser() throws InterruptedException {
initializeConsumer();
globalStreamThread.start();
globalStreamThread.shutdown();
@@ -126,7 +126,7 @@ public class GlobalStreamThreadTest {
}
@Test
- public void shouldCloseStateStoresOnClose() throws Exception {
+ public void shouldCloseStateStoresOnClose() throws InterruptedException {
initializeConsumer();
globalStreamThread.start();
final StateStore globalStore = builder.globalStateStores().get("bar");
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java
index 55534b7..044e82a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java
@@ -33,7 +33,7 @@ import static org.junit.Assert.assertTrue;
public class InternalTopicConfigTest {
@Test
- public void shouldHaveCompactionPropSetIfSupplied() throws Exception {
+ public void shouldHaveCompactionPropSetIfSupplied() {
final Properties properties = new InternalTopicConfig("name",
Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
Collections.<String, String>emptyMap()).toProperties(0);
@@ -42,17 +42,17 @@ public class InternalTopicConfigTest {
@Test(expected = NullPointerException.class)
- public void shouldThrowIfNameIsNull() throws Exception {
+ public void shouldThrowIfNameIsNull() {
new InternalTopicConfig(null, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), Collections.<String, String>emptyMap());
}
@Test(expected = InvalidTopicException.class)
- public void shouldThrowIfNameIsInvalid() throws Exception {
+ public void shouldThrowIfNameIsInvalid() {
new InternalTopicConfig("foo bar baz", Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), Collections.<String, String>emptyMap());
}
@Test
- public void shouldConfigureRetentionMsWithAdditionalRetentionWhenCompactAndDelete() throws Exception {
+ public void shouldConfigureRetentionMsWithAdditionalRetentionWhenCompactAndDelete() {
final InternalTopicConfig topicConfig = new InternalTopicConfig("name",
Utils.mkSet(InternalTopicConfig.CleanupPolicy.compact, InternalTopicConfig.CleanupPolicy.delete),
Collections.<String, String>emptyMap());
@@ -63,7 +63,7 @@ public class InternalTopicConfigTest {
}
@Test
- public void shouldNotConfigureRetentionMsWhenCompact() throws Exception {
+ public void shouldNotConfigureRetentionMsWhenCompact() {
final InternalTopicConfig topicConfig = new InternalTopicConfig("name",
Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
Collections.<String, String>emptyMap());
@@ -73,7 +73,7 @@ public class InternalTopicConfigTest {
}
@Test
- public void shouldNotConfigureRetentionMsWhenDelete() throws Exception {
+ public void shouldNotConfigureRetentionMsWhenDelete() {
final InternalTopicConfig topicConfig = new InternalTopicConfig("name",
Collections.singleton(InternalTopicConfig.CleanupPolicy.delete),
Collections.<String, String>emptyMap());
@@ -84,7 +84,7 @@ public class InternalTopicConfigTest {
@Test
- public void shouldBeCompactedIfCleanupPolicyCompactOrCompactAndDelete() throws Exception {
+ public void shouldBeCompactedIfCleanupPolicyCompactOrCompactAndDelete() {
assertTrue(new InternalTopicConfig("name",
Collections.singleton(InternalTopicConfig.CleanupPolicy.compact),
Collections.<String, String>emptyMap()).isCompacted());
@@ -94,14 +94,14 @@ public class InternalTopicConfigTest {
}
@Test
- public void shouldNotBeCompactedWhenCleanupPolicyIsDelete() throws Exception {
+ public void shouldNotBeCompactedWhenCleanupPolicyIsDelete() {
assertFalse(new InternalTopicConfig("name",
Collections.singleton(InternalTopicConfig.CleanupPolicy.delete),
Collections.<String, String>emptyMap()).isCompacted());
}
@Test
- public void shouldUseCleanupPolicyFromConfigIfSupplied() throws Exception {
+ public void shouldUseCleanupPolicyFromConfigIfSupplied() {
final InternalTopicConfig config = new InternalTopicConfig("name",
Collections.singleton(InternalTopicConfig.CleanupPolicy.delete),
Collections.singletonMap("cleanup.policy", "compact"));
@@ -111,7 +111,7 @@ public class InternalTopicConfigTest {
}
@Test
- public void shouldHavePropertiesSuppliedByUser() throws Exception {
+ public void shouldHavePropertiesSuppliedByUser() {
final Map<String, String> configs = new HashMap<>();
configs.put("retention.ms", "1000");
configs.put("retention.bytes", "10000");
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
index 469f9cb..1cd6bee 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
@@ -56,21 +56,21 @@ public class InternalTopicManagerTest {
}
@Test
- public void shouldReturnCorrectPartitionCounts() throws Exception {
+ public void shouldReturnCorrectPartitionCounts() {
InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1,
WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time);
Assert.assertEquals(Collections.singletonMap(topic, 1), internalTopicManager.getNumPartitions(Collections.singleton(topic)));
}
@Test
- public void shouldCreateRequiredTopics() throws Exception {
+ public void shouldCreateRequiredTopics() {
InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1,
WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time);
internalTopicManager.makeReady(Collections.singletonMap(new InternalTopicConfig(topic, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), null), 1));
}
@Test
- public void shouldNotCreateTopicIfExistsWithDifferentPartitions() throws Exception {
+ public void shouldNotCreateTopicIfExistsWithDifferentPartitions() {
InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1,
WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time);
boolean exceptionWasThrown = false;
@@ -83,7 +83,7 @@ public class InternalTopicManagerTest {
}
@Test
- public void shouldNotThrowExceptionIfExistsWithDifferentReplication() throws Exception {
+ public void shouldNotThrowExceptionIfExistsWithDifferentReplication() {
// create topic the first time with replication 2
InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 2,
@@ -101,7 +101,7 @@ public class InternalTopicManagerTest {
}
@Test
- public void shouldNotThrowExceptionForEmptyTopicMap() throws Exception {
+ public void shouldNotThrowExceptionForEmptyTopicMap() {
InternalTopicManager internalTopicManager = new InternalTopicManager(streamsKafkaClient, 1,
WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, time);
internalTopicManager.makeReady(Collections.EMPTY_MAP);
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java
index f6a1518..24653e6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java
@@ -26,18 +26,18 @@ public class MinTimestampTrackerTest {
private MinTimestampTracker<String> tracker = new MinTimestampTracker<>();
@Test
- public void shouldReturnNotKnownTimestampWhenNoRecordsEverAdded() throws Exception {
+ public void shouldReturnNotKnownTimestampWhenNoRecordsEverAdded() {
assertThat(tracker.get(), equalTo(TimestampTracker.NOT_KNOWN));
}
@Test
- public void shouldReturnTimestampOfOnlyRecord() throws Exception {
+ public void shouldReturnTimestampOfOnlyRecord() {
tracker.addElement(elem(100));
assertThat(tracker.get(), equalTo(100L));
}
@Test
- public void shouldReturnLowestAvailableTimestampFromAllInputs() throws Exception {
+ public void shouldReturnLowestAvailableTimestampFromAllInputs() {
tracker.addElement(elem(100));
tracker.addElement(elem(99));
tracker.addElement(elem(102));
@@ -45,7 +45,7 @@ public class MinTimestampTrackerTest {
}
@Test
- public void shouldReturnLowestAvailableTimestampAfterPreviousLowestRemoved() throws Exception {
+ public void shouldReturnLowestAvailableTimestampAfterPreviousLowestRemoved() {
final Stamped<String> lowest = elem(88);
tracker.addElement(lowest);
tracker.addElement(elem(101));
@@ -55,7 +55,7 @@ public class MinTimestampTrackerTest {
}
@Test
- public void shouldReturnLastKnownTimestampWhenAllElementsHaveBeenRemoved() throws Exception {
+ public void shouldReturnLastKnownTimestampWhenAllElementsHaveBeenRemoved() {
final Stamped<String> record = elem(98);
tracker.addElement(record);
tracker.removeElement(record);
@@ -63,12 +63,12 @@ public class MinTimestampTrackerTest {
}
@Test
- public void shouldIgnoreNullRecordOnRemove() throws Exception {
+ public void shouldIgnoreNullRecordOnRemove() {
tracker.removeElement(null);
}
@Test(expected = NullPointerException.class)
- public void shouldThrowNullPointerExceptionWhenTryingToAddNullElement() throws Exception {
+ public void shouldThrowNullPointerExceptionWhenTryingToAddNullElement() {
tracker.addElement(null);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index f37a674..c56f609 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -35,14 +35,14 @@ public class ProcessorNodeTest {
@SuppressWarnings("unchecked")
@Test (expected = StreamsException.class)
- public void shouldThrowStreamsExceptionIfExceptionCaughtDuringInit() throws Exception {
+ public void shouldThrowStreamsExceptionIfExceptionCaughtDuringInit() {
final ProcessorNode node = new ProcessorNode("name", new ExceptionalProcessor(), Collections.emptySet());
node.init(null);
}
@SuppressWarnings("unchecked")
@Test (expected = StreamsException.class)
- public void shouldThrowStreamsExceptionIfExceptionCaughtDuringClose() throws Exception {
+ public void shouldThrowStreamsExceptionIfExceptionCaughtDuringClose() {
final ProcessorNode node = new ProcessorNode("name", new ExceptionalProcessor(), Collections.emptySet());
node.close();
}