You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2019/04/16 23:56:26 UTC
[kafka] branch 2.0 updated: MINOR: fixed missing close of Iterator, used try-with-resource where appropriate (#6562)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new daaddb5 MINOR: fixed missing close of Iterator, used try-with-resource where appropriate (#6562)
daaddb5 is described below
commit daaddb5f2a315c0e54807016eaaa5e3d9386b69b
Author: pkleindl <44...@users.noreply.github.com>
AuthorDate: Wed Apr 17 01:44:17 2019 +0200
MINOR: fixed missing close of Iterator, used try-with-resource where appropriate (#6562)
Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
.../apache/kafka/streams/TopologyTestDriverTest.java | 18 +++++++++---------
1 file changed, 9 insertions(+), 9 deletions(-)
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 8400e87..b32226c 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -876,10 +876,11 @@ public class TopologyTestDriverTest {
}
private void flushStore() {
- final KeyValueIterator<String, Long> it = store.all();
- while (it.hasNext()) {
- final KeyValue<String, Long> next = it.next();
- context.forward(next.key, next.value);
+ try (final KeyValueIterator<String, Long> it = store.all()) {
+ while (it.hasNext()) {
+ final KeyValue<String, Long> next = it.next();
+ context.forward(next.key, next.value);
+ }
}
}
@@ -943,21 +944,20 @@ public class TopologyTestDriverTest {
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
- {
- final TopologyTestDriver testDriver = new TopologyTestDriver(topology, config);
+ try (final TopologyTestDriver testDriver = new TopologyTestDriver(topology, config)) {
Assert.assertNull(testDriver.getKeyValueStore("storeProcessorStore").get("a"));
testDriver.pipeInput(recordFactory.create("input-topic", "a", 1L));
Assert.assertEquals(1L, testDriver.getKeyValueStore("storeProcessorStore").get("a"));
- testDriver.close();
}
- {
- final TopologyTestDriver testDriver = new TopologyTestDriver(topology, config);
+
+ try (final TopologyTestDriver testDriver = new TopologyTestDriver(topology, config)) {
Assert.assertNull(
"Closing the prior test driver should have cleaned up this store and value.",
testDriver.getKeyValueStore("storeProcessorStore").get("a")
);
}
+
}
@Test