Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/22 20:49:39 UTC

[GitHub] [kafka] guozhangwang commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks

guozhangwang commented on a change in pull request #10342:
URL: https://github.com/apache/kafka/pull/10342#discussion_r599057582



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -410,8 +374,8 @@ public void close() {
             }
 
             // all threads should be stopped and cleaned up by now, so none should remain holding a lock
-            if (locks.isEmpty()) {
-                log.error("Some task directories still locked while closing state, this indicates unclean shutdown: {}", locks);
+            if (lockedTasksToStreamThreadOwner.isEmpty()) {

Review comment:
       Hmm, should that actually be `!isEmpty()`? Were we always printing this error message spuriously?
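
       To illustrate the question, here is a minimal sketch of the check with the negation applied. It is not the PR's code: the class `LockCheckSketch` and the method `shouldLogUncleanShutdown` are hypothetical, and task ids are modeled as plain strings rather than `TaskId`. The assumption is that the error should fire only when some task is still recorded as locked.

```java
import java.util.HashMap;
import java.util.Map;

public class LockCheckSketch {
    // Hypothetical helper: the map stands in for lockedTasksToStreamThreadOwner
    // (task id -> name of the owning stream thread).
    static boolean shouldLogUncleanShutdown(final Map<String, String> lockedTasksToStreamThreadOwner) {
        // Only a non-empty map means some directory is still locked at close time.
        return !lockedTasksToStreamThreadOwner.isEmpty();
    }

    public static void main(final String[] args) {
        final Map<String, String> locks = new HashMap<>();
        System.out.println(shouldLogUncleanShutdown(locks));
        locks.put("0_1", "StreamThread-1");
        System.out.println(shouldLogUncleanShutdown(locks));
    }
}
```

       With the condition as written in the diff (`isEmpty()` without the negation), the message would be logged on a clean shutdown and skipped when locks are actually left over.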

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -444,22 +408,30 @@ public synchronized void clean() {
      * Remove the directories for any {@link TaskId}s that are no-longer
      * owned by this {@link StreamThread} and aren't locked by either
      * another process or another {@link StreamThread}
-     * @param cleanupDelayMs only remove directories if they haven't been modified for at least
-     *                       this amount of time (milliseconds)
+     * @param cleanupDelayMs        only remove directories if they haven't been modified for at least
+     *                              this amount of time (milliseconds)
+     * @param currentThreadNames    the names of all non-DEAD stream threads so we can clean up any
+     *                              orphaned task directories
      */
-    public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
+    public synchronized void cleanRemovedTasks(final long cleanupDelayMs, final Set<String> currentThreadNames) {
         try {
-            cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs);
+            cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs, currentThreadNames);
         } catch (final Exception cannotHappen) {
             throw new IllegalStateException("Should have swallowed exception.", cannotHappen);
         }
     }
 
-    private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
+    private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs, final Set<String> currentThreadNames) {
         for (final File taskDir : listNonEmptyTaskDirectories()) {
             final String dirName = taskDir.getName();
             final TaskId id = TaskId.parse(dirName);
-            if (!locks.containsKey(id)) {
+
+            final String owningThread = lockedTasksToStreamThreadOwner.get(id);
+            if (owningThread != null && !currentThreadNames.contains(owningThread)) {
+                log.warn("Deleting lock for task directory {} since the thread owning the lock is gone: {}", id, owningThread);

Review comment:
       When could this scenario happen? If it should never happen, should we throw instead?
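
       For reference, a rough sketch of the orphaned-lock check this hunk introduces, as far as it can be read from the diff: a lock counts as orphaned when its recorded owner is not among the current thread names. The class `OrphanedLockSketch` and the method `releaseOrphanedLocks` are hypothetical names, task ids are modeled as plain strings rather than `TaskId`, and the sketch does not answer when the scenario can occur.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class OrphanedLockSketch {
    // Hypothetical helper: removes every lock whose owning thread is not in
    // currentThreadNames and returns the task ids that were released.
    static Set<String> releaseOrphanedLocks(final Map<String, String> lockedTasksToStreamThreadOwner,
                                            final Set<String> currentThreadNames) {
        final Set<String> orphaned = new HashSet<>();
        for (final Map.Entry<String, String> entry : lockedTasksToStreamThreadOwner.entrySet()) {
            if (!currentThreadNames.contains(entry.getValue())) {
                orphaned.add(entry.getKey());
            }
        }
        // Removing through the key-set view also removes the map entries.
        lockedTasksToStreamThreadOwner.keySet().removeAll(orphaned);
        return orphaned;
    }

    public static void main(final String[] args) {
        final Map<String, String> locks = new HashMap<>();
        locks.put("0_0", "StreamThread-1");
        locks.put("0_1", "StreamThread-2");
        final Set<String> live = new HashSet<>(Arrays.asList("StreamThread-1"));
        System.out.println(releaseOrphanedLocks(locks, live));
    }
}
```

       If the scenario is truly impossible, the same loop could throw an `IllegalStateException` at the point where it currently collects the orphaned id, rather than warning and cleaning up.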
