You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/05/11 07:25:19 UTC

[GitHub] [kafka] vitojeng opened a new pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest

vitojeng opened a new pull request #10668:
URL: https://github.com/apache/kafka/pull/10668


   In the PR #9821, @mjsax 's [comment](https://github.com/apache/kafka/pull/9821#discussion_r556200365)
   
   > We should use a try-with-resources clause to make sure close() is called.
   >
   > Seems, other tests in this class have a similar issue. Would be good to fix all test accordingly (if you want you can also to a separate PR for it).
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vitojeng commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest

Posted by GitBox <gi...@apache.org>.
vitojeng commented on a change in pull request #10668:
URL: https://github.com/apache/kafka/pull/10668#discussion_r630603389



##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -491,25 +493,23 @@ public void testStateThreadClose() throws Exception {
                 () -> streams.localThreadsMetadata().stream().allMatch(t -> t.threadState().equals("DEAD")),
                 "Streams never stopped"
             );
-        } finally {
             streams.close();

Review comment:
       Thanks @mjsax for review. Will do.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #10668:
URL: https://github.com/apache/kafka/pull/10668#issuecomment-853547323


   Thanks @vitojeng! Merged to `trunk`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax merged pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest

Posted by GitBox <gi...@apache.org>.
mjsax merged pull request #10668:
URL: https://github.com/apache/kafka/pull/10668


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #10668:
URL: https://github.com/apache/kafka/pull/10668#issuecomment-853249482


   @vitojeng -- Seems there is some conflicts. Can you rebase your PR so we can merge it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vitojeng commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest

Posted by GitBox <gi...@apache.org>.
vitojeng commented on a change in pull request #10668:
URL: https://github.com/apache/kafka/pull/10668#discussion_r630602092



##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -553,141 +552,152 @@ public void testInitializesAndDestroysMetricsReporters() {
 
     @Test
     public void testCloseIsIdempotent() {
-        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        streams.close();
-        final int closeCount = MockMetricsReporter.CLOSE_COUNT.get();
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            streams.close();
+            final int closeCount = MockMetricsReporter.CLOSE_COUNT.get();
 
-        streams.close();
-        Assert.assertEquals("subsequent close() calls should do nothing",
-            closeCount, MockMetricsReporter.CLOSE_COUNT.get());
+            streams.close();
+            Assert.assertEquals("subsequent close() calls should do nothing",
+                closeCount, MockMetricsReporter.CLOSE_COUNT.get());
+        }
     }
 
     @Test
     public void shouldAddThreadWhenRunning() throws InterruptedException {
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
-        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        streams.start();
-        final int oldSize = streams.threads.size();
-        TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running");
-        assertThat(streams.addStreamThread(), equalTo(Optional.of("processId-StreamThread-" + 2)));
-        assertThat(streams.threads.size(), equalTo(oldSize + 1));
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            streams.start();
+            final int oldSize = streams.threads.size();
+            TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running");
+            assertThat(streams.addStreamThread(), equalTo(Optional.of("processId-StreamThread-" + 2)));
+            assertThat(streams.threads.size(), equalTo(oldSize + 1));
+        }
     }
 
     @Test
     public void shouldNotAddThreadWhenCreated() {
-        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        final int oldSize = streams.threads.size();
-        assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
-        assertThat(streams.threads.size(), equalTo(oldSize));
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            final int oldSize = streams.threads.size();
+            assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
+            assertThat(streams.threads.size(), equalTo(oldSize));
+        }
     }
 
     @Test
     public void shouldNotAddThreadWhenClosed() {
-        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        final int oldSize = streams.threads.size();
-        streams.close();
-        assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
-        assertThat(streams.threads.size(), equalTo(oldSize));
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            final int oldSize = streams.threads.size();
+            streams.close();
+            assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
+            assertThat(streams.threads.size(), equalTo(oldSize));
+        }
     }
 
     @Test
     public void shouldNotAddThreadWhenError() {
-        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        final int oldSize = streams.threads.size();
-        streams.start();
-        globalStreamThread.shutdown();
-        assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
-        assertThat(streams.threads.size(), equalTo(oldSize));
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            final int oldSize = streams.threads.size();
+            streams.start();
+            globalStreamThread.shutdown();
+            assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
+            assertThat(streams.threads.size(), equalTo(oldSize));
+        }
     }
 
     @Test
     public void shouldNotReturnDeadThreads() {
-        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        streams.start();
-        streamThreadOne.shutdown();
-        final Set<ThreadMetadata> threads = streams.localThreadsMetadata();
+        final Set<ThreadMetadata> threads;
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            streams.start();
+            streamThreadOne.shutdown();
+            threads = streams.localThreadsMetadata();
+        }
         assertThat(threads.size(), equalTo(1));
         assertThat(threads, hasItem(streamThreadTwo.threadMetadata()));
     }
 
     @Test
     public void shouldRemoveThread() throws InterruptedException {
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
-        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        streams.start();
-        final int oldSize = streams.threads.size();
-        TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L,
-            "Kafka Streams client did not reach state RUNNING");
-        assertThat(streams.removeStreamThread(), equalTo(Optional.of("processId-StreamThread-" + 1)));
-        assertThat(streams.threads.size(), equalTo(oldSize - 1));
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            streams.start();
+            final int oldSize = streams.threads.size();
+            TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L,
+                "Kafka Streams client did not reach state RUNNING");
+            assertThat(streams.removeStreamThread(), equalTo(Optional.of("processId-StreamThread-" + 1)));
+            assertThat(streams.threads.size(), equalTo(oldSize - 1));
+        }
     }
 
     @Test
     public void shouldNotRemoveThreadWhenNotRunning() {
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
-        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        assertThat(streams.removeStreamThread(), equalTo(Optional.empty()));
-        assertThat(streams.threads.size(), equalTo(1));
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            assertThat(streams.removeStreamThread(), equalTo(Optional.empty()));
+            assertThat(streams.threads.size(), equalTo(1));
+        }
     }
 
     @Test
     public void testCannotStartOnceClosed() {
-        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        streams.start();
-        streams.close();
-        try {
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
             streams.start();
-            fail("Should have throw IllegalStateException");
-        } catch (final IllegalStateException expected) {
-            // this is ok
-        } finally {
             streams.close();
+            try {
+                streams.start();
+                fail("Should have throw IllegalStateException");

Review comment:
       Thinks @chia7712 for review. Nice catch. 
   Should we create another PR to address this case ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vitojeng commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest

Posted by GitBox <gi...@apache.org>.
vitojeng commented on a change in pull request #10668:
URL: https://github.com/apache/kafka/pull/10668#discussion_r630603482



##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -525,14 +525,13 @@ public void testStateGlobalThreadClose() throws Exception {
                 () -> streams.state() == KafkaStreams.State.PENDING_ERROR,
                 "Thread never stopped."
             );
-        } finally {
             streams.close();

Review comment:
       Ok, will do.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vitojeng commented on pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest

Posted by GitBox <gi...@apache.org>.
vitojeng commented on pull request #10668:
URL: https://github.com/apache/kafka/pull/10668#issuecomment-838538304


   @mjsax , @ableegoldman 
   Please take a look. :)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vitojeng commented on pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest

Posted by GitBox <gi...@apache.org>.
vitojeng commented on pull request #10668:
URL: https://github.com/apache/kafka/pull/10668#issuecomment-853445357


   Thanks @mjsax !
   Please take a look.🙂


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10668:
URL: https://github.com/apache/kafka/pull/10668#discussion_r630601302



##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -491,25 +493,23 @@ public void testStateThreadClose() throws Exception {
                 () -> streams.localThreadsMetadata().stream().allMatch(t -> t.threadState().equals("DEAD")),
                 "Streams never stopped"
             );
