You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2017/09/11 08:42:22 UTC
[5/5] kafka git commit: KAFKA-5531;
throw concrete exceptions in streams tests
KAFKA-5531; throw concrete exceptions in streams tests
1. Now instead of just generic `Exception` methods declare more concrete
exceptions throwing or don't declare any throwing at all, if not needed.
2. `SimpleBenchmark.run()` throws `RuntimeException`
3. `SimpleBenchmark.produce()` throws `IllegalArgumentException`
4. Expect `ProcessorStateException` in
`StandbyTaskTest.testUpdateNonPersistentStore()`
/cc enothereska
Author: Evgeny Veretennikov <ev...@gmail.com>
Reviewers: Damian Guy <da...@gmail.com>
Closes #3485 from evis/5531-throw-concrete-exceptions
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c5464edb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c5464edb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c5464edb
Branch: refs/heads/trunk
Commit: c5464edbb7a6821e0a91a3712b1fe2fd92a22d68
Parents: 3728f4c
Author: Evgeny Veretennikov <ev...@gmail.com>
Authored: Mon Sep 11 09:42:10 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Mon Sep 11 09:42:10 2017 +0100
----------------------------------------------------------------------
.../apache/kafka/streams/KafkaStreamsTest.java | 24 ++---
.../apache/kafka/streams/StreamsConfigTest.java | 36 ++++----
.../streams/integration/EosIntegrationTest.java | 4 +-
.../integration/FanoutIntegrationTest.java | 2 +-
.../GlobalKTableIntegrationTest.java | 6 +-
.../InternalTopicIntegrationTest.java | 4 +-
.../integration/JoinIntegrationTest.java | 8 +-
.../KStreamAggregationDedupIntegrationTest.java | 4 +-
.../KStreamAggregationIntegrationTest.java | 4 +-
.../KStreamKTableJoinIntegrationTest.java | 2 +-
.../integration/KStreamRepartitionJoinTest.java | 24 ++---
...eamsFineGrainedAutoResetIntegrationTest.java | 7 +-
.../KTableKTableJoinIntegrationTest.java | 43 ++++-----
.../QueryableStateIntegrationTest.java | 12 +--
.../integration/RegexSourceIntegrationTest.java | 7 +-
.../integration/ResetIntegrationTest.java | 3 +-
.../integration/utils/EmbeddedKafkaCluster.java | 16 ++--
.../streams/kstream/KStreamBuilderTest.java | 32 +++----
.../internals/GlobalKTableJoinsTest.java | 2 +-
.../internals/KGroupedStreamImplTest.java | 96 ++++++++++----------
.../internals/KGroupedTableImplTest.java | 26 +++---
.../kstream/internals/KStreamImplTest.java | 58 ++++++------
.../internals/KStreamKStreamJoinTest.java | 10 +-
.../internals/KStreamKStreamLeftJoinTest.java | 4 +-
.../internals/KStreamKTableJoinTest.java | 2 +-
.../internals/KStreamKTableLeftJoinTest.java | 2 +-
...reamSessionWindowAggregateProcessorTest.java | 18 ++--
.../internals/KStreamWindowAggregateTest.java | 6 +-
.../internals/KTableKTableLeftJoinTest.java | 8 +-
.../internals/KTableKTableOuterJoinTest.java | 6 +-
.../kstream/internals/SessionKeySerdeTest.java | 22 ++---
.../kafka/streams/perf/SimpleBenchmark.java | 36 ++++----
.../kafka/streams/perf/YahooBenchmark.java | 6 +-
.../streams/processor/TopologyBuilderTest.java | 43 ++++-----
.../internals/AbstractProcessorContextTest.java | 22 ++---
.../processor/internals/AbstractTaskTest.java | 6 +-
.../CopartitionedTopicsValidatorTest.java | 10 +-
.../internals/GlobalStateManagerImplTest.java | 52 +++++------
.../internals/GlobalStateTaskTest.java | 15 +--
.../internals/GlobalStreamThreadTest.java | 10 +-
.../internals/InternalTopicConfigTest.java | 20 ++--
.../internals/InternalTopicManagerTest.java | 10 +-
.../internals/MinTimestampTrackerTest.java | 14 +--
.../processor/internals/ProcessorNodeTest.java | 4 +-
.../internals/ProcessorStateManagerTest.java | 20 ++--
.../internals/ProcessorTopologyTest.java | 10 +-
.../internals/RecordCollectorTest.java | 10 +-
.../processor/internals/RecordQueueTest.java | 4 +-
.../SourceNodeRecordDeserializerTest.java | 6 +-
.../processor/internals/StandbyTaskTest.java | 16 ++--
.../processor/internals/StateConsumerTest.java | 21 +++--
.../processor/internals/StateDirectoryTest.java | 30 +++---
.../processor/internals/StateRestorerTest.java | 16 ++--
.../internals/StoreChangelogReaderTest.java | 24 ++---
.../internals/StreamPartitionAssignorTest.java | 8 +-
.../processor/internals/StreamTaskTest.java | 68 +++++++-------
.../processor/internals/StreamThreadTest.java | 12 +--
.../internals/StreamsMetadataStateTest.java | 40 ++++----
.../internals/StreamsMetricsImplTest.java | 2 +-
.../assignment/AssignmentInfoTest.java | 2 +-
.../internals/assignment/ClientStateTest.java | 34 +++----
.../assignment/StickyTaskAssignorTest.java | 56 ++++++------
.../assignment/SubscriptionInfoTest.java | 4 +-
.../apache/kafka/streams/state/StoresTest.java | 10 +-
.../internals/AbstractKeyValueStoreTest.java | 20 ++--
.../internals/CachingKeyValueStoreTest.java | 38 ++++----
.../internals/CachingSessionStoreTest.java | 48 +++++-----
.../state/internals/CachingWindowStoreTest.java | 44 ++++-----
.../ChangeLoggingKeyValueBytesStoreTest.java | 30 +++---
.../ChangeLoggingKeyValueStoreTest.java | 38 ++++----
.../CompositeReadOnlyKeyValueStoreTest.java | 32 +++----
.../CompositeReadOnlySessionStoreTest.java | 10 +-
.../CompositeReadOnlyWindowStoreTest.java | 18 ++--
.../DelegatingPeekingKeyValueIteratorTest.java | 12 +--
.../internals/FilteredCacheIteratorTest.java | 12 +--
.../internals/GlobalStateStoreProviderTest.java | 6 +-
...tedCacheWrappedSessionStoreIteratorTest.java | 14 +--
...rtedCacheWrappedWindowStoreIteratorTest.java | 6 +-
...eWrappedWindowStoreKeyValueIteratorTest.java | 14 +--
.../streams/state/internals/NamedCacheTest.java | 36 ++++----
.../internals/QueryableStoreProviderTest.java | 14 +--
.../RocksDBKeyValueStoreSupplierTest.java | 14 +--
.../internals/RocksDBKeyValueStoreTest.java | 8 +-
.../RocksDBSegmentedBytesStoreTest.java | 8 +-
.../RocksDBSessionStoreSupplierTest.java | 14 +--
.../internals/RocksDBSessionStoreTest.java | 28 +++---
.../state/internals/RocksDBStoreTest.java | 6 +-
.../RocksDBWindowStoreSupplierTest.java | 14 +--
.../state/internals/RocksDBWindowStoreTest.java | 18 ++--
.../state/internals/SegmentIteratorTest.java | 8 +-
.../internals/SegmentedCacheFunctionTest.java | 8 +-
.../streams/state/internals/SegmentsTest.java | 22 ++---
.../SerializedKeyValueIteratorTest.java | 10 +-
.../state/internals/SessionKeySchemaTest.java | 14 +--
.../state/internals/StoreChangeLoggerTest.java | 2 +-
.../StreamThreadStateStoreProviderTest.java | 14 +--
.../state/internals/ThreadCacheTest.java | 46 +++++-----
.../state/internals/WindowKeySchemaTest.java | 14 +--
.../state/internals/WindowStoreUtilsTest.java | 2 +-
.../internals/WrappingStoreProviderTest.java | 6 +-
.../streams/tests/BrokerCompatibilityTest.java | 2 +-
.../kafka/streams/tests/EosTestDriver.java | 2 +-
.../kafka/streams/tests/SmokeTestDriver.java | 4 +-
.../kafka/streams/tests/SmokeTestUtil.java | 2 +-
.../kafka/streams/tests/StreamsEosTest.java | 2 +-
.../kafka/streams/tests/StreamsSmokeTest.java | 2 +-
106 files changed, 900 insertions(+), 901 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 5e5f6c9..994a3a4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -83,7 +83,7 @@ public class KafkaStreamsTest {
}
@Test
- public void testStateChanges() throws Exception {
+ public void testStateChanges() throws InterruptedException {
final StreamsBuilder builder = new StreamsBuilder();
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
@@ -104,7 +104,7 @@ public class KafkaStreamsTest {
}
@Test
- public void testStateCloseAfterCreate() throws Exception {
+ public void testStateCloseAfterCreate() {
final StreamsBuilder builder = new StreamsBuilder();
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
@@ -208,7 +208,7 @@ public class KafkaStreamsTest {
@Test
- public void testInitializesAndDestroysMetricsReporters() throws Exception {
+ public void testInitializesAndDestroysMetricsReporters() {
final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
final StreamsBuilder builder = new StreamsBuilder();
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
@@ -223,7 +223,7 @@ public class KafkaStreamsTest {
}
@Test
- public void testCloseIsIdempotent() throws Exception {
+ public void testCloseIsIdempotent() {
streams.close();
final int closeCount = MockMetricsReporter.CLOSE_COUNT.get();
@@ -233,7 +233,7 @@ public class KafkaStreamsTest {
}
@Test
- public void testCannotStartOnceClosed() throws Exception {
+ public void testCannotStartOnceClosed() {
streams.start();
streams.close();
try {
@@ -247,7 +247,7 @@ public class KafkaStreamsTest {
}
@Test
- public void testCannotStartTwice() throws Exception {
+ public void testCannotStartTwice() {
streams.start();
try {
@@ -336,22 +336,22 @@ public class KafkaStreamsTest {
}
@Test(expected = IllegalStateException.class)
- public void shouldNotGetAllTasksWhenNotRunning() throws Exception {
+ public void shouldNotGetAllTasksWhenNotRunning() {
streams.allMetadata();
}
@Test(expected = IllegalStateException.class)
- public void shouldNotGetAllTasksWithStoreWhenNotRunning() throws Exception {
+ public void shouldNotGetAllTasksWithStoreWhenNotRunning() {
streams.allMetadataForStore("store");
}
@Test(expected = IllegalStateException.class)
- public void shouldNotGetTaskWithKeyAndSerializerWhenNotRunning() throws Exception {
+ public void shouldNotGetTaskWithKeyAndSerializerWhenNotRunning() {
streams.metadataForKey("store", "key", Serdes.String().serializer());
}
@Test(expected = IllegalStateException.class)
- public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() throws Exception {
+ public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() {
streams.metadataForKey("store", "key", new StreamPartitioner<String, Object>() {
@Override
public Integer partition(final String key, final Object value, final int numPartitions) {
@@ -438,7 +438,7 @@ public class KafkaStreamsTest {
}
@Test
- public void testCleanup() throws Exception {
+ public void testCleanup() {
final Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testLocalCleanup");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
@@ -453,7 +453,7 @@ public class KafkaStreamsTest {
}
@Test
- public void testCannotCleanupWhileRunning() throws Exception {
+ public void testCannotCleanupWhileRunning() throws InterruptedException {
final Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotCleanupWhileRunning");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 69a44a6..3774a8e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -82,7 +82,7 @@ public class StreamsConfigTest {
}
@Test
- public void testGetProducerConfigs() throws Exception {
+ public void testGetProducerConfigs() {
final String clientId = "client";
final Map<String, Object> returnedProps = streamsConfig.getProducerConfigs(clientId);
assertEquals(returnedProps.get(ProducerConfig.CLIENT_ID_CONFIG), clientId + "-producer");
@@ -91,7 +91,7 @@ public class StreamsConfigTest {
}
@Test
- public void testGetConsumerConfigs() throws Exception {
+ public void testGetConsumerConfigs() {
final String groupId = "example-application";
final String clientId = "client";
final Map<String, Object> returnedProps = streamsConfig.getConsumerConfigs(null, groupId, clientId);
@@ -102,7 +102,7 @@ public class StreamsConfigTest {
}
@Test
- public void testGetRestoreConsumerConfigs() throws Exception {
+ public void testGetRestoreConsumerConfigs() {
final String clientId = "client";
final Map<String, Object> returnedProps = streamsConfig.getRestoreConsumerConfigs(clientId);
assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), clientId + "-restore-consumer");
@@ -143,7 +143,7 @@ public class StreamsConfigTest {
}
@Test
- public void shouldSupportPrefixedConsumerConfigs() throws Exception {
+ public void shouldSupportPrefixedConsumerConfigs() {
props.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -153,7 +153,7 @@ public class StreamsConfigTest {
}
@Test
- public void shouldSupportPrefixedRestoreConsumerConfigs() throws Exception {
+ public void shouldSupportPrefixedRestoreConsumerConfigs() {
props.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -163,7 +163,7 @@ public class StreamsConfigTest {
}
@Test
- public void shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig() throws Exception {
+ public void shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig() {
final StreamsConfig streamsConfig = new StreamsConfig(props);
props.put(consumerPrefix("interceptor.statsd.host"), "host");
final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
@@ -171,7 +171,7 @@ public class StreamsConfigTest {
}
@Test
- public void shouldSupportPrefixedPropertiesThatAreNotPartOfRestoreConsumerConfig() throws Exception {
+ public void shouldSupportPrefixedPropertiesThatAreNotPartOfRestoreConsumerConfig() {
final StreamsConfig streamsConfig = new StreamsConfig(props);
props.put(consumerPrefix("interceptor.statsd.host"), "host");
final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId");
@@ -179,7 +179,7 @@ public class StreamsConfigTest {
}
@Test
- public void shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig() throws Exception {
+ public void shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig() {
final StreamsConfig streamsConfig = new StreamsConfig(props);
props.put(producerPrefix("interceptor.statsd.host"), "host");
final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
@@ -188,7 +188,7 @@ public class StreamsConfigTest {
@Test
- public void shouldSupportPrefixedProducerConfigs() throws Exception {
+ public void shouldSupportPrefixedProducerConfigs() {
props.put(producerPrefix(ProducerConfig.BUFFER_MEMORY_CONFIG), 10);
props.put(producerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -198,7 +198,7 @@ public class StreamsConfigTest {
}
@Test
- public void shouldBeSupportNonPrefixedConsumerConfigs() throws Exception {
+ public void shouldBeSupportNonPrefixedConsumerConfigs() {
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -208,7 +208,7 @@ public class StreamsConfigTest {
}
@Test
- public void shouldBeSupportNonPrefixedRestoreConsumerConfigs() throws Exception {
+ public void shouldBeSupportNonPrefixedRestoreConsumerConfigs() {
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -218,7 +218,7 @@ public class StreamsConfigTest {
}
@Test
- public void shouldSupportNonPrefixedProducerConfigs() throws Exception {
+ public void shouldSupportNonPrefixedProducerConfigs() {
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 10);
props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -230,21 +230,21 @@ public class StreamsConfigTest {
@Test(expected = StreamsException.class)
- public void shouldThrowStreamsExceptionIfKeySerdeConfigFails() throws Exception {
+ public void shouldThrowStreamsExceptionIfKeySerdeConfigFails() {
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
final StreamsConfig streamsConfig = new StreamsConfig(props);
streamsConfig.defaultKeySerde();
}
@Test(expected = StreamsException.class)
- public void shouldThrowStreamsExceptionIfValueSerdeConfigFails() throws Exception {
+ public void shouldThrowStreamsExceptionIfValueSerdeConfigFails() {
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
final StreamsConfig streamsConfig = new StreamsConfig(props);
streamsConfig.defaultValueSerde();
}
@Test
- public void shouldOverrideStreamsDefaultConsumerConfigs() throws Exception {
+ public void shouldOverrideStreamsDefaultConsumerConfigs() {
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10");
final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -254,7 +254,7 @@ public class StreamsConfigTest {
}
@Test
- public void shouldOverrideStreamsDefaultProducerConfigs() throws Exception {
+ public void shouldOverrideStreamsDefaultProducerConfigs() {
props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), "10000");
final StreamsConfig streamsConfig = new StreamsConfig(props);
final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
@@ -262,7 +262,7 @@ public class StreamsConfigTest {
}
@Test
- public void shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() throws Exception {
+ public void shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() {
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10");
final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -288,7 +288,7 @@ public class StreamsConfigTest {
}
@Test
- public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() throws Exception {
+ public void shouldSetInternalLeaveGroupOnCloseConfigToFalseInConsumer() {
final StreamsConfig streamsConfig = new StreamsConfig(props);
final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
assertThat(consumerConfigs.get("internal.leave.group.on.close"), CoreMatchers.<Object>equalTo(false));
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index fb93783..e50f4d0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -97,7 +97,7 @@ public class EosIntegrationTest {
private int testNumber = 0;
@Before
- public void createTopics() throws Exception {
+ public void createTopics() throws InterruptedException {
applicationId = "appId-" + ++testNumber;
CLUSTER.deleteTopicsAndWait(
SINGLE_PARTITION_INPUT_TOPIC, MULTI_PARTITION_INPUT_TOPIC,
@@ -701,7 +701,7 @@ public class EosIntegrationTest {
}
private List<KeyValue<Long, Long>> readResult(final int numberOfRecords,
- final String groupId) throws Exception {
+ final String groupId) throws InterruptedException {
if (groupId != null) {
return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
TestUtils.consumerConfig(
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
index 733ca0b..b4a9320 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
@@ -78,7 +78,7 @@ public class FanoutIntegrationTest {
private static final String OUTPUT_TOPIC_C = "C";
@BeforeClass
- public static void startKafkaCluster() throws Exception {
+ public static void startKafkaCluster() throws InterruptedException {
CLUSTER.createTopics(INPUT_TOPIC_A, OUTPUT_TOPIC_B, OUTPUT_TOPIC_C);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 7aef5c9..cbf2b56 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -232,7 +232,7 @@ public class GlobalKTableIntegrationTest {
kafkaStreams.start();
}
- private void produceTopicValues(final String topic) throws java.util.concurrent.ExecutionException, InterruptedException {
+ private void produceTopicValues(final String topic) throws Exception {
IntegrationTestUtils.produceKeyValuesSynchronously(
topic,
Arrays.asList(
@@ -249,7 +249,7 @@ public class GlobalKTableIntegrationTest {
mockTime);
}
- private void produceInitialGlobalTableValues() throws java.util.concurrent.ExecutionException, InterruptedException {
+ private void produceInitialGlobalTableValues() throws Exception {
IntegrationTestUtils.produceKeyValuesSynchronously(
globalOne,
Arrays.asList(
@@ -265,7 +265,7 @@ public class GlobalKTableIntegrationTest {
mockTime);
}
- private void produceGlobalTableValues() throws java.util.concurrent.ExecutionException, InterruptedException {
+ private void produceGlobalTableValues() throws Exception {
IntegrationTestUtils.produceKeyValuesSynchronously(
globalOne,
Arrays.asList(
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 587f478..01cfa5a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -76,7 +76,7 @@ public class InternalTopicIntegrationTest {
private String applicationId = "compact-topics-integration-test";
@BeforeClass
- public static void startKafkaCluster() throws Exception {
+ public static void startKafkaCluster() throws InterruptedException {
CLUSTER.createTopics(DEFAULT_INPUT_TOPIC, DEFAULT_OUTPUT_TOPIC);
}
@@ -173,7 +173,7 @@ public class InternalTopicIntegrationTest {
assertEquals(LogConfig.Compact(), properties.getProperty(LogConfig.CleanupPolicyProp()));
}
- private void produceData(final List<String> inputValues) throws java.util.concurrent.ExecutionException, InterruptedException {
+ private void produceData(final List<String> inputValues) throws Exception {
final Properties producerConfig = new Properties();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
index 490c928..3a771c4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
@@ -100,7 +100,7 @@ public class JoinIntegrationTest {
};
@BeforeClass
- public static void setupConfigsAndUtils() throws Exception {
+ public static void setupConfigsAndUtils() {
PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all");
PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0);
@@ -124,7 +124,7 @@ public class JoinIntegrationTest {
}
@Before
- public void prepareTopology() throws Exception {
+ public void prepareTopology() throws InterruptedException {
CLUSTER.createTopics(INPUT_TOPIC_1, INPUT_TOPIC_2, OUTPUT_TOPIC);
builder = new StreamsBuilder();
@@ -135,11 +135,11 @@ public class JoinIntegrationTest {
}
@After
- public void cleanup() throws Exception {
+ public void cleanup() throws InterruptedException {
CLUSTER.deleteTopicsAndWait(120000, INPUT_TOPIC_1, INPUT_TOPIC_2, OUTPUT_TOPIC);
}
- private void checkResult(final String outputTopic, final List<String> expectedResult) throws Exception {
+ private void checkResult(final String outputTopic, final List<String> expectedResult) throws InterruptedException {
if (expectedResult != null) {
final List<String> result = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(RESULT_CONSUMER_CONFIG, outputTopic, expectedResult.size(), 30 * 1000L);
assertThat(result, is(expectedResult));
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index 5740779..cb58849 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -53,7 +53,6 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Properties;
-import java.util.concurrent.ExecutionException;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
@@ -260,8 +259,7 @@ public class KStreamAggregationDedupIntegrationTest {
}
- private void produceMessages(long timestamp)
- throws ExecutionException, InterruptedException {
+ private void produceMessages(long timestamp) throws Exception {
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
streamOneInput,
Arrays.asList(
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index e9927bc..7ff24da 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -66,7 +66,6 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -645,8 +644,7 @@ public class KStreamAggregationIntegrationTest {
}
- private void produceMessages(final long timestamp)
- throws ExecutionException, InterruptedException {
+ private void produceMessages(final long timestamp) throws Exception {
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
streamOneInput,
Arrays.asList(
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
index 8dc22ce..a433667 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
@@ -144,7 +144,7 @@ public class KStreamKTableJoinIntegrationTest {
countClicksPerRegion(10 * 1024 * 1024);
}
- private void countClicksPerRegion(final int cacheSizeBytes) throws java.util.concurrent.ExecutionException, InterruptedException {
+ private void countClicksPerRegion(final int cacheSizeBytes) throws Exception {
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
// Input 1: Clicks per user (multiple records allowed per user).
final List<KeyValue<String, Long>> userClicks = Arrays.asList(
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
index 9618033..5f6ff44 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
@@ -53,7 +53,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -156,7 +155,7 @@ public class KStreamRepartitionJoinTest {
return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, mapOneStreamAndJoinOutput);
}
- private ExpectedOutputOnTopic mapBothStreamsAndJoin() throws Exception {
+ private ExpectedOutputOnTopic mapBothStreamsAndJoin() throws InterruptedException {
final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);
final KStream<Integer, String> map2 = streamTwo.map(MockKeyValueMapper.<Integer, String>NoOpKeyValueMapper());
@@ -164,7 +163,7 @@ public class KStreamRepartitionJoinTest {
return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, "map-both-streams-and-join-" + testNo);
}
- private ExpectedOutputOnTopic mapMapJoin() throws Exception {
+ private ExpectedOutputOnTopic mapMapJoin() throws InterruptedException {
final KStream<Integer, Integer> mapMapStream = streamOne.map(
new KeyValueMapper<Long, Integer, KeyValue<Long, Integer>>() {
@Override
@@ -181,7 +180,7 @@ public class KStreamRepartitionJoinTest {
return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
}
- private ExpectedOutputOnTopic selectKeyAndJoin() throws ExecutionException, InterruptedException {
+ private ExpectedOutputOnTopic selectKeyAndJoin() throws Exception {
final KStream<Integer, Integer> keySelected =
streamOne.selectKey(MockKeyValueMapper.<Long, Integer>SelectValueMapper());
@@ -191,7 +190,7 @@ public class KStreamRepartitionJoinTest {
return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
}
- private ExpectedOutputOnTopic flatMapJoin() throws Exception {
+ private ExpectedOutputOnTopic flatMapJoin() throws InterruptedException {
final KStream<Integer, Integer> flatMapped = streamOne.flatMap(
new KeyValueMapper<Long, Integer, Iterable<KeyValue<Integer, Integer>>>() {
@Override
@@ -206,7 +205,7 @@ public class KStreamRepartitionJoinTest {
return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
}
- private ExpectedOutputOnTopic joinMappedRhsStream() throws Exception {
+ private ExpectedOutputOnTopic joinMappedRhsStream() throws InterruptedException {
final String output = "join-rhs-stream-mapped-" + testNo;
CLUSTER.createTopic(output);
@@ -220,7 +219,7 @@ public class KStreamRepartitionJoinTest {
return new ExpectedOutputOnTopic(Arrays.asList("A:1", "B:2", "C:3", "D:4", "E:5"), output);
}
- private ExpectedOutputOnTopic mapBothStreamsAndLeftJoin() throws Exception {
+ private ExpectedOutputOnTopic mapBothStreamsAndLeftJoin() throws InterruptedException {
final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);
final KStream<Integer, String> map2 = streamTwo.map(MockKeyValueMapper.<Integer, String>NoOpKeyValueMapper());
@@ -244,7 +243,7 @@ public class KStreamRepartitionJoinTest {
return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
}
- private ExpectedOutputOnTopic joinTwoMappedStreamsOneThatHasBeenPreviouslyJoined() throws Exception {
+ private ExpectedOutputOnTopic joinTwoMappedStreamsOneThatHasBeenPreviouslyJoined() throws InterruptedException {
final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);
final KeyValueMapper<Integer, String, KeyValue<Integer, String>>
@@ -293,16 +292,14 @@ public class KStreamRepartitionJoinTest {
is(expectedOutputOnTopic.expectedOutput));
}
- private void produceMessages()
- throws ExecutionException, InterruptedException {
+ private void produceMessages() throws Exception {
produceToStreamOne();
produceStreamTwoInputTo(streamTwoInput);
produceStreamTwoInputTo(streamFourInput);
}
- private void produceStreamTwoInputTo(final String streamTwoInput)
- throws ExecutionException, InterruptedException {
+ private void produceStreamTwoInputTo(final String streamTwoInput) throws Exception {
IntegrationTestUtils.produceKeyValuesSynchronously(
streamTwoInput,
Arrays.asList(
@@ -319,8 +316,7 @@ public class KStreamRepartitionJoinTest {
mockTime);
}
- private void produceToStreamOne()
- throws ExecutionException, InterruptedException {
+ private void produceToStreamOne() throws Exception {
IntegrationTestUtils.produceKeyValuesSynchronously(
streamOneInput,
Arrays.asList(
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
index 92f351b..5415b58 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
@@ -47,6 +47,7 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -107,7 +108,7 @@ public class KStreamsFineGrainedAutoResetIntegrationTest {
@BeforeClass
- public static void startKafkaCluster() throws Exception {
+ public static void startKafkaCluster() throws InterruptedException {
CLUSTER.createTopics(
TOPIC_1_0,
TOPIC_2_0,
@@ -135,7 +136,7 @@ public class KStreamsFineGrainedAutoResetIntegrationTest {
}
@Before
- public void setUp() throws Exception {
+ public void setUp() throws IOException {
Properties props = new Properties();
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
@@ -274,7 +275,7 @@ public class KStreamsFineGrainedAutoResetIntegrationTest {
}
@Test
- public void shouldThrowStreamsExceptionNoResetSpecified() throws Exception {
+ public void shouldThrowStreamsExceptionNoResetSpecified() throws InterruptedException {
Properties props = new Properties();
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
index 9df1ef5..949f8be 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
@@ -42,6 +42,7 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -118,12 +119,12 @@ public class KTableKTableJoinIntegrationTest {
}
@Before
- public void before() throws Exception {
+ public void before() throws IOException {
IntegrationTestUtils.purgeLocalStreamsState(streamsConfig);
}
@After
- public void after() throws Exception {
+ public void after() throws IOException {
if (streams != null) {
streams.close();
streams = null;
@@ -137,27 +138,27 @@ public class KTableKTableJoinIntegrationTest {
@Test
- public void shouldInnerInnerJoin() throws Exception {
+ public void shouldInnerInnerJoin() throws InterruptedException {
verifyKTableKTableJoin(JoinType.INNER, JoinType.INNER, Collections.singletonList(new KeyValue<>("b", "B1-B2-B3")), false);
}
@Test
- public void shouldInnerInnerJoinQueryable() throws Exception {
+ public void shouldInnerInnerJoinQueryable() throws InterruptedException {
verifyKTableKTableJoin(JoinType.INNER, JoinType.INNER, Collections.singletonList(new KeyValue<>("b", "B1-B2-B3")), true);
}
@Test
- public void shouldInnerLeftJoin() throws Exception {
+ public void shouldInnerLeftJoin() throws InterruptedException {
verifyKTableKTableJoin(JoinType.INNER, JoinType.LEFT, Collections.singletonList(new KeyValue<>("b", "B1-B2-B3")), false);
}
@Test
- public void shouldInnerLeftJoinQueryable() throws Exception {
+ public void shouldInnerLeftJoinQueryable() throws InterruptedException {
verifyKTableKTableJoin(JoinType.INNER, JoinType.LEFT, Collections.singletonList(new KeyValue<>("b", "B1-B2-B3")), true);
}
@Test
- public void shouldInnerOuterJoin() throws Exception {
+ public void shouldInnerOuterJoin() throws InterruptedException {
verifyKTableKTableJoin(JoinType.INNER, JoinType.OUTER, Arrays.asList(
new KeyValue<>("a", "null-A3"),
new KeyValue<>("b", "null-B3"),
@@ -166,7 +167,7 @@ public class KTableKTableJoinIntegrationTest {
}
@Test
- public void shouldInnerOuterJoinQueryable() throws Exception {
+ public void shouldInnerOuterJoinQueryable() throws InterruptedException {
verifyKTableKTableJoin(JoinType.INNER, JoinType.OUTER, Arrays.asList(
new KeyValue<>("a", "null-A3"),
new KeyValue<>("b", "null-B3"),
@@ -175,7 +176,7 @@ public class KTableKTableJoinIntegrationTest {
}
@Test
- public void shouldLeftInnerJoin() throws Exception {
+ public void shouldLeftInnerJoin() throws InterruptedException {
verifyKTableKTableJoin(JoinType.LEFT, JoinType.INNER, Arrays.asList(
new KeyValue<>("a", "A1-null-A3"),
new KeyValue<>("b", "B1-null-B3"),
@@ -183,7 +184,7 @@ public class KTableKTableJoinIntegrationTest {
}
@Test
- public void shouldLeftInnerJoinQueryable() throws Exception {
+ public void shouldLeftInnerJoinQueryable() throws InterruptedException {
verifyKTableKTableJoin(JoinType.LEFT, JoinType.INNER, Arrays.asList(
new KeyValue<>("a", "A1-null-A3"),
new KeyValue<>("b", "B1-null-B3"),
@@ -191,7 +192,7 @@ public class KTableKTableJoinIntegrationTest {
}
@Test
- public void shouldLeftLeftJoin() throws Exception {
+ public void shouldLeftLeftJoin() throws InterruptedException {
verifyKTableKTableJoin(JoinType.LEFT, JoinType.LEFT, Arrays.asList(
new KeyValue<>("a", "A1-null-A3"),
new KeyValue<>("b", "B1-null-B3"),
@@ -199,7 +200,7 @@ public class KTableKTableJoinIntegrationTest {
}
@Test
- public void shouldLeftLeftJoinQueryable() throws Exception {
+ public void shouldLeftLeftJoinQueryable() throws InterruptedException {
verifyKTableKTableJoin(JoinType.LEFT, JoinType.LEFT, Arrays.asList(
new KeyValue<>("a", "A1-null-A3"),
new KeyValue<>("b", "B1-null-B3"),
@@ -207,7 +208,7 @@ public class KTableKTableJoinIntegrationTest {
}
@Test
- public void shouldLeftOuterJoin() throws Exception {
+ public void shouldLeftOuterJoin() throws InterruptedException {
verifyKTableKTableJoin(JoinType.LEFT, JoinType.OUTER, Arrays.asList(
new KeyValue<>("a", "null-A3"),
new KeyValue<>("b", "null-B3"),
@@ -218,7 +219,7 @@ public class KTableKTableJoinIntegrationTest {
}
@Test
- public void shouldLeftOuterJoinQueryable() throws Exception {
+ public void shouldLeftOuterJoinQueryable() throws InterruptedException {
verifyKTableKTableJoin(JoinType.LEFT, JoinType.OUTER, Arrays.asList(
new KeyValue<>("a", "null-A3"),
new KeyValue<>("b", "null-B3"),
@@ -229,7 +230,7 @@ public class KTableKTableJoinIntegrationTest {
}
@Test
- public void shouldOuterInnerJoin() throws Exception {
+ public void shouldOuterInnerJoin() throws InterruptedException {
verifyKTableKTableJoin(JoinType.OUTER, JoinType.INNER, Arrays.asList(
new KeyValue<>("a", "A1-null-A3"),
new KeyValue<>("b", "B1-null-B3"),
@@ -238,7 +239,7 @@ public class KTableKTableJoinIntegrationTest {
}
@Test
- public void shouldOuterInnerJoinQueryable() throws Exception {
+ public void shouldOuterInnerJoinQueryable() throws InterruptedException {
verifyKTableKTableJoin(JoinType.OUTER, JoinType.INNER, Arrays.asList(
new KeyValue<>("a", "A1-null-A3"),
new KeyValue<>("b", "B1-null-B3"),
@@ -247,7 +248,7 @@ public class KTableKTableJoinIntegrationTest {
}
@Test
- public void shouldOuterLeftJoin() throws Exception {
+ public void shouldOuterLeftJoin() throws InterruptedException {
verifyKTableKTableJoin(JoinType.OUTER, JoinType.LEFT, Arrays.asList(
new KeyValue<>("a", "A1-null-A3"),
new KeyValue<>("b", "B1-null-B3"),
@@ -256,7 +257,7 @@ public class KTableKTableJoinIntegrationTest {
}
@Test
- public void shouldOuterLeftJoinQueryable() throws Exception {
+ public void shouldOuterLeftJoinQueryable() throws InterruptedException {
verifyKTableKTableJoin(JoinType.OUTER, JoinType.LEFT, Arrays.asList(
new KeyValue<>("a", "A1-null-A3"),
new KeyValue<>("b", "B1-null-B3"),
@@ -265,7 +266,7 @@ public class KTableKTableJoinIntegrationTest {
}
@Test
- public void shouldOuterOuterJoin() throws Exception {
+ public void shouldOuterOuterJoin() throws InterruptedException {
verifyKTableKTableJoin(JoinType.OUTER, JoinType.OUTER, Arrays.asList(
new KeyValue<>("a", "null-A3"),
new KeyValue<>("b", "null-B3"),
@@ -277,7 +278,7 @@ public class KTableKTableJoinIntegrationTest {
}
@Test
- public void shouldOuterOuterJoinQueryable() throws Exception {
+ public void shouldOuterOuterJoinQueryable() throws InterruptedException {
verifyKTableKTableJoin(JoinType.OUTER, JoinType.OUTER, Arrays.asList(
new KeyValue<>("a", "null-A3"),
new KeyValue<>("b", "null-B3"),
@@ -292,7 +293,7 @@ public class KTableKTableJoinIntegrationTest {
private void verifyKTableKTableJoin(final JoinType joinType1,
final JoinType joinType2,
final List<KeyValue<String, String>> expectedResult,
- boolean verifyQueryableState) throws Exception {
+ boolean verifyQueryableState) throws InterruptedException {
final String queryableName = verifyQueryableState ? joinType1 + "-" + joinType2 + "-ktable-ktable-join-query" : null;
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, joinType1 + "-" + joinType2 + "-ktable-ktable-join" + queryableName);
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index dc59fb4..9c8244a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -127,7 +127,7 @@ public class QueryableStateIntegrationTest {
}
@Before
- public void before() throws IOException, InterruptedException {
+ public void before() throws Exception {
testNo++;
createTopics();
streamsConfiguration = new Properties();
@@ -255,7 +255,7 @@ public class QueryableStateIntegrationTest {
private void verifyAllKVKeys(final StreamRunnable[] streamRunnables, final KafkaStreams streams,
final KafkaStreamsTest.StateListenerStub stateListenerStub,
- final Set<String> keys, final String storeName) throws Exception {
+ final Set<String> keys, final String storeName) throws InterruptedException {
for (final String key : keys) {
TestUtils.waitForCondition(new TestCondition() {
@Override
@@ -287,7 +287,7 @@ public class QueryableStateIntegrationTest {
private void verifyAllWindowedKeys(final StreamRunnable[] streamRunnables, final KafkaStreams streams,
final KafkaStreamsTest.StateListenerStub stateListenerStub,
final Set<String> keys, final String storeName,
- final Long from, final Long to) throws Exception {
+ final Long from, final Long to) throws InterruptedException {
for (final String key : keys) {
TestUtils.waitForCondition(new TestCondition() {
@Override
@@ -317,7 +317,7 @@ public class QueryableStateIntegrationTest {
@Test
- public void queryOnRebalance() throws Exception {
+ public void queryOnRebalance() throws InterruptedException {
final int numThreads = STREAM_TWO_PARTITIONS;
final StreamRunnable[] streamRunnables = new StreamRunnable[numThreads];
final Thread[] streamThreads = new Thread[numThreads];
@@ -369,7 +369,7 @@ public class QueryableStateIntegrationTest {
}
@Test
- public void concurrentAccesses() throws Exception {
+ public void concurrentAccesses() throws InterruptedException {
final int numIterations = 500000;
@@ -590,7 +590,7 @@ public class QueryableStateIntegrationTest {
}
}
- private void verifyCanQueryState(final int cacheSizeBytes) throws java.util.concurrent.ExecutionException, InterruptedException {
+ private void verifyCanQueryState(final int cacheSizeBytes) throws Exception {
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
final StreamsBuilder builder = new StreamsBuilder();
final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 13d9f82..1da4c58 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -49,6 +49,7 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -92,7 +93,7 @@ public class RegexSourceIntegrationTest {
@BeforeClass
- public static void startKafkaCluster() throws Exception {
+ public static void startKafkaCluster() throws InterruptedException {
CLUSTER.createTopics(
TOPIC_1,
TOPIC_2,
@@ -119,7 +120,7 @@ public class RegexSourceIntegrationTest {
}
@After
- public void tearDown() throws Exception {
+ public void tearDown() throws IOException {
if (streams != null) {
streams.close();
}
@@ -229,7 +230,7 @@ public class RegexSourceIntegrationTest {
@SuppressWarnings("deprecation")
@Test
- public void shouldAddStateStoreToRegexDefinedSource() throws Exception {
+ public void shouldAddStateStoreToRegexDefinedSource() throws InterruptedException {
final ProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
final MockStateStoreSupplier stateStoreSupplier = new MockStateStoreSupplier("testStateStore", false);
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 99a524e..897028d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -50,6 +50,7 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -242,7 +243,7 @@ public class ResetIntegrationTest {
cleanGlobal(null);
}
- private Properties prepareTest(final int threads) throws Exception {
+ private Properties prepareTest(final int threads) throws IOException {
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + testNo);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index e738bc6..6a873d2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -215,7 +215,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
*
* @param topic the name of the topic
*/
- public void deleteTopic(final String topic) throws Exception {
+ public void deleteTopic(final String topic) throws InterruptedException {
deleteTopicsAndWait(-1L, topic);
}
@@ -224,7 +224,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
*
* @param topic the name of the topic
*/
- public void deleteTopicAndWait(final String topic) throws Exception {
+ public void deleteTopicAndWait(final String topic) throws InterruptedException {
deleteTopicsAndWait(TOPIC_DELETION_TIMEOUT, topic);
}
@@ -234,7 +234,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
* @param timeoutMs the max time to wait for the topic to be deleted (does not block if {@code <= 0})
* @param topic the name of the topic
*/
- public void deleteTopicAndWait(final long timeoutMs, final String topic) throws Exception {
+ public void deleteTopicAndWait(final long timeoutMs, final String topic) throws InterruptedException {
deleteTopicsAndWait(timeoutMs, topic);
}
@@ -243,7 +243,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
*
* @param topics the name of the topics
*/
- public void deleteTopics(final String... topics) throws Exception {
+ public void deleteTopics(final String... topics) throws InterruptedException {
deleteTopicsAndWait(-1, topics);
}
@@ -252,7 +252,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
*
* @param topics the name of the topics
*/
- public void deleteTopicsAndWait(final String... topics) throws Exception {
+ public void deleteTopicsAndWait(final String... topics) throws InterruptedException {
deleteTopicsAndWait(TOPIC_DELETION_TIMEOUT, topics);
}
@@ -262,7 +262,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
* @param timeoutMs the max time to wait for the topics to be deleted (does not block if {@code <= 0})
* @param topics the name of the topics
*/
- public void deleteTopicsAndWait(final long timeoutMs, final String... topics) throws Exception {
+ public void deleteTopicsAndWait(final long timeoutMs, final String... topics) throws InterruptedException {
for (final String topic : topics) {
try {
brokers[0].deleteTopic(topic);
@@ -274,12 +274,12 @@ public class EmbeddedKafkaCluster extends ExternalResource {
}
}
- public void deleteAndRecreateTopics(final String... topics) throws Exception {
+ public void deleteAndRecreateTopics(final String... topics) throws InterruptedException {
deleteTopicsAndWait(TOPIC_DELETION_TIMEOUT, topics);
createTopics(topics);
}
- public void deleteAndRecreateTopics(final long timeoutMs, final String... topics) throws Exception {
+ public void deleteAndRecreateTopics(final long timeoutMs, final String... topics) throws InterruptedException {
deleteTopicsAndWait(timeoutMs, topics);
createTopics(topics);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
index ab17c9e..6a6d2a4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java
@@ -159,7 +159,7 @@ public class KStreamBuilderTest {
}
@Test
- public void shouldHaveCorrectSourceTopicsForTableFromMergedStream() throws Exception {
+ public void shouldHaveCorrectSourceTopicsForTableFromMergedStream() {
final String topic1 = "topic-1";
final String topic2 = "topic-2";
final String topic3 = "topic-3";
@@ -192,17 +192,17 @@ public class KStreamBuilderTest {
}
@Test(expected = TopologyBuilderException.class)
- public void shouldThrowExceptionWhenNoTopicPresent() throws Exception {
+ public void shouldThrowExceptionWhenNoTopicPresent() {
builder.stream();
}
@Test(expected = NullPointerException.class)
- public void shouldThrowExceptionWhenTopicNamesAreNull() throws Exception {
+ public void shouldThrowExceptionWhenTopicNamesAreNull() {
builder.stream(Serdes.String(), Serdes.String(), null, null);
}
@Test
- public void shouldStillMaterializeSourceKTableIfStateNameNotSpecified() throws Exception {
+ public void shouldStillMaterializeSourceKTableIfStateNameNotSpecified() {
KTable table1 = builder.table("topic1", "table1");
KTable table2 = builder.table("topic2", (String) null);
@@ -221,7 +221,7 @@ public class KStreamBuilderTest {
}
@Test
- public void shouldBuildSimpleGlobalTableTopology() throws Exception {
+ public void shouldBuildSimpleGlobalTableTopology() {
builder.globalTable("table", "globalTable");
final ProcessorTopology topology = builder.buildGlobalStateTopology();
@@ -231,7 +231,7 @@ public class KStreamBuilderTest {
assertEquals("globalTable", stateStores.get(0).name());
}
- private void doBuildGlobalTopologyWithAllGlobalTables() throws Exception {
+ private void doBuildGlobalTopologyWithAllGlobalTables() {
final ProcessorTopology topology = builder.buildGlobalStateTopology();
final List<StateStore> stateStores = topology.globalStateStores();
@@ -242,7 +242,7 @@ public class KStreamBuilderTest {
}
@Test
- public void shouldBuildGlobalTopologyWithAllGlobalTables() throws Exception {
+ public void shouldBuildGlobalTopologyWithAllGlobalTables() {
builder.globalTable("table", "globalTable");
builder.globalTable("table2", "globalTable2");
@@ -250,7 +250,7 @@ public class KStreamBuilderTest {
}
@Test
- public void shouldBuildGlobalTopologyWithAllGlobalTablesWithInternalStoreName() throws Exception {
+ public void shouldBuildGlobalTopologyWithAllGlobalTablesWithInternalStoreName() {
builder.globalTable("table");
builder.globalTable("table2");
@@ -258,7 +258,7 @@ public class KStreamBuilderTest {
}
@Test
- public void shouldAddGlobalTablesToEachGroup() throws Exception {
+ public void shouldAddGlobalTablesToEachGroup() {
final String one = "globalTable";
final String two = "globalTable2";
final GlobalKTable<String, String> globalTable = builder.globalTable("table", one);
@@ -294,7 +294,7 @@ public class KStreamBuilderTest {
}
@Test
- public void shouldMapStateStoresToCorrectSourceTopics() throws Exception {
+ public void shouldMapStateStoresToCorrectSourceTopics() {
final KStream<String, String> playEvents = builder.stream("events");
final KTable<String, String> table = builder.table("table-topic", "table-store");
@@ -395,14 +395,14 @@ public class KStreamBuilderTest {
}
@Test
- public void kStreamTimestampExtractorShouldBeNull() throws Exception {
+ public void kStreamTimestampExtractorShouldBeNull() {
builder.stream("topic");
final ProcessorTopology processorTopology = builder.build(null);
assertNull(processorTopology.source("topic").getTimestampExtractor());
}
@Test
- public void shouldAddTimestampExtractorToStreamWithKeyValSerdePerSource() throws Exception {
+ public void shouldAddTimestampExtractorToStreamWithKeyValSerdePerSource() {
builder.stream(new MockTimestampExtractor(), null, null, "topic");
final ProcessorTopology processorTopology = builder.build(null);
for (final SourceNode sourceNode: processorTopology.sources()) {
@@ -411,28 +411,28 @@ public class KStreamBuilderTest {
}
@Test
- public void shouldAddTimestampExtractorToStreamWithOffsetResetPerSource() throws Exception {
+ public void shouldAddTimestampExtractorToStreamWithOffsetResetPerSource() {
builder.stream(null, new MockTimestampExtractor(), null, null, "topic");
final ProcessorTopology processorTopology = builder.build(null);
assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
}
@Test
- public void shouldAddTimestampExtractorToTablePerSource() throws Exception {
+ public void shouldAddTimestampExtractorToTablePerSource() {
builder.table("topic", "store");
final ProcessorTopology processorTopology = builder.build(null);
assertNull(processorTopology.source("topic").getTimestampExtractor());
}
@Test
- public void kTableTimestampExtractorShouldBeNull() throws Exception {
+ public void kTableTimestampExtractorShouldBeNull() {
builder.table("topic", "store");
final ProcessorTopology processorTopology = builder.build(null);
assertNull(processorTopology.source("topic").getTimestampExtractor());
}
@Test
- public void shouldAddTimestampExtractorToTableWithKeyValSerdePerSource() throws Exception {
+ public void shouldAddTimestampExtractorToTableWithKeyValSerdePerSource() {
builder.table(null, new MockTimestampExtractor(), null, null, "topic", "store");
final ProcessorTopology processorTopology = builder.build(null);
assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
index 87c7ece..0ee74b8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
@@ -52,7 +52,7 @@ public class GlobalKTableJoinsTest {
public final KStreamTestDriver driver = new KStreamTestDriver();
@Before
- public void setUp() throws Exception {
+ public void setUp() {
stateDir = TestUtils.tempDirectory();
global = builder.globalTable(Serdes.String(), Serdes.String(), null, globalTopic, "global-store");
stream = builder.stream(streamTopic, Consumed.with(Serdes.String(), Serdes.String()));
http://git-wip-us.apache.org/repos/asf/kafka/blob/c5464edb/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index 78f8dbd..bc65e09 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -74,111 +74,111 @@ public class KGroupedStreamImplTest {
}
@Test(expected = NullPointerException.class)
- public void shouldNotHaveNullReducerOnReduce() throws Exception {
+ public void shouldNotHaveNullReducerOnReduce() {
groupedStream.reduce(null, "store");
}
@Test
- public void shouldAllowNullStoreNameOnReduce() throws Exception {
+ public void shouldAllowNullStoreNameOnReduce() {
groupedStream.reduce(MockReducer.STRING_ADDER, (String) null);
}
@Test(expected = InvalidTopicException.class)
- public void shouldNotHaveInvalidStoreNameOnReduce() throws Exception {
+ public void shouldNotHaveInvalidStoreNameOnReduce() {
groupedStream.reduce(MockReducer.STRING_ADDER, INVALID_STORE_NAME);
}
@SuppressWarnings("deprecation")
@Test(expected = NullPointerException.class)
- public void shouldNotHaveNullStoreSupplierOnReduce() throws Exception {
+ public void shouldNotHaveNullStoreSupplierOnReduce() {
groupedStream.reduce(MockReducer.STRING_ADDER, (StateStoreSupplier<KeyValueStore>) null);
}
@SuppressWarnings("deprecation")
@Test(expected = NullPointerException.class)
- public void shouldNotHaveNullStoreSupplierOnCount() throws Exception {
+ public void shouldNotHaveNullStoreSupplierOnCount() {
groupedStream.count((StateStoreSupplier<KeyValueStore>) null);
}
@SuppressWarnings("deprecation")
@Test(expected = NullPointerException.class)
- public void shouldNotHaveNullStoreSupplierOnWindowedCount() throws Exception {
+ public void shouldNotHaveNullStoreSupplierOnWindowedCount() {
groupedStream.count(TimeWindows.of(10), (StateStoreSupplier<WindowStore>) null);
}
@Test(expected = NullPointerException.class)
- public void shouldNotHaveNullReducerWithWindowedReduce() throws Exception {
+ public void shouldNotHaveNullReducerWithWindowedReduce() {
groupedStream.reduce(null, TimeWindows.of(10), "store");
}
@Test(expected = NullPointerException.class)
- public void shouldNotHaveNullWindowsWithWindowedReduce() throws Exception {
+ public void shouldNotHaveNullWindowsWithWindowedReduce() {
groupedStream.reduce(MockReducer.STRING_ADDER, (Windows) null, "store");
}
@Test
- public void shouldAllowNullStoreNameWithWindowedReduce() throws Exception {
+ public void shouldAllowNullStoreNameWithWindowedReduce() {
groupedStream.reduce(MockReducer.STRING_ADDER, TimeWindows.of(10), (String) null);
}
@Test(expected = InvalidTopicException.class)
- public void shouldNotHaveInvalidStoreNameWithWindowedReduce() throws Exception {
+ public void shouldNotHaveInvalidStoreNameWithWindowedReduce() {
groupedStream.reduce(MockReducer.STRING_ADDER, TimeWindows.of(10), INVALID_STORE_NAME);
}
@Test(expected = NullPointerException.class)
- public void shouldNotHaveNullInitializerOnAggregate() throws Exception {
+ public void shouldNotHaveNullInitializerOnAggregate() {
groupedStream.aggregate(null, MockAggregator.TOSTRING_ADDER, Serdes.String(), "store");
}
@Test(expected = NullPointerException.class)
- public void shouldNotHaveNullAdderOnAggregate() throws Exception {
+ public void shouldNotHaveNullAdderOnAggregate() {
groupedStream.aggregate(MockInitializer.STRING_INIT, null, Serdes.String(), "store");
}
@Test
- public void shouldAllowNullStoreNameOnAggregate() throws Exception {
+ public void shouldAllowNullStoreNameOnAggregate() {
groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Serdes.String(), null);
}
@Test(expected = InvalidTopicException.class)
- public void shouldNotHaveInvalidStoreNameOnAggregate() throws Exception {
+ public void shouldNotHaveInvalidStoreNameOnAggregate() {
groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Serdes.String(), INVALID_STORE_NAME);
}
@Test(expected = NullPointerException.class)
- public void shouldNotHaveNullInitializerOnWindowedAggregate() throws Exception {
+ public void shouldNotHaveNullInitializerOnWindowedAggregate() {
groupedStream.aggregate(null, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10), Serdes.String(), "store");
}
@Test(expected = NullPointerException.class)
- public void shouldNotHaveNullAdderOnWindowedAggregate() throws Exception {
+ public void shouldNotHaveNullAdderOnWindowedAggregate() {
groupedStream.aggregate(MockInitializer.STRING_INIT, null, TimeWindows.of(10), Serdes.String(), "store");
}
@Test(expected = NullPointerException.class)
- public void shouldNotHaveNullWindowsOnWindowedAggregate() throws Exception {
+ public void shouldNotHaveNullWindowsOnWindowedAggregate() {
groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, null, Serdes.String(), "store");
}
@Test
- public void shouldAllowNullStoreNameOnWindowedAggregate() throws Exception {
+ public void shouldAllowNullStoreNameOnWindowedAggregate() {
groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10), Serdes.String(), null);
}
@Test(expected = InvalidTopicException.class)
- public void shouldNotHaveInvalidStoreNameOnWindowedAggregate() throws Exception {
+ public void shouldNotHaveInvalidStoreNameOnWindowedAggregate() {
groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10), Serdes.String(), INVALID_STORE_NAME);
}
@SuppressWarnings("deprecation")
@Test(expected = NullPointerException.class)
- public void shouldNotHaveNullStoreSupplierOnWindowedAggregate() throws Exception {
+ public void shouldNotHaveNullStoreSupplierOnWindowedAggregate() {
groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10), (StateStoreSupplier<WindowStore>) null);
}
- private void doAggregateSessionWindows(final Map<Windowed<String>, Integer> results) throws Exception {
+ private void doAggregateSessionWindows(final Map<Windowed<String>, Integer> results) {
driver.setUp(builder, TestUtils.tempDirectory());
driver.setTime(10);
driver.process(TOPIC, "1", "1");
@@ -199,7 +199,7 @@ public class KGroupedStreamImplTest {
}
@Test
- public void shouldAggregateSessionWindows() throws Exception {
+ public void shouldAggregateSessionWindows() {
final Map<Windowed<String>, Integer> results = new HashMap<>();
KTable table = groupedStream.aggregate(new Initializer<Integer>() {
@Override
@@ -229,7 +229,7 @@ public class KGroupedStreamImplTest {
}
@Test
- public void shouldAggregateSessionWindowsWithInternalStoreName() throws Exception {
+ public void shouldAggregateSessionWindowsWithInternalStoreName() {
final Map<Windowed<String>, Integer> results = new HashMap<>();
KTable table = groupedStream.aggregate(new Initializer<Integer>() {
@Override
@@ -258,7 +258,7 @@ public class KGroupedStreamImplTest {
assertNull(table.queryableStoreName());
}
- private void doCountSessionWindows(final Map<Windowed<String>, Long> results) throws Exception {
+ private void doCountSessionWindows(final Map<Windowed<String>, Long> results) {
driver.setUp(builder, TestUtils.tempDirectory());
driver.setTime(10);
driver.process(TOPIC, "1", "1");
@@ -279,7 +279,7 @@ public class KGroupedStreamImplTest {
}
@Test
- public void shouldCountSessionWindows() throws Exception {
+ public void shouldCountSessionWindows() {
final Map<Windowed<String>, Long> results = new HashMap<>();
KTable table = groupedStream.count(SessionWindows.with(30), "session-store");
table.toStream().foreach(new ForeachAction<Windowed<String>, Long>() {
@@ -293,7 +293,7 @@ public class KGroupedStreamImplTest {
}
@Test
- public void shouldCountSessionWindowsWithInternalStoreName() throws Exception {
+ public void shouldCountSessionWindowsWithInternalStoreName() {
final Map<Windowed<String>, Long> results = new HashMap<>();
KTable table = groupedStream.count(SessionWindows.with(30));
table.toStream().foreach(new ForeachAction<Windowed<String>, Long>() {
@@ -306,7 +306,7 @@ public class KGroupedStreamImplTest {
assertNull(table.queryableStoreName());
}
- private void doReduceSessionWindows(final Map<Windowed<String>, String> results) throws Exception {
+ private void doReduceSessionWindows(final Map<Windowed<String>, String> results) {
driver.setUp(builder, TestUtils.tempDirectory());
driver.setTime(10);
driver.process(TOPIC, "1", "A");
@@ -327,7 +327,7 @@ public class KGroupedStreamImplTest {
}
@Test
- public void shouldReduceSessionWindows() throws Exception {
+ public void shouldReduceSessionWindows() {
final Map<Windowed<String>, String> results = new HashMap<>();
KTable table = groupedStream.reduce(
new Reducer<String>() {
@@ -348,7 +348,7 @@ public class KGroupedStreamImplTest {
}
@Test
- public void shouldReduceSessionWindowsWithInternalStoreName() throws Exception {
+ public void shouldReduceSessionWindowsWithInternalStoreName() {
final Map<Windowed<String>, String> results = new HashMap<>();
KTable table = groupedStream.reduce(
new Reducer<String>() {
@@ -369,33 +369,33 @@ public class KGroupedStreamImplTest {
@Test(expected = NullPointerException.class)
- public void shouldNotAcceptNullReducerWhenReducingSessionWindows() throws Exception {
+ public void shouldNotAcceptNullReducerWhenReducingSessionWindows() {
groupedStream.reduce(null, SessionWindows.with(10), "store");
}
@Test(expected = NullPointerException.class)
- public void shouldNotAcceptNullSessionWindowsReducingSessionWindows() throws Exception {
+ public void shouldNotAcceptNullSessionWindowsReducingSessionWindows() {
groupedStream.reduce(MockReducer.STRING_ADDER, (SessionWindows) null, "store");
}
@Test
- public void shouldAcceptNullStoreNameWhenReducingSessionWindows() throws Exception {
+ public void shouldAcceptNullStoreNameWhenReducingSessionWindows() {
groupedStream.reduce(MockReducer.STRING_ADDER, SessionWindows.with(10), (String) null);
}
@Test(expected = InvalidTopicException.class)
- public void shouldNotAcceptInvalidStoreNameWhenReducingSessionWindows() throws Exception {
+ public void shouldNotAcceptInvalidStoreNameWhenReducingSessionWindows() {
groupedStream.reduce(MockReducer.STRING_ADDER, SessionWindows.with(10), INVALID_STORE_NAME);
}
@SuppressWarnings("deprecation")
@Test(expected = NullPointerException.class)
- public void shouldNotAcceptNullStateStoreSupplierWhenReducingSessionWindows() throws Exception {
+ public void shouldNotAcceptNullStateStoreSupplierWhenReducingSessionWindows() {
groupedStream.reduce(MockReducer.STRING_ADDER, SessionWindows.with(10), (StateStoreSupplier<SessionStore>) null);
}
@Test(expected = NullPointerException.class)
- public void shouldNotAcceptNullInitializerWhenAggregatingSessionWindows() throws Exception {
+ public void shouldNotAcceptNullInitializerWhenAggregatingSessionWindows() {
groupedStream.aggregate(null, MockAggregator.TOSTRING_ADDER, new Merger<String, String>() {
@Override
public String apply(final String aggKey, final String aggOne, final String aggTwo) {
@@ -405,7 +405,7 @@ public class KGroupedStreamImplTest {
}
@Test(expected = NullPointerException.class)
- public void shouldNotAcceptNullAggregatorWhenAggregatingSessionWindows() throws Exception {
+ public void shouldNotAcceptNullAggregatorWhenAggregatingSessionWindows() {
groupedStream.aggregate(MockInitializer.STRING_INIT, null, new Merger<String, String>() {
@Override
public String apply(final String aggKey, final String aggOne, final String aggTwo) {
@@ -415,7 +415,7 @@ public class KGroupedStreamImplTest {
}
@Test(expected = NullPointerException.class)
- public void shouldNotAcceptNullSessionMergerWhenAggregatingSessionWindows() throws Exception {
+ public void shouldNotAcceptNullSessionMergerWhenAggregatingSessionWindows() {
groupedStream.aggregate(MockInitializer.STRING_INIT,
MockAggregator.TOSTRING_ADDER,
null,
@@ -425,7 +425,7 @@ public class KGroupedStreamImplTest {
}
@Test(expected = NullPointerException.class)
- public void shouldNotAcceptNullSessionWindowsWhenAggregatingSessionWindows() throws Exception {
+ public void shouldNotAcceptNullSessionWindowsWhenAggregatingSessionWindows() {
groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, new Merger<String, String>() {
@Override
public String apply(final String aggKey, final String aggOne, final String aggTwo) {
@@ -435,7 +435,7 @@ public class KGroupedStreamImplTest {
}
@Test
- public void shouldAcceptNullStoreNameWhenAggregatingSessionWindows() throws Exception {
+ public void shouldAcceptNullStoreNameWhenAggregatingSessionWindows() {
groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, new Merger<String, String>() {
@Override
public String apply(final String aggKey, final String aggOne, final String aggTwo) {
@@ -445,7 +445,7 @@ public class KGroupedStreamImplTest {
}
@Test(expected = InvalidTopicException.class)
- public void shouldNotAcceptInvalidStoreNameWhenAggregatingSessionWindows() throws Exception {
+ public void shouldNotAcceptInvalidStoreNameWhenAggregatingSessionWindows() {
groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, new Merger<String, String>() {
@Override
public String apply(final String aggKey, final String aggOne, final String aggTwo) {
@@ -456,7 +456,7 @@ public class KGroupedStreamImplTest {
@SuppressWarnings("deprecation")
@Test(expected = NullPointerException.class)
- public void shouldNotAcceptNullStateStoreSupplierNameWhenAggregatingSessionWindows() throws Exception {
+ public void shouldNotAcceptNullStateStoreSupplierNameWhenAggregatingSessionWindows() {
groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, new Merger<String, String>() {
@Override
public String apply(final String aggKey, final String aggOne, final String aggTwo) {
@@ -466,27 +466,27 @@ public class KGroupedStreamImplTest {
}
@Test(expected = NullPointerException.class)
- public void shouldNotAcceptNullSessionWindowsWhenCountingSessionWindows() throws Exception {
+ public void shouldNotAcceptNullSessionWindowsWhenCountingSessionWindows() {
groupedStream.count((SessionWindows) null, "store");
}
@Test
- public void shouldAcceptNullStoreNameWhenCountingSessionWindows() throws Exception {
+ public void shouldAcceptNullStoreNameWhenCountingSessionWindows() {
groupedStream.count(SessionWindows.with(90), (String) null);
}
@Test(expected = InvalidTopicException.class)
- public void shouldNotAcceptInvalidStoreNameWhenCountingSessionWindows() throws Exception {
+ public void shouldNotAcceptInvalidStoreNameWhenCountingSessionWindows() {
groupedStream.count(SessionWindows.with(90), INVALID_STORE_NAME);
}
@SuppressWarnings("deprecation")
@Test(expected = NullPointerException.class)
- public void shouldNotAcceptNullStateStoreSupplierWhenCountingSessionWindows() throws Exception {
+ public void shouldNotAcceptNullStateStoreSupplierWhenCountingSessionWindows() {
groupedStream.count(SessionWindows.with(90), (StateStoreSupplier<SessionStore>) null);
}
- private void doCountWindowed(final List<KeyValue<Windowed<String>, Long>> results) throws Exception {
+ private void doCountWindowed(final List<KeyValue<Windowed<String>, Long>> results) {
driver.setUp(builder, TestUtils.tempDirectory(), 0);
driver.setTime(0);
driver.process(TOPIC, "1", "A");
@@ -509,7 +509,7 @@ public class KGroupedStreamImplTest {
}
@Test
- public void shouldCountWindowed() throws Exception {
+ public void shouldCountWindowed() {
final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>();
groupedStream.count(
TimeWindows.of(500L),
@@ -527,7 +527,7 @@ public class KGroupedStreamImplTest {
@SuppressWarnings("deprecation")
@Test
- public void shouldCountWindowedWithInternalStoreName() throws Exception {
+ public void shouldCountWindowedWithInternalStoreName() {
final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>();
groupedStream.count(
TimeWindows.of(500L))