Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/10/12 13:36:20 UTC

[GitHub] [kafka] dongjinleekr opened a new pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

dongjinleekr opened a new pull request #9414:
URL: https://github.com/apache/kafka/pull/9414


   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dongjinleekr commented on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

Posted by GitBox <gi...@apache.org>.
dongjinleekr commented on pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#issuecomment-834361327


   Hi @vvcephei, could you review this PR now? :bow:



[GitHub] [kafka] dongjinleekr commented on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

Posted by GitBox <gi...@apache.org>.
dongjinleekr commented on pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#issuecomment-714293384


   @vvcephei Here it is; I deleted an outdated condition statement in `EOSUncleanShutdownIntegrationTest`.



[GitHub] [kafka] vvcephei commented on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#issuecomment-707271073


   Thanks for the PR, @dongjinleekr !
   
   Just a couple of meta-questions:
   1. Should we have some tests for this change?
   2. I think that right now, we'd be leaving behind some task directories and lock files. Maybe, we need to do a little extra legwork to recursively delete the app state directory. An integration test would probably surface this.
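For reference, the "recursively delete the app state directory" option raised in point 2 could be sketched with the JDK's `java.nio.file` API alone. This is a minimal, hypothetical helper (`RecursiveDelete.deleteRecursively` is not a Kafka Streams API, and the `.lock` file below is only a stand-in for a leftover lock file). It walks the tree and deletes entries in reverse path order, so every child is removed before its parent directory.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

final class RecursiveDelete {

    // Hypothetical helper: removes `root` and everything beneath it.
    // Returns false if `root` did not exist in the first place.
    static boolean deleteRecursively(Path root) throws IOException {
        if (!Files.exists(root)) {
            return false;
        }
        // Files.walk yields a parent before its children; reverse order
        // guarantees each directory is already empty when we delete it.
        try (Stream<Path> paths = Files.walk(root)) {
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
        return true;
    }

    public static void main(String[] args) throws IOException {
        // Simulate {state.dir}/{application.id}/{task-id} holding a lock file.
        Path appDir = Files.createTempDirectory("app-id");
        Path taskDir = Files.createDirectories(appDir.resolve("0_0"));
        Files.createFile(taskDir.resolve(".lock"));

        System.out.println(deleteRecursively(appDir)); // true
        System.out.println(Files.exists(appDir));      // false
    }
}
```

A recursive delete is the heavier option: it removes task directories and lock files unconditionally, which is only safe once no `StreamThread` can still be using them.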



[GitHub] [kafka] vvcephei commented on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#issuecomment-718889494


   Hey @dongjinleekr , just a quick check-in: do you plan to take a look at the test failures?



[GitHub] [kafka] vvcephei commented on a change in pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#discussion_r526377375



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java
##########
@@ -140,9 +140,6 @@ public void shouldWorkWithUncleanShutdownWipeOutStateStore() throws InterruptedE
             IntegrationTestUtils.produceSynchronously(producerConfig, false, input, Optional.empty(),
                 singletonList(new KeyValueTimestamp<>("k1", "v1", 0L)));
 
-            TestUtils.waitForCondition(stateDir::exists,
-                "Failed awaiting CreateTopics first request failure");

Review comment:
       Hmm, I see. Maybe I misread it. I thought we were asserting that Streams actually creates the directory when it starts processing, so that the later assertion (L159) that it cleans up the directory is actually valid.
   
   Then again, L159 seems to assert the opposite of what its own comment states, so maybe this whole test is just wacky.
   
   The only clue about what we're trying to do here is the name, which barely makes sense: `shouldWorkWithUncleanShutdownWipeOutStateStore`. I guess it's saying that we should delete the state store on unclean shutdown? I don't think that's what we do anyway; I think we just guarantee that we write an empty checkpoint file. So maybe we should instead change the assertion to:
   * the directory is absent
   * OR the directory is present, but the checkpoint file is missing
   * OR the directory and checkpoint file are present, but the checkpoint file is empty
   
   Any of those should indicate a successful unclean shutdown.
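The three acceptable outcomes above can be folded into a single predicate. This is a hypothetical sketch, not the assertion that ended up in `EOSUncleanShutdownIntegrationTest`: the class and method names are illustrative, and it assumes the checkpoint file is named `.checkpoint` and that "empty" means zero bytes. If the checkpoint format writes a header even when there are no offsets, the last check would need to parse the file instead (for example with the Streams `OffsetCheckpoint` reader).

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

final class UncleanShutdownCheck {

    // Assumed checkpoint file name; adjust if the real constant differs.
    static final String CHECKPOINT_FILE_NAME = ".checkpoint";

    // True if the task directory reflects a successful unclean shutdown:
    //  1. the directory is absent, OR
    //  2. the directory exists but has no checkpoint file, OR
    //  3. the directory and checkpoint file exist, but the checkpoint is empty (0 bytes).
    static boolean isCleanAfterUncleanShutdown(File taskDir) {
        if (!taskDir.exists()) {
            return true;
        }
        File checkpoint = new File(taskDir, CHECKPOINT_FILE_NAME);
        if (!checkpoint.exists()) {
            return true;
        }
        return checkpoint.length() == 0L;
    }

    public static void main(String[] args) throws IOException {
        File root = Files.createTempDirectory("state").toFile();
        // No task directory has been created yet, so case 1 applies.
        System.out.println(isCleanAfterUncleanShutdown(new File(root, "0_0"))); // true
    }
}
```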





[GitHub] [kafka] dongjinleekr commented on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

Posted by GitBox <gi...@apache.org>.
dongjinleekr commented on pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#issuecomment-822991690


   > I know @vvcephei was quite swamped in the past months with pretty heavy release management duties.
   
   Totally agree. It's also why I did not press him. :smile: I rebased the PR onto the latest trunk and checked that it passes all tests!



[GitHub] [kafka] dongjinleekr commented on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

Posted by GitBox <gi...@apache.org>.
dongjinleekr commented on pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#issuecomment-762079038


   Rebased onto the latest trunk. cc/ @vvcephei @mjsax



[GitHub] [kafka] dongjinleekr commented on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

Posted by GitBox <gi...@apache.org>.
dongjinleekr commented on pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#issuecomment-733525231


   Retest this please.



[GitHub] [kafka] dongjinleekr commented on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

Posted by GitBox <gi...@apache.org>.
dongjinleekr commented on pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#issuecomment-731208645


   Hi @vvcephei,
   
   I rebased the branch onto the latest trunk, reorganized the changes into two commits, and fixed `EOSUncleanShutdownIntegrationTest#shouldWorkWithUncleanShutdownWipeOutStateStore`; indeed, its contents do not match what its name describes. :wink: 



[GitHub] [kafka] dongjinleekr commented on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

Posted by GitBox <gi...@apache.org>.
dongjinleekr commented on pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#issuecomment-858453218


   Rebased onto the latest trunk. cc/ @vvcephei



[GitHub] [kafka] guozhangwang commented on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#issuecomment-822036454


   @dongjinleekr @vvcephei just checking: are you still working on it? I saw that @vvcephei has approved the PR, but it has not been merged yet, hence the question.



[GitHub] [kafka] dongjinleekr commented on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

Posted by GitBox <gi...@apache.org>.
dongjinleekr commented on pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#issuecomment-724725724


   Hi @vvcephei,
   
   Here is the fix. The integration tests were broken because `KafkaStreams#cleanUp` can be called regardless of whether the `StreamThread`s have terminated.
   
   That is, if `KafkaStreams#cleanUp` is called while the `KafkaStreams` instance is still running, the `{state-store-directory}/{application-id}` directory is deleted, and the `StreamThread` crashes with an exception from `StateDirectory#directoryForTask`, since it fails to create the task state store directory, i.e., `{state-store-directory}/{application-id}/{task-id}`.
   
   Moreover, the `StateDirectory#directoryForTask` method has two additional weaknesses:
   
   1. When it creates the task state store directory, it does not create missing parent directories: if `{state-store-directory}/{application-id}` does not exist, `taskDir.mkdir()` returns false and the method throws an exception. Since this behavior breaks the integration tests, I changed `taskDir.mkdir()` to `taskDir.mkdirs()` so that `{state-store-directory}/{application-id}` is created automatically.
   2. This method only checks whether there is a `File` at `{state-store-directory}/{application-id}/{task-id}`, regardless of whether it is actually a directory. I added an extra check for this case and updated `StateDirectoryTest#shouldThrowProcessorStateException` accordingly.
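The two fixes described above can be sketched as follows. This is a simplified, hypothetical stand-in for `StateDirectory#directoryForTask`, not the Kafka Streams source: `TaskDirs` is an illustrative name, and `IllegalStateException` replaces Kafka's `ProcessorStateException` to keep the example free of Kafka dependencies. The error messages reuse the wording quoted later in the review.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

final class TaskDirs {

    // Hypothetical stand-in for StateDirectory#directoryForTask.
    static File directoryForTask(File appDir, String taskId) {
        File taskDir = new File(appDir, taskId);
        if (taskDir.exists()) {
            // Something already sits at the path: it must be a directory.
            if (!taskDir.isDirectory()) {
                throw new IllegalStateException(String.format(
                    "task directory path [%s] is already occupied", taskDir.getPath()));
            }
        } else if (!taskDir.mkdirs() && !taskDir.isDirectory()) {
            // mkdirs() (unlike mkdir()) also recreates appDir if cleanUp() removed it;
            // the isDirectory() re-check tolerates another thread creating it first.
            throw new IllegalStateException(String.format(
                "task directory [%s] doesn't exist and couldn't be created", taskDir.getPath()));
        }
        return taskDir;
    }

    public static void main(String[] args) throws IOException {
        // appDir does not exist yet, as if cleanUp() had just deleted it.
        File appDir = new File(Files.createTempDirectory("state").toFile(), "app-id");
        System.out.println(directoryForTask(appDir, "0_0").isDirectory()); // true
    }
}
```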



[GitHub] [kafka] dongjinleekr commented on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

Posted by GitBox <gi...@apache.org>.
dongjinleekr commented on pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#issuecomment-847886290


   Hi @guozhangwang @vvcephei,
   Could you have a look now? :smiley: 



[GitHub] [kafka] dongjinleekr commented on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

Posted by GitBox <gi...@apache.org>.
dongjinleekr commented on pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#issuecomment-721135460


   Hi @vvcephei,
   
   Sorry for the delay. In my defense, I was busy finalizing my project last week, which wrapped up yesterday. I am now fixing the broken tests: 21 tests fail against the latest trunk, and I have resolved 16 of them so far. It should be done soon.



[GitHub] [kafka] vvcephei commented on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#issuecomment-715502309


   Thanks for that fix, @dongjinleekr !
   
   Unfortunately, there are still 61 test failures: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-9414/6/?cloudbees-analytics-link=scm-reporting%2Fstage%2Ffailure#showFailuresLink
   
   Do you mind taking another look?



[GitHub] [kafka] vvcephei commented on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#issuecomment-721239457


   Perfect. Thanks, @dongjinleekr !
   
   No defense is necessary :) I was just wondering if it was still on your radar.
   
   Thanks again for the contribution!



[GitHub] [kafka] vvcephei commented on a change in pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#discussion_r524437121



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
##########
@@ -186,19 +186,29 @@ public void shouldReportDirectoryEmpty() throws IOException {
 
     @Test
     public void shouldThrowProcessorStateException() throws IOException {

Review comment:
       Since you have modified the purpose of this test, maybe we can go ahead and give the test a more specific name as well. 
   
   ```suggestion
       public void shouldThrowProcessorStateExceptionIfTaskDirectoryIsOccupiedByFile() throws IOException {
   ```
   
   Also, I won't dispute the value of checking this condition, but would like to point out that this test was previously verifying a specific error on failure to create the task directory, and now we are no longer checking that failure. In other words, we were previously verifying "task directory [%s] doesn't exist and couldn't be created", but now we are only verifying the separate and specific failure reason "task directory path [%s] is already occupied".
   
   It actually seems like maybe we don't need to check that specific `!taskDir.isDirectory()` case, since it seems like having this file sitting there should cause a failure to create the task directory, right?
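The reviewer's point can be checked directly against `java.io.File`: when a regular file already occupies the path, both `mkdir()` and `mkdirs()` return `false`, so the generic "couldn't be created" branch would fire even without a dedicated `isDirectory()` check. A minimal standalone demonstration (the class and method names are illustrative):

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

final class OccupiedPathDemo {

    // Places a regular file at parent/name, then returns what mkdirs() reports for that path.
    static boolean mkdirsOnOccupiedPath(File parent, String name) throws IOException {
        File occupied = new File(parent, name);
        if (!occupied.createNewFile()) {
            throw new IOException("could not create placeholder file " + occupied);
        }
        return occupied.mkdirs();
    }

    public static void main(String[] args) throws IOException {
        File parent = Files.createTempDirectory("state").toFile();
        // A plain file named like a task id blocks directory creation.
        System.out.println(mkdirsOnOccupiedPath(parent, "0_0")); // false
    }
}
```

The trade-off is diagnostic rather than functional: the explicit check yields the more specific "already occupied" message, while relying on the creation failure keeps a single error path.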

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java
##########
@@ -140,9 +140,6 @@ public void shouldWorkWithUncleanShutdownWipeOutStateStore() throws InterruptedE
             IntegrationTestUtils.produceSynchronously(producerConfig, false, input, Optional.empty(),
                 singletonList(new KeyValueTimestamp<>("k1", "v1", 0L)));
 
-            TestUtils.waitForCondition(stateDir::exists,
-                "Failed awaiting CreateTopics first request failure");

Review comment:
       Can you explain why we need to remove this? It seems like the application must have created the state directory by this point, right?





[GitHub] [kafka] vvcephei merged pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #9414:
URL: https://github.com/apache/kafka/pull/9414


   



[GitHub] [kafka] vvcephei commented on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#issuecomment-858990902


   It looks like the core integration tests have gotten into bad shape. They've been failing on trunk as well. I just ran the Streams integration tests on my machine, and they passed, so I'll go ahead and merge.



[GitHub] [kafka] dongjinleekr removed a comment on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

Posted by GitBox <gi...@apache.org>.
dongjinleekr removed a comment on pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#issuecomment-739132524


   Rebased onto the latest trunk.



[GitHub] [kafka] dongjinleekr commented on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

Posted by GitBox <gi...@apache.org>.
dongjinleekr commented on pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#issuecomment-713251214


   @vvcephei Here it is; I updated the timeout to `IntegrationTestUtils.DEFAULT_TIMEOUT` and also rebased onto the latest trunk.



[GitHub] [kafka] guozhangwang commented on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#issuecomment-822965530


   @dongjinleekr just FYI, I know @vvcephei has been quite swamped in the past months with heavy release management duties (you may have seen his email about the 2.8.0 release, which was all thanks to him), so I believe he was simply too busy with 2.8.0 to review your PRs. I'd like to thank you for your patience!
   
   @vvcephei whenever you have time, please let us know whether the current PR is good to go.



[GitHub] [kafka] dongjinleekr edited a comment on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

Posted by GitBox <gi...@apache.org>.
dongjinleekr edited a comment on pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#issuecomment-709135091


   @vvcephei
   
   > it seems like we could improve the test coverage by checking that we get the warning and do not delete the state dir if it's not empty.
   
   Totally agree. So, I added two dedicated tests for these cases. See the following test methods:
   
   - `StateDirectoryTest#shouldDeleteAppDirWhenCleanUpIfEmpty`
   - `StateDirectoryTest#shouldNotDeleteAppDirWhenCleanUpIfNotEmpty`
   
   I also added an integration test, `StateDirectoryIntegrationTest`. How about this? :smiley:



[GitHub] [kafka] dongjinleekr commented on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

Posted by GitBox <gi...@apache.org>.
dongjinleekr commented on pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#issuecomment-806739608


   In addition, I updated `EOSUncleanShutdownIntegrationTest` as well. I inspected the history of this test suite carefully and found the following:
   
   1. This suite was introduced in c2ec974 by @abbccdda. It asserts that the Task's statestore directory (not the root directory set by KafkaStreams's `state.dir` property) is deleted after an unclean shutdown. However, IMHO this assertion is a little inaccurate, since the Task's statestore directory may remain, for example, holding an empty checkpoint file.
   2. A comment was added on L160 in d3c067f by @guozhangwang: 'the state directory should still exist with the empty checkpoint file'. However, IMHO this comment seems to be a mistake, since `assertFalse(stateDir.exists());` on L161 asserts that the Task's statestore directory does not exist.
   
   There are some later commits on this file, but all of them are minor improvements that do not change the testing logic.
   
   With this PR, Kafka Streams now deletes the empty application statestore directory (i.e., `{state.dir}/{application.id}`). So, I improved this suite like this:
   
   1. After initialization, wait until the Task's statestore directory is populated. (i.e., `{state.dir}/{application.id}/0_0/*`)
   2. Assert that one of the following is satisfied:
   
     - The Task's statestore directory was empty and has therefore been deleted.
     - The Task's statestore directory is not empty but has no checkpoint file.
     - The Task's statestore directory is not empty but contains an empty checkpoint file.
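Step 1 needs a bounded wait for the task directory to fill up. In the Kafka test suite this is more likely done with `TestUtils.waitForCondition`; the stdlib-only helper below is a hypothetical sketch of the same idea (`DirPolling`, `isPopulated`, and `waitFor` are illustrative names), assuming the `{state.dir}/{application.id}/0_0` layout described above.

```java
import java.io.File;
import java.time.Duration;
import java.util.function.BooleanSupplier;

final class DirPolling {

    // True once `dir` exists as a directory and contains at least one entry.
    static boolean isPopulated(File dir) {
        String[] entries = dir.list(); // null if dir is missing or not a directory
        return entries != null && entries.length > 0;
    }

    // Re-checks `condition` until it holds (returns true) or `timeout` elapses (returns false).
    static boolean waitFor(BooleanSupplier condition, Duration timeout, Duration pollInterval)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            Thread.sleep(pollInterval.toMillis());
        }
    }
}
```

A typical call might look like `DirPolling.waitFor(() -> DirPolling.isPopulated(new File(stateDir, appId + "/0_0")), Duration.ofSeconds(60), Duration.ofMillis(100))`, where `stateDir` and `appId` are whatever the test configured; only after it returns `true` does the three-way assertion in step 2 become meaningful.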
   
   (Wait, then should we separate the modifications on `EOSUncleanShutdownIntegrationTest` into an independent PR?)
   
   Please have a look when you are free. Thanks again for reviewing my PR! :smiley:



[GitHub] [kafka] dongjinleekr commented on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

Posted by GitBox <gi...@apache.org>.
dongjinleekr commented on pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#issuecomment-707498871


   Hi @vvcephei,
   
   1. Sure. I added some validations in `StateDirectoryTest`.
   
     - If `StateDirectory#clean` is called, the empty `appDir` is also deleted. (See `StateDirectoryTest#shouldLogManualUserCallMessage`.)
     - If `StateDirectory#clean` is not called, the global state directory and its parent, `appDir`, are not deleted. (See `StateDirectoryTest#shouldLogStateDirCleanerMessage`.)
   
       Please note the change in `StateDirectoryTest#shouldCleanupAllTaskDirectoriesIncludingGlobalOne`: previously `appDir` was left behind as an empty directory, but now it is deleted.
   
   2. Since the goal of this change is to delete only an empty directory, we don't need a recursive delete. That is also why I used `File#delete` here: it succeeds and returns `true` only if the target directory is empty.
   
   And one more thing: I added exception handling for `SecurityException`. :smile:
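The approach described here (delete `appDir` only if it is empty, warn otherwise, and guard against `SecurityException`) could look roughly like the sketch below. It is hypothetical: `AppDirCleaner` and the log wording are illustrative, and `java.util.logging` stands in for the SLF4J logger Kafka uses. The condition follows the `stateDir.exists() && !stateDir.delete()` form suggested in review, without a `hasPersistentStores` check.

```java
import java.io.File;
import java.util.logging.Logger;

final class AppDirCleaner {

    private static final Logger LOG = Logger.getLogger(AppDirCleaner.class.getName());

    // Removes {state.dir}/{application.id} only if it is empty.
    // File#delete() refuses to remove a non-empty directory, so no recursive walk is needed.
    // Returns true if the directory was deleted or did not exist.
    static boolean deleteIfEmpty(File appDir) {
        try {
            if (appDir.exists() && !appDir.delete()) {
                LOG.warning("Failed to delete state directory " + appDir.getPath()
                    + "; it is probably not empty.");
                return false;
            }
            return true;
        } catch (SecurityException e) {
            // Thrown if a SecurityManager denies access to the directory.
            LOG.warning("Not allowed to delete state directory " + appDir.getPath() + ": " + e);
            return false;
        }
    }
}
```

Because the non-empty case is handled by `File#delete()` itself, leftover task directories are never removed by this step, which is the behavior the `shouldDeleteAppDirWhenCleanUpIfEmpty` / `shouldNotDeleteAppDirWhenCleanUpIfNotEmpty` tests are meant to pin down.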



[GitHub] [kafka] vvcephei commented on a change in pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#discussion_r504054028



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
##########
@@ -602,6 +606,11 @@ public void shouldLogStateDirCleanerMessage() {
             directory.cleanRemovedTasks(cleanupDelayMs);
             assertThat(appender.getMessages(), hasItem(endsWith("ms has elapsed (cleanup delay is " +  cleanupDelayMs + "ms).")));
         }
+
+        // if appDir is empty, it is deleted in  process.
+        // since we did not call StateDirectory#clean, the global state directory is not deleted and appDir also.
+        assertTrue(appDir.exists());
+        assertArrayEquals(appDir.list(), new String[]{"0_0"});

Review comment:
       We shouldn't add unrelated checks to tests in general, it just makes the tests more confusing. If you want to verify that the directory still exists after closing when we don't call `clean()`, we should just add a new test for it.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
##########
@@ -510,8 +511,8 @@ public void shouldCleanupAllTaskDirectoriesIncludingGlobalOne() {
 
         directory.clean();
 
-        assertEquals(Collections.emptySet(), Arrays.stream(
-            Objects.requireNonNull(appDir.listFiles())).collect(Collectors.toSet()));
+        // if appDir is empty, it is deleted in StateDirectory#clean process.
+        assertTrue(!appDir.exists());

Review comment:
       ```suggestion
           assertFalse(appDir.exists());
   ```
   
   :)

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -301,6 +301,22 @@ public synchronized void clean() {
             );
             throw new StreamsException(exception);
         }
+
+        try {
+            if (hasPersistentStores && stateDir.exists() && !stateDir.delete()) {

Review comment:
       ```suggestion
               if (stateDir.exists() && !stateDir.delete()) {
   ```
   
   Do we need to check `hasPersistentStores` here? It seems sufficient just to check if the directory exists.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
##########
@@ -586,6 +587,9 @@ public void shouldLogManualUserCallMessage() {
                 hasItem(endsWith("as user calling cleanup."))
             );
         }
+
+        // if appDir is empty, it is deleted in StateDirectory#clean process.
+        assertTrue(!appDir.exists());

Review comment:
       Similar feedback here as below. This test is about logging, not cleanup. You've already added a check to the "shouldCleanup" test, so we don't need one here.





[GitHub] [kafka] vvcephei commented on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#issuecomment-713658414


   Hmm, actually I see those same 20 tests fail for me locally. Can you take a look, @dongjinleekr ?




[GitHub] [kafka] vvcephei commented on a change in pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#discussion_r508798891



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/StateDirectoryIntegrationTest.java
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@Category({IntegrationTest.class})
+public class StateDirectoryIntegrationTest {
+
+    private static final int NUM_BROKERS = 1;
+
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    @Test
+    public void testCleanUpStateDirIfEmpty() throws InterruptedException {
+        final String uniqueTestName = safeUniqueTestName(getClass(), testName);
+
+        // Create Topic
+        final String input = uniqueTestName + "-input";
+        CLUSTER.createTopic(input);
+
+        final Properties producerConfig = mkProperties(mkMap(
+            mkEntry(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+            mkEntry(ProducerConfig.ACKS_CONFIG, "all"),
+            mkEntry(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName()),
+            mkEntry(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName())
+        ));
+
+        try (final KafkaProducer<String, String> producer =
+                 new KafkaProducer<>(producerConfig, Serdes.String().serializer(), Serdes.String().serializer())) {
+            // Create Test Records
+            producer.send(new ProducerRecord<>(input, "a"));
+            producer.send(new ProducerRecord<>(input, "b"));
+            producer.send(new ProducerRecord<>(input, "c"));
+
+            // Create Topology
+            final String storeName = uniqueTestName + "-input-table";
+
+            final StreamsBuilder builder = new StreamsBuilder();
+            builder.table(
+                input,
+                Materialized
+                    .<String, String, KeyValueStore<Bytes, byte[]>>as(storeName)
+                    .withKeySerde(Serdes.String())
+                    .withValueSerde(Serdes.String())
+            );
+            final Topology topology = builder.build();
+
+            // State Store Directory
+            final String stateDir = TestUtils.tempDirectory(uniqueTestName).getPath();
+
+            // Create KafkaStreams instance
+            final String applicationId = uniqueTestName + "-app";
+            final Properties streamsConfig = mkProperties(mkMap(
+                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, applicationId),
+                mkEntry(StreamsConfig.STATE_DIR_CONFIG, stateDir),
+                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers())
+            ));
+
+            final KafkaStreams streams = new KafkaStreams(topology, streamsConfig);
+
+            // Create StateListener
+            final CountDownLatch runningLatch = new CountDownLatch(1);
+            final CountDownLatch notRunningLatch = new CountDownLatch(1);
+
+            final KafkaStreams.StateListener stateListener = (newState, oldState) -> {
+                if (newState == KafkaStreams.State.RUNNING) {
+                    runningLatch.countDown();
+                }
+                if (newState == KafkaStreams.State.NOT_RUNNING) {
+                    notRunningLatch.countDown();
+                }
+            };
+            streams.setStateListener(stateListener);
+
+            // Application state directory
+            final File appDir = new File(stateDir, applicationId);
+
+            // Validate application state directory is created.
+            streams.start();
+            try {
+                runningLatch.await(10 * 1000L, TimeUnit.MILLISECONDS);

Review comment:
       I'd be tempted to give it more time. Experience says that Jenkins will take unreasonably long to perform these operations. How about using the default timeout in IntegrationTestUtils here and below?
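A minimal sketch of the reviewer's suggestion: wait on the latch with a longer, shared timeout and fail if it elapses. `CountDownLatch.await(long, TimeUnit)` returns `false` on timeout rather than throwing, so the snippet above would silently carry on if the instance never reached `RUNNING`. `LatchWaits`, `awaitOrFail`, and the 60-second `DEFAULT_TIMEOUT_MS` are hypothetical stand-ins; the actual value of `IntegrationTestUtils.DEFAULT_TIMEOUT` may differ.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class LatchWaits {
    // Assumed stand-in for IntegrationTestUtils.DEFAULT_TIMEOUT; check the real constant.
    static final long DEFAULT_TIMEOUT_MS = 60_000L;

    private LatchWaits() { }

    // Waits up to timeoutMs for the latch to reach zero.
    // await(long, TimeUnit) returns false on timeout instead of throwing,
    // so the timeout is turned into an explicit test failure here.
    static void awaitOrFail(final CountDownLatch latch, final long timeoutMs, final String message)
            throws InterruptedException {
        if (!latch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
            throw new AssertionError(message);
        }
    }
}
```

In the test, this would replace the bare `runningLatch.await(...)` call, e.g. `LatchWaits.awaitOrFail(runningLatch, LatchWaits.DEFAULT_TIMEOUT_MS, "KafkaStreams never reached RUNNING");`, and likewise for `notRunningLatch`.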




----------------------------------------------------------------



[GitHub] [kafka] dongjinleekr commented on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

Posted by GitBox <gi...@apache.org>.
dongjinleekr commented on pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#issuecomment-822452651


   Hi @guozhangwang,
   
   Thanks for the mention. I finished applying @vvcephei's comments (on March 25, 2021) and am now awaiting review and merging, while keeping the PR rebased onto the latest trunk.
   
   As you can see here, work on this issue started much earlier, but it seems I misunderstood @vvcephei's intention 😢 and only realized it this March.


-- 



[GitHub] [kafka] dongjinleekr commented on a change in pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

Posted by GitBox <gi...@apache.org>.
dongjinleekr commented on a change in pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#discussion_r505424665



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -301,6 +301,22 @@ public synchronized void clean() {
             );
             throw new StreamsException(exception);
         }
+
+        try {
+            if (hasPersistentStores && stateDir.exists() && !stateDir.delete()) {

Review comment:
       Exactly. But I thought it would be better to keep it symmetric with the constructor:
   
   ```
   if (this.hasPersistentStores && !stateDir.exists() && !stateDir.mkdir()) {
       throw new ProcessorStateException(
           String.format("state directory [%s] doesn't exist and couldn't be created", stateDir.getPath()));
   }
   ```
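A standalone sketch of the symmetry being described, using plain `java.io.File` and `java.util.logging` rather than the real `StateDirectory` internals. `AppDirLifecycle`, `ensureExists`, and `deleteIfEmpty` are hypothetical names, and `IllegalStateException` stands in for `ProcessorStateException`. It relies on `File.delete()` returning `false` for a non-empty directory, which is what lets cleanup leave a populated directory in place and only warn.

```java
import java.io.File;
import java.util.logging.Logger;

final class AppDirLifecycle {
    private static final Logger LOG = Logger.getLogger(AppDirLifecycle.class.getName());

    private AppDirLifecycle() { }

    // Mirrors the constructor check: create the directory if it is missing.
    static void ensureExists(final File dir) {
        if (!dir.exists() && !dir.mkdir()) {
            throw new IllegalStateException(String.format(
                "state directory [%s] doesn't exist and couldn't be created", dir.getPath()));
        }
    }

    // Mirrors it on cleanup: File.delete() only removes an empty directory,
    // so a non-empty one is kept and a warning is logged instead.
    // Returns true if the directory no longer exists afterwards.
    static boolean deleteIfEmpty(final File dir) {
        if (dir.exists() && !dir.delete()) {
            LOG.warning(String.format(
                "state directory [%s] could not be deleted (it may not be empty)", dir.getPath()));
            return false;
        }
        return true;
    }
}
```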




----------------------------------------------------------------



[GitHub] [kafka] dongjinleekr commented on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

Posted by GitBox <gi...@apache.org>.
dongjinleekr commented on pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#issuecomment-739132524


   Rebased onto the latest trunk.


----------------------------------------------------------------



[GitHub] [kafka] dongjinleekr commented on a change in pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

Posted by GitBox <gi...@apache.org>.
dongjinleekr commented on a change in pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#discussion_r525845117



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
##########
@@ -186,19 +186,29 @@ public void shouldReportDirectoryEmpty() throws IOException {
 
     @Test
     public void shouldThrowProcessorStateException() throws IOException {

Review comment:
       Totally agree. :smile: 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dongjinleekr commented on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

Posted by GitBox <gi...@apache.org>.
dongjinleekr commented on pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#issuecomment-806736975


   Hi @vvcephei,
   
   It seems I misunderstood your intention. Here is the fix. I reorganized the PR into two commits: one for `StateDirectory` and one for `EOSUncleanShutdownIntegrationTest`. The updates are:
   
   1. Rebased the PR onto the latest trunk, fixing the broken syntax.
   2. Removed a redundant assertion in `StateDirectoryTest#shouldNotDeleteAppDirWhenCleanUpIfNotEmpty`; it now validates only the logged message.
   3. Made `StateDirectoryIntegrationTest` use the default timeout (`IntegrationTestUtils.DEFAULT_TIMEOUT`) for its latches, to keep the test from being flaky.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#issuecomment-713664978


   Just for reference, I don't see those failing tests on trunk (67bc4f08feb50ac135a4d8e1d469747102aad3a6).


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dongjinleekr commented on a change in pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

Posted by GitBox <gi...@apache.org>.
dongjinleekr commented on a change in pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#discussion_r525835354



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java
##########
@@ -140,9 +140,6 @@ public void shouldWorkWithUncleanShutdownWipeOutStateStore() throws InterruptedE
             IntegrationTestUtils.produceSynchronously(producerConfig, false, input, Optional.empty(),
                 singletonList(new KeyValueTimestamp<>("k1", "v1", 0L)));
 
-            TestUtils.waitForCondition(stateDir::exists,
-                "Failed awaiting CreateTopics first request failure");

Review comment:
       - Previous: The test asserts that the (empty) state store directory is not deleted.
   - Now: The empty state store directory is deleted during cleanup, so this assertion is no longer valid. (Wait, would it be better to negate the condition instead of removing it?)
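A sketch of the negated check, assuming a small self-contained polling helper in place of Kafka's `TestUtils.waitForCondition`. `Waits`, `waitForCondition`, and `waitForDeletion` are hypothetical, as are the 100 ms poll interval and the timeouts. Instead of dropping the assertion, the test would wait until `stateDir` no longer exists.

```java
import java.io.File;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

final class Waits {
    private Waits() { }

    // Polls the condition every pollMs until it holds; fails once timeoutMs has elapsed.
    static void waitForCondition(final BooleanSupplier condition, final long timeoutMs,
                                 final long pollMs, final String message) throws InterruptedException {
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                throw new AssertionError(message);
            }
            Thread.sleep(pollMs);
        }
    }

    // The negated form of the removed check: wait for the empty state directory to be gone.
    static void waitForDeletion(final File stateDir, final long timeoutMs) throws InterruptedException {
        waitForCondition(() -> !stateDir.exists(), timeoutMs, 100L,
            "Expected state directory " + stateDir + " to be deleted on cleanup");
    }
}
```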




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org