-        } finally {
             streams.close();

Review comment:
       We can remove this line now. `close()` will be cause automatically using try-with-resource clause.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10668:
URL: https://github.com/apache/kafka/pull/10668#discussion_r640127195



##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -525,14 +525,13 @@ public void testStateGlobalThreadClose() throws Exception {
                 () -> streams.state() == KafkaStreams.State.PENDING_ERROR,
                 "Thread never stopped."
             );
-        } finally {
             streams.close();

Review comment:
       Sams comment as above. The Java compiler will insert a `close()` call and there is no need to explicitly call it any longer.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10668:
URL: https://github.com/apache/kafka/pull/10668#discussion_r630601702



##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -525,14 +525,13 @@ public void testStateGlobalThreadClose() throws Exception {
                 () -> streams.state() == KafkaStreams.State.PENDING_ERROR,
                 "Thread never stopped."
             );
-        } finally {
             streams.close();

Review comment:
       As above. (Maybe similar elsewhere; won't comment on it explicitly below)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax merged pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest

Posted by GitBox <gi...@apache.org>.
mjsax merged pull request #10668:
URL: https://github.com/apache/kafka/pull/10668


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10668:
URL: https://github.com/apache/kafka/pull/10668#discussion_r630602355



##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -553,141 +552,152 @@ public void testInitializesAndDestroysMetricsReporters() {
 
     @Test
     public void testCloseIsIdempotent() {
-        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        streams.close();
-        final int closeCount = MockMetricsReporter.CLOSE_COUNT.get();
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            streams.close();
+            final int closeCount = MockMetricsReporter.CLOSE_COUNT.get();
 
-        streams.close();
-        Assert.assertEquals("subsequent close() calls should do nothing",
-            closeCount, MockMetricsReporter.CLOSE_COUNT.get());
+            streams.close();
+            Assert.assertEquals("subsequent close() calls should do nothing",
+                closeCount, MockMetricsReporter.CLOSE_COUNT.get());
+        }
     }
 
     @Test
     public void shouldAddThreadWhenRunning() throws InterruptedException {
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
-        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        streams.start();
-        final int oldSize = streams.threads.size();
-        TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running");
-        assertThat(streams.addStreamThread(), equalTo(Optional.of("processId-StreamThread-" + 2)));
-        assertThat(streams.threads.size(), equalTo(oldSize + 1));
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            streams.start();
+            final int oldSize = streams.threads.size();
+            TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running");
+            assertThat(streams.addStreamThread(), equalTo(Optional.of("processId-StreamThread-" + 2)));
+            assertThat(streams.threads.size(), equalTo(oldSize + 1));
+        }
     }
 
     @Test
     public void shouldNotAddThreadWhenCreated() {
-        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        final int oldSize = streams.threads.size();
-        assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
-        assertThat(streams.threads.size(), equalTo(oldSize));
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            final int oldSize = streams.threads.size();
+            assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
+            assertThat(streams.threads.size(), equalTo(oldSize));
+        }
     }
 
     @Test
     public void shouldNotAddThreadWhenClosed() {
-        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        final int oldSize = streams.threads.size();
-        streams.close();
-        assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
-        assertThat(streams.threads.size(), equalTo(oldSize));
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            final int oldSize = streams.threads.size();
+            streams.close();
+            assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
+            assertThat(streams.threads.size(), equalTo(oldSize));
+        }
     }
 
     @Test
     public void shouldNotAddThreadWhenError() {
-        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        final int oldSize = streams.threads.size();
-        streams.start();
-        globalStreamThread.shutdown();
-        assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
-        assertThat(streams.threads.size(), equalTo(oldSize));
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            final int oldSize = streams.threads.size();
+            streams.start();
+            globalStreamThread.shutdown();
+            assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
+            assertThat(streams.threads.size(), equalTo(oldSize));
+        }
     }
 
     @Test
     public void shouldNotReturnDeadThreads() {
-        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        streams.start();
-        streamThreadOne.shutdown();
-        final Set<ThreadMetadata> threads = streams.localThreadsMetadata();
+        final Set<ThreadMetadata> threads;
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            streams.start();
+            streamThreadOne.shutdown();
+            threads = streams.localThreadsMetadata();
+        }
         assertThat(threads.size(), equalTo(1));
         assertThat(threads, hasItem(streamThreadTwo.threadMetadata()));
     }
 
     @Test
     public void shouldRemoveThread() throws InterruptedException {
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
-        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        streams.start();
-        final int oldSize = streams.threads.size();
-        TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L,
-            "Kafka Streams client did not reach state RUNNING");
-        assertThat(streams.removeStreamThread(), equalTo(Optional.of("processId-StreamThread-" + 1)));
-        assertThat(streams.threads.size(), equalTo(oldSize - 1));
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            streams.start();
+            final int oldSize = streams.threads.size();
+            TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L,
+                "Kafka Streams client did not reach state RUNNING");
+            assertThat(streams.removeStreamThread(), equalTo(Optional.of("processId-StreamThread-" + 1)));
+            assertThat(streams.threads.size(), equalTo(oldSize - 1));
+        }
     }
 
     @Test
     public void shouldNotRemoveThreadWhenNotRunning() {
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
-        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        assertThat(streams.removeStreamThread(), equalTo(Optional.empty()));
-        assertThat(streams.threads.size(), equalTo(1));
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            assertThat(streams.removeStreamThread(), equalTo(Optional.empty()));
+            assertThat(streams.threads.size(), equalTo(1));
+        }
     }
 
     @Test
     public void testCannotStartOnceClosed() {
-        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        streams.start();
-        streams.close();
-        try {
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
             streams.start();
-            fail("Should have throw IllegalStateException");
-        } catch (final IllegalStateException expected) {
-            // this is ok
-        } finally {
             streams.close();
+            try {
+                streams.start();
+                fail("Should have throw IllegalStateException");

Review comment:
       +1 -- also below in other tests.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vitojeng commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest

Posted by GitBox <gi...@apache.org>.
vitojeng commented on a change in pull request #10668:
URL: https://github.com/apache/kafka/pull/10668#discussion_r630701286



##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -491,25 +493,23 @@ public void testStateThreadClose() throws Exception {
                 () -> streams.localThreadsMetadata().stream().allMatch(t -> t.threadState().equals("DEAD")),
                 "Streams never stopped"
             );
-        } finally {
             streams.close();

Review comment:
       @mjsax We should not remove this line, otherwise the line 498(original) will throw a timeout exception.
   The streams state will be always **RUNNING** if we skip `close()`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vitojeng commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest

Posted by GitBox <gi...@apache.org>.
vitojeng commented on a change in pull request #10668:
URL: https://github.com/apache/kafka/pull/10668#discussion_r640217904



##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -491,25 +493,23 @@ public void testStateThreadClose() throws Exception {
                 () -> streams.localThreadsMetadata().stream().allMatch(t -> t.threadState().equals("DEAD")),
                 "Streams never stopped"
             );
-        } finally {
             streams.close();

Review comment:
       @mjsax Thanks for the detail description. I knew the things you describe.
   But in this case, after `streams.close()` , we still need to check streams state whether is `NOT_RUNNING`.
   If we remove `streams.close()`, the streams state will still be `RUNNING`, this state will lead to failed of the next checking of `NOT_RUNNING`.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10668:
URL: https://github.com/apache/kafka/pull/10668#discussion_r644184061



##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -491,25 +493,23 @@ public void testStateThreadClose() throws Exception {
                 () -> streams.localThreadsMetadata().stream().allMatch(t -> t.threadState().equals("DEAD")),
                 "Streams never stopped"
             );
