You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/10/07 22:51:12 UTC

[kafka] branch 2.6 updated: KAFKA-10564: only process non-empty task directories when internally cleaning obsolete state stores (#9373)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new e4540fe  KAFKA-10564: only process non-empty task directories when internally cleaning obsolete state stores (#9373)
e4540fe is described below

commit e4540fec7cf310b47ddcb81ec27d2e5c2046c7ff
Author: Michael Bingham <mi...@gmail.com>
AuthorDate: Wed Oct 7 16:48:35 2020 -0600

    KAFKA-10564: only process non-empty task directories when internally cleaning obsolete state stores (#9373)
    
    Avoid continuously repeated logging by not trying to clean empty task directories, which are no longer fully deleted during internal cleanup as of https://issues.apache.org/jira/browse/KAFKA-6647.
    
    Reviewers: A. Sophie Blee-Goldman <so...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 .../processor/internals/StateDirectory.java        |  2 +-
 .../processor/internals/StateDirectoryTest.java    | 33 ++++++++++++++++++++++
 2 files changed, 34 insertions(+), 1 deletion(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index de9fbea..35f937a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -315,7 +315,7 @@ public class StateDirectory {
     }
 
     private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
-        for (final File taskDir : listAllTaskDirectories()) {
+        for (final File taskDir : listNonEmptyTaskDirectories()) {
             final String dirName = taskDir.getName();
             final TaskId id = TaskId.parse(dirName);
             if (!locks.containsKey(id)) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index cb98789..24c9ab0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -50,9 +50,11 @@ import java.util.stream.Collectors;
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.apache.kafka.streams.processor.internals.StateDirectory.LOCK_FILE_NAME;
 import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
+import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.endsWith;
 import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -286,6 +288,7 @@ public class StateDirectoryTest {
         }
     }
 
+
     @Test
     public void shouldCleanupStateDirectoriesWhenLastModifiedIsLessThanNowMinusCleanupDelay() {
         final File dir = directory.directoryForTask(new TaskId(2, 0));
@@ -305,6 +308,36 @@ public class StateDirectoryTest {
     }
 
     @Test
+    public void shouldCleanupObsoleteStateDirectoriesOnlyOnce() {
+        final File dir = directory.directoryForTask(new TaskId(2, 0));
+        assertTrue(new File(dir, "store").mkdir());
+        assertEquals(1, directory.listAllTaskDirectories().length);
+        assertEquals(1, directory.listNonEmptyTaskDirectories().length);
+
+        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StateDirectory.class)) {
+            directory.cleanRemovedTasks(0);
+            assertTrue(dir.exists());
+            assertEquals(1, directory.listAllTaskDirectories().length);
+            assertEquals(0, directory.listNonEmptyTaskDirectories().length);
+            assertThat(
+                appender.getMessages(),
+                hasItem(containsString("Deleting obsolete state directory"))
+            );
+        }
+
+        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StateDirectory.class)) {
+            directory.cleanRemovedTasks(0);
+            assertTrue(dir.exists());
+            assertEquals(1, directory.listAllTaskDirectories().length);
+            assertEquals(0, directory.listNonEmptyTaskDirectories().length);
+            assertThat(
+                appender.getMessages(),
+                not(hasItem(containsString("Deleting obsolete state directory")))
+            );
+        }
+    }
+
+    @Test
     public void shouldNotRemoveNonTaskDirectoriesAndFiles() {
         final File otherDir = TestUtils.tempDirectory(stateDir.toPath(), "foo");
         directory.cleanRemovedTasks(0);