You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/03/14 21:09:27 UTC

[kafka] branch 2.5 updated: KAFKA-6647: Do note delete the lock file while holding the lock (#8267)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new f2963c2  KAFKA-6647: Do note delete the lock file while holding the lock (#8267)
f2963c2 is described below

commit f2963c212eb96d00d2910ae72ac96c0416143922
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Sat Mar 14 13:49:08 2020 -0700

    KAFKA-6647: Do note delete the lock file while holding the lock (#8267)
    
    1. Inside StateDirectory#cleanRemovedTasks, skip deleting the lock file (and hence the parent directory) until releasing the lock. And after the lock is released only go ahead and delete the parent directory if manualUserCall == true. That is, this is triggered from KafkaStreams#cleanUp and users are responsible to make sure that Streams instance is not started and hence there are no other threads trying to grab that lock.
    
    2. As a result, during scheduled cleanup the corresponding task.dir would not be empty but be left with only the lock file, so effectively we still achieve the goal of releasing disk spaces. For callers of listTaskDirectories like KIP-441 (cc @ableegoldman to take a look) I've introduced a new listNonEmptyTaskDirectories which excludes such dummy task.dirs with only the lock file left.
    
    3. Also fixed KAFKA-8999 along the way to expose the exception while traversing the directory.
    
    Reviewers: A. Sophie Blee-Goldman <so...@confluent.io>, John Roesler <vv...@apache.org>
---
 .../java/org/apache/kafka/common/utils/Utils.java  |  41 ++++--
 .../org/apache/kafka/streams/KafkaStreams.java     |   4 +-
 .../processor/internals/StateDirectory.java        | 137 +++++++++++++--------
 .../streams/processor/internals/TaskManager.java   |   2 +-
 .../processor/internals/StateDirectoryTest.java    |  44 +++++--
 .../processor/internals/TaskManagerTest.java       |   3 +-
 6 files changed, 157 insertions(+), 74 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 0d16c0a..e9d4cc4 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -736,29 +736,56 @@ public final class Utils {
     /**
      * Recursively delete the given file/directory and any subfiles (if any exist)
      *
-     * @param file The root file at which to begin deleting
+     * @param rootFile The root file at which to begin deleting
      */
-    public static void delete(final File file) throws IOException {
-        if (file == null)
+    public static void delete(final File rootFile) throws IOException {
+        delete(rootFile, Collections.emptyList());
+    }
+
+    /**
+     * Recursively delete the subfiles (if any exist) of the passed in root file that are not included
+     * in the list to keep
+     *
+     * @param rootFile The root file at which to begin deleting
+     * @param filesToKeep The subfiles to keep (note that if a subfile is to be kept, so are all its parent
+     *                    files in its pat)h; if empty we would also delete the root file
+     */
+    public static void delete(final File rootFile, final List<File> filesToKeep) throws IOException {
+        if (rootFile == null)
             return;
-        Files.walkFileTree(file.toPath(), new SimpleFileVisitor<Path>() {
+        Files.walkFileTree(rootFile.toPath(), new SimpleFileVisitor<Path>() {
             @Override
             public FileVisitResult visitFileFailed(Path path, IOException exc) throws IOException {
                 // If the root path did not exist, ignore the error; otherwise throw it.
-                if (exc instanceof NoSuchFileException && path.toFile().equals(file))
+                if (exc instanceof NoSuchFileException && path.toFile().equals(rootFile))
                     return FileVisitResult.TERMINATE;
                 throw exc;
             }
 
             @Override
             public FileVisitResult visitFile(Path path, BasicFileAttributes attrs) throws IOException {
-                Files.delete(path);
+                if (!filesToKeep.contains(path.toFile())) {
+                    Files.delete(path);
+                }
                 return FileVisitResult.CONTINUE;
             }
 
             @Override
             public FileVisitResult postVisitDirectory(Path path, IOException exc) throws IOException {
-                Files.delete(path);
+                // KAFKA-8999: if there's an exception thrown previously already, we should throw it
+                if (exc != null) {
+                    throw exc;
+                }
+
+                if (rootFile.toPath().equals(path)) {
+                    // only delete the parent directory if there's nothing to keep
+                    if (filesToKeep.isEmpty()) {
+                        Files.delete(path);
+                    }
+                } else if (!filesToKeep.contains(path.toFile())) {
+                    Files.delete(path);
+                }
+
                 return FileVisitResult.CONTINUE;
             }
         });
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 2563718..67d185d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -716,11 +716,11 @@ public class KafkaStreams implements AutoCloseable {
         }
         final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
         final long cacheSizePerThread = totalCacheSize / (threads.length + (globalTaskTopology == null ? 0 : 1));
-        final boolean createStateDirectory = taskTopology.hasPersistentLocalStore() ||
+        final boolean hasPersistentStores = taskTopology.hasPersistentLocalStore() ||
                 (globalTaskTopology != null && globalTaskTopology.hasPersistentGlobalStore());
 
         try {
-            stateDirectory = new StateDirectory(config, time, createStateDirectory);
+            stateDirectory = new StateDirectory(config, time, hasPersistentStores);
         } catch (final ProcessorStateException fatal) {
             throw new StreamsException(fatal);
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index f5c4c31..68a9a79 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -33,9 +33,12 @@ import java.nio.channels.OverlappingFileLockException;
 import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.regex.Pattern;
 
+import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
+
 /**
  * Manages the directories where the state of Tasks owned by a {@link StreamThread} are
  * stored. Handles creation/locking/unlocking/cleaning of the Task Directories. This class is not
@@ -48,11 +51,12 @@ public class StateDirectory {
     static final String LOCK_FILE_NAME = ".lock";
     private static final Logger log = LoggerFactory.getLogger(StateDirectory.class);
 
+    private final Time time;
+    private final String appId;
     private final File stateDir;
-    private final boolean createStateDirectory;
+    private final boolean hasPersistentStores;
     private final HashMap<TaskId, FileChannel> channels = new HashMap<>();
     private final HashMap<TaskId, LockAndOwner> locks = new HashMap<>();
-    private final Time time;
 
     private FileChannel globalStateChannel;
     private FileLock globalStateLock;
@@ -70,22 +74,27 @@ public class StateDirectory {
     /**
      * Ensures that the state base directory as well as the application's sub-directory are created.
      *
+     * @param config              streams application configuration to read the root state directory path
+     * @param time                system timer used to execute periodic cleanup procedure
+     * @param hasPersistentStores only when the application's topology does have stores persisted on local file
+     *                            system, we would go ahead and auto-create the corresponding application / task / store
+     *                            directories whenever necessary; otherwise no directories would be created.
+     *
      * @throws ProcessorStateException if the base state directory or application state directory does not exist
-     *                                 and could not be created when createStateDirectory is enabled.
+     *                                 and could not be created when hasPersistentStores is enabled.
      */
-    public StateDirectory(final StreamsConfig config,
-                          final Time time,
-                          final boolean createStateDirectory) {
+    public StateDirectory(final StreamsConfig config, final Time time, final boolean hasPersistentStores) {
         this.time = time;
-        this.createStateDirectory = createStateDirectory;
+        this.hasPersistentStores = hasPersistentStores;
+        this.appId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
         final String stateDirName = config.getString(StreamsConfig.STATE_DIR_CONFIG);
         final File baseDir = new File(stateDirName);
-        if (this.createStateDirectory && !baseDir.exists() && !baseDir.mkdirs()) {
+        if (this.hasPersistentStores && !baseDir.exists() && !baseDir.mkdirs()) {
             throw new ProcessorStateException(
                 String.format("base state directory [%s] doesn't exist and couldn't be created", stateDirName));
         }
-        stateDir = new File(baseDir, config.getString(StreamsConfig.APPLICATION_ID_CONFIG));
-        if (this.createStateDirectory && !stateDir.exists() && !stateDir.mkdir()) {
+        stateDir = new File(baseDir, appId);
+        if (this.hasPersistentStores && !stateDir.exists() && !stateDir.mkdir()) {
             throw new ProcessorStateException(
                 String.format("state directory [%s] doesn't exist and couldn't be created", stateDir.getPath()));
         }
@@ -98,13 +107,22 @@ public class StateDirectory {
      */
     public File directoryForTask(final TaskId taskId) {
         final File taskDir = new File(stateDir, taskId.toString());
-        if (createStateDirectory && !taskDir.exists() && !taskDir.mkdir()) {
+        if (hasPersistentStores && !taskDir.exists() && !taskDir.mkdir()) {
             throw new ProcessorStateException(
                 String.format("task directory [%s] doesn't exist and couldn't be created", taskDir.getPath()));
         }
         return taskDir;
     }
 
+    private boolean taskDirEmpty(final File taskDir) {
+        final File[] storeDirs = taskDir.listFiles(pathname ->
+            !pathname.getName().equals(LOCK_FILE_NAME) &&
+                !pathname.getName().equals(CHECKPOINT_FILE_NAME));
+
+        // if the task is stateless, storeDirs would be null
+        return storeDirs == null || storeDirs.length == 0;
+    }
+
     /**
      * Get or create the directory for the global stores.
      * @return directory for the global stores
@@ -112,7 +130,7 @@ public class StateDirectory {
      */
     File globalStateDir() {
         final File dir = new File(stateDir, "global");
-        if (createStateDirectory && !dir.exists() && !dir.mkdir()) {
+        if (hasPersistentStores && !dir.exists() && !dir.mkdir()) {
             throw new ProcessorStateException(
                 String.format("global state directory [%s] doesn't exist and couldn't be created", dir.getPath()));
         }
@@ -125,12 +143,12 @@ public class StateDirectory {
 
     /**
      * Get the lock for the {@link TaskId}s directory if it is available
-     * @param taskId
+     * @param taskId task id
      * @return true if successful
-     * @throws IOException
+     * @throws IOException if the file cannot be created or file handle cannot be grabbed, should be considered as fatal
      */
     synchronized boolean lock(final TaskId taskId) throws IOException {
-        if (!createStateDirectory) {
+        if (!hasPersistentStores) {
             return true;
         }
 
@@ -174,7 +192,7 @@ public class StateDirectory {
     }
 
     synchronized boolean lockGlobalState() throws IOException {
-        if (!createStateDirectory) {
+        if (!hasPersistentStores) {
             return true;
         }
 
@@ -236,18 +254,21 @@ public class StateDirectory {
     }
 
     public synchronized void clean() {
+        // remove task dirs
         try {
             cleanRemovedTasks(0, true);
         } catch (final Exception e) {
             // this is already logged within cleanRemovedTasks
             throw new StreamsException(e);
         }
+        // remove global dir
         try {
             if (stateDir.exists()) {
                 Utils.delete(globalStateDir().getAbsoluteFile());
             }
         } catch (final IOException e) {
-            log.error("{} Failed to delete global state directory due to an unexpected exception", logPrefix(), e);
+            log.error("{} Failed to delete global state directory of {} due to an unexpected exception",
+                appId, logPrefix(), e);
             throw new StreamsException(e);
         }
     }
@@ -269,7 +290,7 @@ public class StateDirectory {
 
     private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
                                                 final boolean manualUserCall) throws Exception {
-        final File[] taskDirs = listTaskDirectories();
+        final File[] taskDirs = listAllTaskDirectories();
         if (taskDirs == null || taskDirs.length == 0) {
             return; // nothing to do
         }
@@ -278,61 +299,73 @@ public class StateDirectory {
             final String dirName = taskDir.getName();
             final TaskId id = TaskId.parse(dirName);
             if (!locks.containsKey(id)) {
+                Exception exception = null;
                 try {
                     if (lock(id)) {
                         final long now = time.milliseconds();
                         final long lastModifiedMs = taskDir.lastModified();
-                        if (now > lastModifiedMs + cleanupDelayMs || manualUserCall) {
-                            if (!manualUserCall) {
-                                log.info(
-                                    "{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).",
-                                    logPrefix(),
-                                    dirName,
-                                    id,
-                                    now - lastModifiedMs,
-                                    cleanupDelayMs);
-                            } else {
-                                log.info(
-                                        "{} Deleting state directory {} for task {} as user calling cleanup.",
-                                        logPrefix(),
-                                        dirName,
-                                        id);
-                            }
-                            Utils.delete(taskDir);
+                        if (now > lastModifiedMs + cleanupDelayMs) {
+                            log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).",
+                                logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
+
+                            Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
+                        } else if (manualUserCall) {
+                            log.info("{} Deleting state directory {} for task {} as user calling cleanup.",
+                                logPrefix(), dirName, id);
+
+                            Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
                         }
                     }
-                } catch (final OverlappingFileLockException e) {
-                    // locked by another thread
-                    if (manualUserCall) {
-                        log.error("{} Failed to get the state directory lock.", logPrefix(), e);
-                        throw e;
-                    }
-                } catch (final IOException e) {
-                    log.error("{} Failed to delete the state directory.", logPrefix(), e);
-                    if (manualUserCall) {
-                        throw e;
-                    }
+                } catch (final OverlappingFileLockException | IOException e) {
+                    exception = e;
                 } finally {
                     try {
                         unlock(id);
-                    } catch (final IOException e) {
-                        log.error("{} Failed to release the state directory lock.", logPrefix());
+
+                        // for manual user call, stream threads are not running so it is safe to delete
+                        // the whole directory
                         if (manualUserCall) {
-                            throw e;
+                            Utils.delete(taskDir);
                         }
+                    } catch (final IOException e) {
+                        exception = e;
                     }
                 }
+
+                if (exception != null && manualUserCall) {
+                    log.error("{} Failed to release the state directory lock.", logPrefix());
+                    throw exception;
+                }
             }
         }
     }
 
     /**
+     * List all of the task directories that are non-empty
+     * @return The list of all the non-empty local directories for stream tasks
+     */
+    File[] listNonEmptyTaskDirectories() {
+        final File[] taskDirectories = !stateDir.exists() ? new File[0] :
+            stateDir.listFiles(pathname -> {
+                if (!pathname.isDirectory() || !PATH_NAME.matcher(pathname.getName()).matches()) {
+                    return false;
+                } else {
+                    return !taskDirEmpty(pathname);
+                }
+            });
+
+        return taskDirectories == null ? new File[0] : taskDirectories;
+    }
+
+    /**
      * List all of the task directories
      * @return The list of all the existing local directories for stream tasks
      */
-    File[] listTaskDirectories() {
-        return !stateDir.exists() ? new File[0] :
-                stateDir.listFiles(pathname -> pathname.isDirectory() && PATH_NAME.matcher(pathname.getName()).matches());
+    File[] listAllTaskDirectories() {
+        final File[] taskDirectories = !stateDir.exists() ? new File[0] :
+            stateDir.listFiles(pathname -> pathname.isDirectory() && PATH_NAME.matcher(pathname.getName()).matches());
+
+        return taskDirectories == null ? new File[0] : taskDirectories;
     }
 
     private FileChannel getOrCreateFileChannel(final TaskId taskId,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 9385ca1..21fd96c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -182,7 +182,7 @@ public class TaskManager {
 
         final HashSet<TaskId> tasks = new HashSet<>();
 
-        final File[] stateDirs = taskCreator.stateDirectory().listTaskDirectories();
+        final File[] stateDirs = taskCreator.stateDirectory().listNonEmptyTaskDirectories();
         if (stateDirs != null) {
             for (final File dir : stateDirs) {
                 try {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index 1f7163f..bfcaa44 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -32,6 +32,7 @@ import java.nio.channels.FileChannel;
 import java.nio.channels.OverlappingFileLockException;
 import java.nio.file.StandardOpenOption;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Properties;
@@ -204,18 +205,29 @@ public class StateDirectoryTest {
     public void shouldCleanUpTaskStateDirectoriesThatAreNotCurrentlyLocked() throws Exception {
         final TaskId task0 = new TaskId(0, 0);
         final TaskId task1 = new TaskId(1, 0);
+        final TaskId task2 = new TaskId(2, 0);
         try {
+            assertTrue(new File(directory.directoryForTask(task0), "store").mkdir());
+            assertTrue(new File(directory.directoryForTask(task1), "store").mkdir());
+            assertTrue(new File(directory.directoryForTask(task2), "store").mkdir());
+
             directory.lock(task0);
             directory.lock(task1);
-            directory.directoryForTask(new TaskId(2, 0));
 
-            List<File> files = Arrays.asList(Objects.requireNonNull(appDir.listFiles()));
+            List<File> files = Arrays.asList(Objects.requireNonNull(directory.listAllTaskDirectories()));
+            assertEquals(3, files.size());
+
+
+            files = Arrays.asList(Objects.requireNonNull(directory.listNonEmptyTaskDirectories()));
             assertEquals(3, files.size());
 
-            time.sleep(1000);
+            time.sleep(5000);
             directory.cleanRemovedTasks(0);
 
-            files = Arrays.asList(Objects.requireNonNull(appDir.listFiles()));
+            files = Arrays.asList(Objects.requireNonNull(directory.listAllTaskDirectories()));
+            assertEquals(3, files.size());
+
+            files = Arrays.asList(Objects.requireNonNull(directory.listNonEmptyTaskDirectories()));
             assertEquals(2, files.size());
             assertTrue(files.contains(new File(appDir, task0.toString())));
             assertTrue(files.contains(new File(appDir, task1.toString())));
@@ -228,13 +240,19 @@ public class StateDirectoryTest {
     @Test
     public void shouldCleanupStateDirectoriesWhenLastModifiedIsLessThanNowMinusCleanupDelay() {
         final File dir = directory.directoryForTask(new TaskId(2, 0));
+        assertTrue(new File(dir, "store").mkdir());
+
         final int cleanupDelayMs = 60000;
         directory.cleanRemovedTasks(cleanupDelayMs);
         assertTrue(dir.exists());
+        assertEquals(1, directory.listAllTaskDirectories().length);
+        assertEquals(1, directory.listNonEmptyTaskDirectories().length);
 
         time.sleep(cleanupDelayMs + 1000);
         directory.cleanRemovedTasks(cleanupDelayMs);
-        assertFalse(dir.exists());
+        assertTrue(dir.exists());
+        assertEquals(1, directory.listAllTaskDirectories().length);
+        assertEquals(0, directory.listNonEmptyTaskDirectories().length);
     }
 
     @Test
@@ -245,15 +263,21 @@ public class StateDirectoryTest {
     }
 
     @Test
-    public void shouldListAllTaskDirectories() {
+    public void shouldOnlyListNonEmptyTaskDirectories() {
         TestUtils.tempDirectory(stateDir.toPath(), "foo");
         final File taskDir1 = directory.directoryForTask(new TaskId(0, 0));
         final File taskDir2 = directory.directoryForTask(new TaskId(0, 1));
 
-        final List<File> dirs = Arrays.asList(directory.listTaskDirectories());
-        assertEquals(2, dirs.size());
-        assertTrue(dirs.contains(taskDir1));
-        assertTrue(dirs.contains(taskDir2));
+        final File storeDir = new File(taskDir1, "store");
+        assertTrue(storeDir.mkdir());
+
+        assertEquals(Arrays.asList(taskDir1, taskDir2), Arrays.asList(directory.listAllTaskDirectories()));
+        assertEquals(Collections.singletonList(taskDir1), Arrays.asList(directory.listNonEmptyTaskDirectories()));
+
+        directory.cleanRemovedTasks(0L);
+
+        assertEquals(Arrays.asList(taskDir1, taskDir2), Arrays.asList(directory.listAllTaskDirectories()));
+        assertEquals(Collections.emptyList(), Arrays.asList(directory.listNonEmptyTaskDirectories()));
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index b95cec3..bc2f319 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -222,7 +222,7 @@ public class TaskManagerTest {
         assertTrue((new File(taskFolders[3], StateManagerUtil.CHECKPOINT_FILE_NAME)).createNewFile());
 
         expect(activeTaskCreator.stateDirectory()).andReturn(stateDirectory).once();
-        expect(stateDirectory.listTaskDirectories()).andReturn(taskFolders).once();
+        expect(stateDirectory.listNonEmptyTaskDirectories()).andReturn(taskFolders).once();
 
         EasyMock.replay(activeTaskCreator, stateDirectory);
 
@@ -672,7 +672,6 @@ public class TaskManagerTest {
         expect(activeTaskCreator.createTasks(EasyMock.anyObject(),
                                                   EasyMock.eq(taskId0Assignment)))
                 .andReturn(Collections.singletonList(streamTask));
-
     }
 
     private void mockTopologyBuilder() {