You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/10/07 22:51:12 UTC
[kafka] branch 2.6 updated: KAFKA-10564: only process non-empty
task directories when internally cleaning obsolete state stores (#9373)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push:
new e4540fe KAFKA-10564: only process non-empty task directories when internally cleaning obsolete state stores (#9373)
e4540fe is described below
commit e4540fec7cf310b47ddcb81ec27d2e5c2046c7ff
Author: Michael Bingham <mi...@gmail.com>
AuthorDate: Wed Oct 7 16:48:35 2020 -0600
KAFKA-10564: only process non-empty task directories when internally cleaning obsolete state stores (#9373)
Avoid continuous repeated logging by not trying to clean empty task directories, which are longer fully deleted during internal cleanup as of https://issues.apache.org/jira/browse/KAFKA-6647.
Reviewers: A. Sophie Blee-Goldman <so...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
.../processor/internals/StateDirectory.java | 2 +-
.../processor/internals/StateDirectoryTest.java | 33 ++++++++++++++++++++++
2 files changed, 34 insertions(+), 1 deletion(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index de9fbea..35f937a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -315,7 +315,7 @@ public class StateDirectory {
}
private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
- for (final File taskDir : listAllTaskDirectories()) {
+ for (final File taskDir : listNonEmptyTaskDirectories()) {
final String dirName = taskDir.getName();
final TaskId id = TaskId.parse(dirName);
if (!locks.containsKey(id)) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index cb98789..24c9ab0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -50,9 +50,11 @@ import java.util.stream.Collectors;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.processor.internals.StateDirectory.LOCK_FILE_NAME;
import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
+import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.endsWith;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -286,6 +288,7 @@ public class StateDirectoryTest {
}
}
+
@Test
public void shouldCleanupStateDirectoriesWhenLastModifiedIsLessThanNowMinusCleanupDelay() {
final File dir = directory.directoryForTask(new TaskId(2, 0));
@@ -305,6 +308,36 @@ public class StateDirectoryTest {
}
@Test
+ public void shouldCleanupObsoleteStateDirectoriesOnlyOnce() {
+ final File dir = directory.directoryForTask(new TaskId(2, 0));
+ assertTrue(new File(dir, "store").mkdir());
+ assertEquals(1, directory.listAllTaskDirectories().length);
+ assertEquals(1, directory.listNonEmptyTaskDirectories().length);
+
+ try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StateDirectory.class)) {
+ directory.cleanRemovedTasks(0);
+ assertTrue(dir.exists());
+ assertEquals(1, directory.listAllTaskDirectories().length);
+ assertEquals(0, directory.listNonEmptyTaskDirectories().length);
+ assertThat(
+ appender.getMessages(),
+ hasItem(containsString("Deleting obsolete state directory"))
+ );
+ }
+
+ try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StateDirectory.class)) {
+ directory.cleanRemovedTasks(0);
+ assertTrue(dir.exists());
+ assertEquals(1, directory.listAllTaskDirectories().length);
+ assertEquals(0, directory.listNonEmptyTaskDirectories().length);
+ assertThat(
+ appender.getMessages(),
+ not(hasItem(containsString("Deleting obsolete state directory")))
+ );
+ }
+ }
+
+ @Test
public void shouldNotRemoveNonTaskDirectoriesAndFiles() {
final File otherDir = TestUtils.tempDirectory(stateDir.toPath(), "foo");
directory.cleanRemovedTasks(0);