-        } finally {
             streams.close();

Review comment:
       I missed the fact that we moved the `waitForCondition` check _inside_ of the try-catch block... For this case, we need to call `close` explicitly of course, as we are still in the block and `close()` is not auto-called yet...
   
   Sorry for the confusion.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10668:
URL: https://github.com/apache/kafka/pull/10668#discussion_r640126531



##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -491,25 +493,23 @@ public void testStateThreadClose() throws Exception {
                 () -> streams.localThreadsMetadata().stream().allMatch(t -> t.threadState().equals("DEAD")),
                 "Streams never stopped"
             );
-        } finally {
             streams.close();

Review comment:
       As `KafkaStreams` implements the `AutoCloseable` interface now, `close()` should be called automatically when the `try {}` block is left -- that is the whole purpose of `AutoClosable` and try-with-resource construct -- it frees you up to call `close()` explicitly (so you cannot forget any longer).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chia7712 commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10668:
URL: https://github.com/apache/kafka/pull/10668#discussion_r630238771



##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -553,141 +552,152 @@ public void testInitializesAndDestroysMetricsReporters() {
 
     @Test
     public void testCloseIsIdempotent() {
-        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        streams.close();
-        final int closeCount = MockMetricsReporter.CLOSE_COUNT.get();
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            streams.close();
+            final int closeCount = MockMetricsReporter.CLOSE_COUNT.get();
 
-        streams.close();
-        Assert.assertEquals("subsequent close() calls should do nothing",
-            closeCount, MockMetricsReporter.CLOSE_COUNT.get());
+            streams.close();
+            Assert.assertEquals("subsequent close() calls should do nothing",
+                closeCount, MockMetricsReporter.CLOSE_COUNT.get());
+        }
     }
 
     @Test
     public void shouldAddThreadWhenRunning() throws InterruptedException {
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
-        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        streams.start();
-        final int oldSize = streams.threads.size();
-        TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running");
-        assertThat(streams.addStreamThread(), equalTo(Optional.of("processId-StreamThread-" + 2)));
-        assertThat(streams.threads.size(), equalTo(oldSize + 1));
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            streams.start();
+            final int oldSize = streams.threads.size();
+            TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running");
+            assertThat(streams.addStreamThread(), equalTo(Optional.of("processId-StreamThread-" + 2)));
+            assertThat(streams.threads.size(), equalTo(oldSize + 1));
+        }
     }
 
     @Test
     public void shouldNotAddThreadWhenCreated() {
-        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        final int oldSize = streams.threads.size();
-        assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
-        assertThat(streams.threads.size(), equalTo(oldSize));
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            final int oldSize = streams.threads.size();
+            assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
+            assertThat(streams.threads.size(), equalTo(oldSize));
+        }
     }
 
     @Test
     public void shouldNotAddThreadWhenClosed() {
-        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        final int oldSize = streams.threads.size();
-        streams.close();
-        assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
-        assertThat(streams.threads.size(), equalTo(oldSize));
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            final int oldSize = streams.threads.size();
+            streams.close();
+            assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
+            assertThat(streams.threads.size(), equalTo(oldSize));
+        }
     }
 
     @Test
     public void shouldNotAddThreadWhenError() {
-        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        final int oldSize = streams.threads.size();
-        streams.start();
-        globalStreamThread.shutdown();
-        assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
-        assertThat(streams.threads.size(), equalTo(oldSize));
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            final int oldSize = streams.threads.size();
+            streams.start();
+            globalStreamThread.shutdown();
+            assertThat(streams.addStreamThread(), equalTo(Optional.empty()));
+            assertThat(streams.threads.size(), equalTo(oldSize));
+        }
     }
 
     @Test
     public void shouldNotReturnDeadThreads() {
-        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        streams.start();
-        streamThreadOne.shutdown();
-        final Set<ThreadMetadata> threads = streams.localThreadsMetadata();
+        final Set<ThreadMetadata> threads;
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            streams.start();
+            streamThreadOne.shutdown();
+            threads = streams.localThreadsMetadata();
+        }
         assertThat(threads.size(), equalTo(1));
         assertThat(threads, hasItem(streamThreadTwo.threadMetadata()));
     }
 
     @Test
     public void shouldRemoveThread() throws InterruptedException {
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
-        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        streams.start();
-        final int oldSize = streams.threads.size();
-        TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L,
-            "Kafka Streams client did not reach state RUNNING");
-        assertThat(streams.removeStreamThread(), equalTo(Optional.of("processId-StreamThread-" + 1)));
-        assertThat(streams.threads.size(), equalTo(oldSize - 1));
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            streams.start();
+            final int oldSize = streams.threads.size();
+            TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L,
+                "Kafka Streams client did not reach state RUNNING");
+            assertThat(streams.removeStreamThread(), equalTo(Optional.of("processId-StreamThread-" + 1)));
+            assertThat(streams.threads.size(), equalTo(oldSize - 1));
+        }
     }
 
     @Test
     public void shouldNotRemoveThreadWhenNotRunning() {
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
-        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        assertThat(streams.removeStreamThread(), equalTo(Optional.empty()));
-        assertThat(streams.threads.size(), equalTo(1));
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            assertThat(streams.removeStreamThread(), equalTo(Optional.empty()));
+            assertThat(streams.threads.size(), equalTo(1));
+        }
     }
 
     @Test
     public void testCannotStartOnceClosed() {
-        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
-        streams.start();
-        streams.close();
-        try {
+        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
             streams.start();
-            fail("Should have throw IllegalStateException");
-        } catch (final IllegalStateException expected) {
-            // this is ok
-        } finally {
             streams.close();
+            try {
+                streams.start();
+                fail("Should have throw IllegalStateException");

Review comment:
       How about replacing it by ```assertThrows```?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #10668:
URL: https://github.com/apache/kafka/pull/10668#issuecomment-853547323


   Thanks @vitojeng! Merged to `trunk`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vitojeng commented on a change in pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest

Posted by GitBox <gi...@apache.org>.
vitojeng commented on a change in pull request #10668:
URL: https://github.com/apache/kafka/pull/10668#discussion_r630705257



##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -525,14 +525,13 @@ public void testStateGlobalThreadClose() throws Exception {
                 () -> streams.state() == KafkaStreams.State.PENDING_ERROR,
                 "Thread never stopped."
             );
-        } finally {
             streams.close();

Review comment:
       @mjsax The line 532(original) wait for the streams state equal to **ERROR** after streams closed. 
   Although this test case will pass if we remove `close()`, but it seems we might be better to retain `close()`?
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vitojeng commented on pull request #10668: MINOR: Apply try-with-resource to KafkaStreamsTest

Posted by GitBox <gi...@apache.org>.
vitojeng commented on pull request #10668:
URL: https://github.com/apache/kafka/pull/10668#issuecomment-853445357


   Thanks @mjsax !
   Please take a look.🙂


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org