Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/09/08 10:11:06 UTC

[GitHub] [kafka] cadonna opened a new pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually

cadonna opened a new pull request #9262:
URL: https://github.com/apache/kafka/pull/9262


   Currently, when a task directory is cleaned manually, the message
   for the state directory cleaner is logged instead of the message
   for the manual cleanup. This is because the code checks the elapsed
   time since the last update before it checks whether the cleanup
   is a manual call. This commit changes the order of the checks.
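
As a rough illustration of why the order matters, the sketch below models only the branch that selects the log message. `CleanupLogOrder`, `Reason`, `oldOrder`, and `newOrder` are hypothetical names and not part of `StateDirectory`. Since `clean()` passes a cleanup delay of 0, the elapsed-time check is practically always true for a manual call, so with the old order the manual branch is never reached.

```java
// Hypothetical, simplified model of the branch that picks the log message.
// None of these names exist in StateDirectory; they are for illustration only.
final class CleanupLogOrder {
    enum Reason { OBSOLETE, MANUAL, NONE }

    // Order before the fix: the elapsed-time check runs first.
    static Reason oldOrder(final long now, final long lastModifiedMs,
                           final long cleanupDelayMs, final boolean manualUserCall) {
        if (now > lastModifiedMs + cleanupDelayMs) {
            return Reason.OBSOLETE;
        } else if (manualUserCall) {
            return Reason.MANUAL;
        }
        return Reason.NONE;
    }

    // Order after the fix: the manual-call check runs first.
    static Reason newOrder(final long now, final long lastModifiedMs,
                           final long cleanupDelayMs, final boolean manualUserCall) {
        if (manualUserCall) {
            return Reason.MANUAL;
        } else if (now > lastModifiedMs + cleanupDelayMs) {
            return Reason.OBSOLETE;
        }
        return Reason.NONE;
    }

    public static void main(final String[] args) {
        // A manual cleanup with delay 0 on a directory last modified 500 ms ago.
        System.out.println(oldOrder(1_000L, 500L, 0L, true)); // prints OBSOLETE
        System.out.println(newOrder(1_000L, 500L, 0L, true)); // prints MANUAL
    }
}
```

Both orders delete the directory in this scenario; only the logged reason differs.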
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] wcarlson5 commented on a change in pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #9262:
URL: https://github.com/apache/kafka/pull/9262#discussion_r488215737



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -328,15 +328,15 @@ private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
                     if (lock(id)) {
                         final long now = time.milliseconds();
                         final long lastModifiedMs = taskDir.lastModified();
-                        if (now > lastModifiedMs + cleanupDelayMs) {
-                            log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).",
-                                logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
-
-                            Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
-                        } else if (manualUserCall) {
+                        if (manualUserCall) {
                             log.info("{} Deleting state directory {} for task {} as user calling cleanup.",
                                 logPrefix(), dirName, id);
 
+                            Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));

Review comment:
       This duplicates line 340 and can probably be pulled out of the if. Then there should be no need for a separate method, since the only divergence is the log message.
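
One way to read this suggestion, sketched under assumptions: decide first whether the directory should be deleted at all, pick the log message, and then perform the deletion in a single place. `HoistedDelete`, `cleanOne`, and the `deletions` counter are hypothetical stand-ins, and `0_1` is only an example directory name. The later revisions of this PR went with two separate methods instead.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the branch under review; not part of StateDirectory.
final class HoistedDelete {
    final List<String> messages = new ArrayList<>();
    int deletions = 0;

    void cleanOne(final String dirName, final long elapsedMs,
                  final long cleanupDelayMs, final boolean manualUserCall) {
        final boolean obsolete = elapsedMs > cleanupDelayMs;
        if (!manualUserCall && !obsolete) {
            return; // directory is still fresh and nobody asked for a cleanup
        }
        // Only the log message differs between the two cases.
        if (manualUserCall) {
            messages.add("Deleting state directory " + dirName + " as user calling cleanup.");
        } else {
            messages.add("Deleting obsolete state directory " + dirName
                + " as " + elapsedMs + "ms has elapsed.");
        }
        deletions++; // stands in for the single, shared Utils.delete(...) call
    }

    // Convenience for a quick check: the message logged by a manual call.
    static String manualMessage() {
        final HoistedDelete cleaner = new HoistedDelete();
        cleaner.cleanOne("0_1", 5_000L, 0L, true);
        return cleaner.messages.get(0);
    }
}
```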







[GitHub] [kafka] cadonna commented on a change in pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9262:
URL: https://github.com/apache/kafka/pull/9262#discussion_r491888883



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -306,59 +305,69 @@ public synchronized void clean() {
      */
     public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
         try {
-            cleanRemovedTasks(cleanupDelayMs, false);
+            cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs);
         } catch (final Exception cannotHappen) {
             throw new IllegalStateException("Should have swallowed exception.", cannotHappen);
         }
     }
 
-    private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
-                                                final boolean manualUserCall) throws Exception {
-        final File[] taskDirs = listAllTaskDirectories();
-        if (taskDirs == null || taskDirs.length == 0) {
-            return; // nothing to do
-        }
-
-        for (final File taskDir : taskDirs) {
+    private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
+        for (final File taskDir : listAllTaskDirectories()) {
             final String dirName = taskDir.getName();
             final TaskId id = TaskId.parse(dirName);
             if (!locks.containsKey(id)) {
-                Exception exception = null;
                 try {
                     if (lock(id)) {
                         final long now = time.milliseconds();
                         final long lastModifiedMs = taskDir.lastModified();
                         if (now > lastModifiedMs + cleanupDelayMs) {
                             log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).",
                                 logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
-
-                            Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
-                        } else if (manualUserCall) {
-                            log.info("{} Deleting state directory {} for task {} as user calling cleanup.",
-                                logPrefix(), dirName, id);
-
                             Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
                         }
                     }
-                } catch (final OverlappingFileLockException | IOException e) {
-                    exception = e;
+                } catch (final OverlappingFileLockException | IOException exception) {
+                    log.warn("{} Swallowed the following exception during deletion of obsolete state directory {} for task {}: {}",
+                        logPrefix(), dirName, id, exception);

Review comment:
       Good catch!







[GitHub] [kafka] vvcephei commented on pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9262:
URL: https://github.com/apache/kafka/pull/9262#issuecomment-696760191


   The test failure is unrelated: `Build / JDK 11 / kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup`





[GitHub] [kafka] chia7712 commented on a change in pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #9262:
URL: https://github.com/apache/kafka/pull/9262#discussion_r489476268



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -306,40 +306,55 @@ public synchronized void clean() {
      */
     public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
         try {
-            cleanRemovedTasks(cleanupDelayMs, false);
+            cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs);
         } catch (final Exception cannotHappen) {
             throw new IllegalStateException("Should have swallowed exception.", cannotHappen);
         }
     }
 
-    private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
-                                                final boolean manualUserCall) throws Exception {
-        final File[] taskDirs = listAllTaskDirectories();
-        if (taskDirs == null || taskDirs.length == 0) {
-            return; // nothing to do
-        }
-
-        for (final File taskDir : taskDirs) {
+    private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
+        for (final File taskDir : listAllTaskDirectories()) {
             final String dirName = taskDir.getName();
             final TaskId id = TaskId.parse(dirName);
             if (!locks.containsKey(id)) {
-                Exception exception = null;
                 try {
                     if (lock(id)) {
                         final long now = time.milliseconds();
                         final long lastModifiedMs = taskDir.lastModified();
                         if (now > lastModifiedMs + cleanupDelayMs) {
                             log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).",
                                 logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
-
-                            Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
-                        } else if (manualUserCall) {
-                            log.info("{} Deleting state directory {} for task {} as user calling cleanup.",
-                                logPrefix(), dirName, id);
-
                             Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
                         }
                     }
+                } catch (final OverlappingFileLockException | IOException e) {
+                    log.warn("{} Swallowed the following exception during deletion of obsolete state directory {} for task {}: {}",
+                        logPrefix(), dirName, id, e);
+                } finally {
+                    try {
+                        unlock(id);
+                    } catch (final IOException e) {
+                        log.warn("{} Swallowed the following exception during unlocking after " +
+                                "deletion of obsolete state directory for task {}: {}",
+                            logPrefix(), dirName, e);
+                    }
+                }
+            }
+        }
+    }
+
+    private void cleanRemovedTasksCalledByUser() throws Exception {
+        for (final File taskDir : listAllTaskDirectories()) {
+            final String dirName = taskDir.getName();
+            final TaskId id = TaskId.parse(dirName);
+            if (!locks.containsKey(id)) {
+                Exception exception = null;
+                try {
+                    if (lock(id)) {
+                        log.info("{} Deleting state directory {} for task {} as user calling cleanup.",
+                                logPrefix(), dirName, id);
+                        Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
+                    }
                 } catch (final OverlappingFileLockException | IOException e) {

Review comment:
       The method `clean` already catches `Exception`. Could we get rid of those try-catch statements? The call `log.error("{} Failed to release the state directory lock.", logPrefix());` can be moved to `clean`. For example:
   ```java
       public synchronized void clean() {
           // remove task dirs
           try {
               cleanRemovedTasksCalledByUser();
           } catch (final Exception e) {
               log.error("{} Failed to release the state directory lock.", logPrefix());
               throw new StreamsException(e);
           }
   ```
   ```java
       private void cleanRemovedTasksCalledByUser() throws Exception {
           for (final File taskDir : listAllTaskDirectories()) {
               final String dirName = taskDir.getName();
               final TaskId id = TaskId.parse(dirName);
               if (!locks.containsKey(id) && lock(id)) {
                   try {
                       log.info("{} Deleting state directory {} for task {} as user calling cleanup.",
                               logPrefix(), dirName, id);
                       Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
                   } finally {
                       unlock(id);
                       // for manual user call, stream threads are not running so it is safe to delete
                       // the whole directory
                       Utils.delete(taskDir);
                   }
               }
           }
       }
   ```
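
The shape of this suggestion (acquire the lock in the condition, release it in `finally`, let exceptions propagate to the caller) can be tried out in isolation. The following is a minimal sketch with an in-memory lock set; `LockThenFinally`, `Body`, `withLock`, and `releasedAfterFailure` are hypothetical names, and the real `lock(id)` in `StateDirectory` works on file locks rather than a `Set`.

```java
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

// Hypothetical in-memory model of the lock / try / finally pattern.
final class LockThenFinally {
    private final Set<String> locks = new HashSet<>();

    interface Body {
        void run() throws Exception;
    }

    boolean lock(final String id) {
        return locks.add(id); // true only if the id was not locked before
    }

    void unlock(final String id) {
        locks.remove(id);
    }

    // Runs the body only if the lock could be taken; always releases it afterwards.
    void withLock(final String id, final Body body) throws Exception {
        if (!locks.contains(id) && lock(id)) {
            try {
                body.run();
            } finally {
                unlock(id);
            }
        }
    }

    // Returns true if the lock is released even though the body failed.
    static boolean releasedAfterFailure() {
        final LockThenFinally dir = new LockThenFinally();
        try {
            dir.withLock("0_1", () -> {
                throw new IOException("delete failed");
            });
        } catch (final Exception expected) {
            // in the suggestion, the caller (clean()) would log and wrap this
        }
        return !dir.locks.contains("0_1");
    }
}
```

With this shape there is no per-directory `catch`; the single `catch` in the caller handles every failure.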

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -280,7 +280,7 @@ synchronized void unlock(final TaskId taskId) throws IOException {
     public synchronized void clean() {
         // remove task dirs
         try {
-            cleanRemovedTasks(0, true);
+            cleanRemovedTasksCalledByUser();
         } catch (final Exception e) {
             // this is already logged within cleanRemovedTasks

Review comment:
       This code comment should refer to `cleanRemovedTasksCalledByUser` instead of `cleanRemovedTasks`.







[GitHub] [kafka] vvcephei merged pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #9262:
URL: https://github.com/apache/kafka/pull/9262


   





[GitHub] [kafka] cadonna commented on a change in pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9262:
URL: https://github.com/apache/kafka/pull/9262#discussion_r491864886



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -306,59 +305,69 @@ public synchronized void clean() {
      */
     public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
         try {
-            cleanRemovedTasks(cleanupDelayMs, false);
+            cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs);
         } catch (final Exception cannotHappen) {
             throw new IllegalStateException("Should have swallowed exception.", cannotHappen);
         }
     }
 
-    private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
-                                                final boolean manualUserCall) throws Exception {
-        final File[] taskDirs = listAllTaskDirectories();
-        if (taskDirs == null || taskDirs.length == 0) {
-            return; // nothing to do
-        }
-
-        for (final File taskDir : taskDirs) {
+    private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
+        for (final File taskDir : listAllTaskDirectories()) {

Review comment:
       I saw the behavior @ableegoldman mentions and then I changed the code. So it is safe.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -306,59 +305,69 @@ public synchronized void clean() {
      */
     public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
         try {
-            cleanRemovedTasks(cleanupDelayMs, false);
+            cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs);
         } catch (final Exception cannotHappen) {
             throw new IllegalStateException("Should have swallowed exception.", cannotHappen);
         }
     }
 
-    private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
-                                                final boolean manualUserCall) throws Exception {
-        final File[] taskDirs = listAllTaskDirectories();
-        if (taskDirs == null || taskDirs.length == 0) {
-            return; // nothing to do
-        }
-
-        for (final File taskDir : taskDirs) {
+    private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
+        for (final File taskDir : listAllTaskDirectories()) {
             final String dirName = taskDir.getName();
             final TaskId id = TaskId.parse(dirName);
             if (!locks.containsKey(id)) {
-                Exception exception = null;
                 try {
                     if (lock(id)) {
                         final long now = time.milliseconds();
                         final long lastModifiedMs = taskDir.lastModified();
                         if (now > lastModifiedMs + cleanupDelayMs) {
                             log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).",
                                 logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
-
-                            Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
-                        } else if (manualUserCall) {

Review comment:
       I agree with @ableegoldman. Now the code is easier to follow. 

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -306,59 +305,69 @@ public synchronized void clean() {
      */
     public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
         try {
-            cleanRemovedTasks(cleanupDelayMs, false);
+            cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs);
         } catch (final Exception cannotHappen) {
             throw new IllegalStateException("Should have swallowed exception.", cannotHappen);
         }
     }
 
-    private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
-                                                final boolean manualUserCall) throws Exception {
-        final File[] taskDirs = listAllTaskDirectories();
-        if (taskDirs == null || taskDirs.length == 0) {
-            return; // nothing to do
-        }
-
-        for (final File taskDir : taskDirs) {
+    private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
+        for (final File taskDir : listAllTaskDirectories()) {
             final String dirName = taskDir.getName();
             final TaskId id = TaskId.parse(dirName);
             if (!locks.containsKey(id)) {
-                Exception exception = null;
                 try {
                     if (lock(id)) {
                         final long now = time.milliseconds();
                         final long lastModifiedMs = taskDir.lastModified();
                         if (now > lastModifiedMs + cleanupDelayMs) {
                             log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).",
                                 logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
-
-                            Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
-                        } else if (manualUserCall) {
-                            log.info("{} Deleting state directory {} for task {} as user calling cleanup.",
-                                logPrefix(), dirName, id);
-
                             Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
                         }
                     }
-                } catch (final OverlappingFileLockException | IOException e) {
-                    exception = e;
+                } catch (final OverlappingFileLockException | IOException exception) {
+                    log.warn("{} Swallowed the following exception during deletion of obsolete state directory {} for task {}: {}",
+                        logPrefix(), dirName, id, exception);
                 } finally {
                     try {
                         unlock(id);
-
-                        // for manual user call, stream threads are not running so it is safe to delete
-                        // the whole directory
-                        if (manualUserCall) {
-                            Utils.delete(taskDir);
-                        }
-                    } catch (final IOException e) {
-                        exception = e;
+                    } catch (final IOException exception) {
+                        log.warn("{} Swallowed the following exception during unlocking after " +
+                                "deletion of obsolete state directory for task {}: {}",
+                            logPrefix(), dirName, exception);
                     }
                 }
+            }
+        }
+    }
 
-                if (exception != null && manualUserCall) {
-                    log.error("{} Failed to release the state directory lock.", logPrefix());
+    private void cleanRemovedTasksCalledByUser() throws Exception {
+        for (final File taskDir : listAllTaskDirectories()) {
+            final String dirName = taskDir.getName();
+            final TaskId id = TaskId.parse(dirName);
+            if (!locks.containsKey(id)) {
+                try {
+                    if (lock(id)) {
+                        log.info("{} Deleting state directory {} for task {} as user calling cleanup.",
+                            logPrefix(), dirName, id);
+                        Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
+                    }
+                } catch (final OverlappingFileLockException | IOException exception) {
+                    log.error("{} Failed to delete state directory {} for task {} with exception: {}",
+                        logPrefix(), dirName, id, exception);
                     throw exception;
+                } finally {
+                    try {
+                        unlock(id);
+                        // for manual user call, stream threads are not running so it is safe to delete
+                        // the whole directory
+                        Utils.delete(taskDir);
+                    } catch (final IOException exception) {
+                        log.error("{} Failed to release lock on state directory {} for task {} with exception: {}",
+                            logPrefix(), dirName, id, exception);
+                        throw exception;

Review comment:
       According to the Java Language Specification, the behavior is not undefined. See https://docs.oracle.com/javase/specs/jls/se8/html/jls-14.html#jls-14.20.2, which says:
   
   > If the catch block completes abruptly for reason R, then the finally block is executed. Then there is a choice:
   > 
   >    - If the finally block completes normally, then the try statement completes abruptly for reason R.
   > 
   >    - If the finally block completes abruptly for reason S, then the try statement completes abruptly for reason S (and reason R is discarded).
   > 
   
   That means the last thrown exception is propagated. That was also the behavior of the code before this refactoring. Since the goal of this PR is just to fix the log message without changing the behavior, I would leave this change to a future PR if we feel that the behavior should change.
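
The quoted rule can be checked with a small standalone program. This is only a sketch: `FinallyOverridesCatch` and `propagated` are hypothetical names, and the exception messages `R` and `S` simply mirror the reasons named in the specification text.

```java
// Demonstrates JLS 14.20.2: an exception thrown from finally replaces
// the one thrown from the catch block.
final class FinallyOverridesCatch {

    // Returns the message of the exception that actually leaves the inner try statement.
    static String propagated(final boolean failInFinally) {
        try {
            try {
                throw new IllegalStateException("original");
            } catch (final IllegalStateException e) {
                throw new Exception("R"); // catch block completes abruptly for reason R
            } finally {
                if (failInFinally) {
                    throw new Exception("S"); // finally completes abruptly for reason S
                }
            }
        } catch (final Exception e) {
            return e.getMessage();
        }
    }

    public static void main(final String[] args) {
        System.out.println(propagated(false)); // prints R
        System.out.println(propagated(true));  // prints S
    }
}
```

In the second call the exception `R` is discarded, so anything that should not be lost has to be logged before the `finally` block can throw.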











[GitHub] [kafka] ableegoldman commented on a change in pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9262:
URL: https://github.com/apache/kafka/pull/9262#discussion_r488977783



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -306,40 +306,52 @@ public synchronized void clean() {
      */
     public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
         try {
-            cleanRemovedTasks(cleanupDelayMs, false);
+            cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs);
         } catch (final Exception cannotHappen) {
             throw new IllegalStateException("Should have swallowed exception.", cannotHappen);
         }
     }
 
-    private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
-                                                final boolean manualUserCall) throws Exception {
-        final File[] taskDirs = listAllTaskDirectories();
-        if (taskDirs == null || taskDirs.length == 0) {
-            return; // nothing to do
-        }
-
-        for (final File taskDir : taskDirs) {
+    private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
+        for (final File taskDir : listAllTaskDirectories()) {
             final String dirName = taskDir.getName();
             final TaskId id = TaskId.parse(dirName);
             if (!locks.containsKey(id)) {
-                Exception exception = null;
                 try {
                     if (lock(id)) {
                         final long now = time.milliseconds();
                         final long lastModifiedMs = taskDir.lastModified();
                         if (now > lastModifiedMs + cleanupDelayMs) {
                             log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).",
-                                logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
-
-                            Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
-                        } else if (manualUserCall) {
-                            log.info("{} Deleting state directory {} for task {} as user calling cleanup.",
-                                logPrefix(), dirName, id);
-
+                                    logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
                             Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
                         }
                     }
+                } catch (final OverlappingFileLockException | IOException e) {
+                    // do nothing
+                } finally {
+                    try {
+                        unlock(id);
+                    } catch (final IOException e) {
+                        // do nothing

Review comment:
       Can we at least log a warning with the exception we're swallowing? The same applies to the `catch (final OverlappingFileLockException | IOException e)` above.
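
A minimal sketch of what "swallow but log" could look like, written against `java.util.logging` so that it runs without extra dependencies. Kafka itself logs through its own `log` field, so `SwallowWithWarning`, `IoAction`, and `runSwallowing` are hypothetical and only illustrate the idea.

```java
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper: swallows an IOException but leaves a warning in the log.
final class SwallowWithWarning {
    private static final Logger LOG = Logger.getLogger(SwallowWithWarning.class.getName());

    interface IoAction {
        void run() throws IOException;
    }

    // Returns true if the action succeeded, false if an exception was swallowed.
    static boolean runSwallowing(final String what, final IoAction action) {
        try {
            action.run();
            return true;
        } catch (final IOException e) {
            // Passing the exception to the logger keeps the stack trace available.
            LOG.log(Level.WARNING, "Swallowed the following exception during " + what, e);
            return false;
        }
    }

    public static void main(final String[] args) {
        runSwallowing("unlocking of state directory", () -> {
            throw new IOException("lock file missing");
        });
    }
}
```

The caller still continues as before, but the failure is no longer invisible.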







[GitHub] [kafka] chia7712 commented on a change in pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #9262:
URL: https://github.com/apache/kafka/pull/9262#discussion_r490405191



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -280,7 +280,7 @@ synchronized void unlock(final TaskId taskId) throws IOException {
     public synchronized void clean() {
         // remove task dirs
         try {
-            cleanRemovedTasks(0, true);
+            cleanRemovedTasksCalledByUser();
         } catch (final Exception e) {
             // this is already logged within cleanRemovedTasks

Review comment:
       The code comment above ("this is already logged within cleanRemovedTasks") is now stale, since the call to `cleanRemovedTasks` has been replaced by `cleanRemovedTasksCalledByUser`.







[GitHub] [kafka] cadonna commented on a change in pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9262:
URL: https://github.com/apache/kafka/pull/9262#discussion_r490189904



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -306,40 +306,55 @@ public synchronized void clean() {
      */
     public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
         try {
-            cleanRemovedTasks(cleanupDelayMs, false);
+            cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs);
         } catch (final Exception cannotHappen) {
             throw new IllegalStateException("Should have swallowed exception.", cannotHappen);
         }
     }
 
-    private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
-                                                final boolean manualUserCall) throws Exception {
-        final File[] taskDirs = listAllTaskDirectories();
-        if (taskDirs == null || taskDirs.length == 0) {
-            return; // nothing to do
-        }
-
-        for (final File taskDir : taskDirs) {
+    private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
+        for (final File taskDir : listAllTaskDirectories()) {
             final String dirName = taskDir.getName();
             final TaskId id = TaskId.parse(dirName);
             if (!locks.containsKey(id)) {
-                Exception exception = null;
                 try {
                     if (lock(id)) {
                         final long now = time.milliseconds();
                         final long lastModifiedMs = taskDir.lastModified();
                         if (now > lastModifiedMs + cleanupDelayMs) {
                             log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).",
                                 logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
-
-                            Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
-                        } else if (manualUserCall) {
-                            log.info("{} Deleting state directory {} for task {} as user calling cleanup.",
-                                logPrefix(), dirName, id);
-
                             Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
                         }
                     }
+                } catch (final OverlappingFileLockException | IOException e) {
+                    log.warn("{} Swallowed the following exception during deletion of obsolete state directory {} for task {}: {}",
+                        logPrefix(), dirName, id, e);
+                } finally {
+                    try {
+                        unlock(id);
+                    } catch (final IOException e) {
+                        log.warn("{} Swallowed the following exception during unlocking after " +
+                                "deletion of obsolete state directory for task {}: {}",
+                            logPrefix(), dirName, e);
+                    }
+                }
+            }
+        }
+    }
+
+    private void cleanRemovedTasksCalledByUser() throws Exception {
+        for (final File taskDir : listAllTaskDirectories()) {
+            final String dirName = taskDir.getName();
+            final TaskId id = TaskId.parse(dirName);
+            if (!locks.containsKey(id)) {
+                Exception exception = null;
+                try {
+                    if (lock(id)) {
+                        log.info("{} Deleting state directory {} for task {} as user calling cleanup.",
+                                logPrefix(), dirName, id);
+                        Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
+                    }
                 } catch (final OverlappingFileLockException | IOException e) {

Review comment:
       Good observation! I applied some of your suggestions but left the `try-catch` statements since they allow us to log more specific information about the encountered error.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -280,7 +280,7 @@ synchronized void unlock(final TaskId taskId) throws IOException {
     public synchronized void clean() {
         // remove task dirs
         try {
-            cleanRemovedTasks(0, true);
+            cleanRemovedTasksCalledByUser();
         } catch (final Exception e) {
             // this is already logged within cleanRemovedTasks

Review comment:
       I am afraid I do not understand this comment.







[GitHub] [kafka] mjsax commented on a change in pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #9262:
URL: https://github.com/apache/kafka/pull/9262#discussion_r491102161



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -292,7 +291,7 @@ public synchronized void clean() {
             }
         } catch (final IOException e) {
             log.error("{} Failed to delete global state directory of {} due to an unexpected exception",
-                appId, logPrefix(), e);
+                logPrefix(), appId, e);

Review comment:
       Nice catch!

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -306,59 +305,69 @@ public synchronized void clean() {
      */
     public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
         try {
-            cleanRemovedTasks(cleanupDelayMs, false);
+            cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs);
         } catch (final Exception cannotHappen) {
             throw new IllegalStateException("Should have swallowed exception.", cannotHappen);
         }
     }
 
-    private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
-                                                final boolean manualUserCall) throws Exception {
-        final File[] taskDirs = listAllTaskDirectories();
-        if (taskDirs == null || taskDirs.length == 0) {
-            return; // nothing to do
-        }
-
-        for (final File taskDir : taskDirs) {
+    private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
+        for (final File taskDir : listAllTaskDirectories()) {

Review comment:
       Is this safe? I _think_ if `listAllTaskDirectories()` returns `null` we would get an NPE?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -306,59 +305,69 @@ public synchronized void clean() {
      */
     public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
         try {
-            cleanRemovedTasks(cleanupDelayMs, false);
+            cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs);
         } catch (final Exception cannotHappen) {
             throw new IllegalStateException("Should have swallowed exception.", cannotHappen);
         }
     }
 
-    private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
-                                                final boolean manualUserCall) throws Exception {
-        final File[] taskDirs = listAllTaskDirectories();
-        if (taskDirs == null || taskDirs.length == 0) {
-            return; // nothing to do
-        }
-
-        for (final File taskDir : taskDirs) {
+    private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
+        for (final File taskDir : listAllTaskDirectories()) {
             final String dirName = taskDir.getName();
             final TaskId id = TaskId.parse(dirName);
             if (!locks.containsKey(id)) {
-                Exception exception = null;
                 try {
                     if (lock(id)) {
                         final long now = time.milliseconds();
                         final long lastModifiedMs = taskDir.lastModified();
                         if (now > lastModifiedMs + cleanupDelayMs) {
                             log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).",
                                 logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
-
-                            Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
-                        } else if (manualUserCall) {

Review comment:
       Could we not fix this by flipping the logic to:
   ```
   if (manualUserCall) {
   } else if (now > lastModifiedMs + cleanupDelayMs) {
   }
   ```
   
   To avoid code duplication?
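
A minimal sketch of the ordering issue, assuming the branch selection can be reduced to a pure function. The class `CleanupReasonSketch` and both method names are hypothetical and only stand in for the `if`/`else if` inside `cleanRemovedTasks`; they are not part of Kafka.

```java
// Hypothetical stand-in for the branch selection in cleanRemovedTasks; not the Kafka code.
class CleanupReasonSketch {

    // Order before the fix: the elapsed-time check runs first, so a manual call
    // with cleanupDelayMs == 0 is reported as an "obsolete" cleanup.
    static String reasonTimeFirst(final long now,
                                  final long lastModifiedMs,
                                  final long cleanupDelayMs,
                                  final boolean manualUserCall) {
        if (now > lastModifiedMs + cleanupDelayMs) {
            return "obsolete";
        } else if (manualUserCall) {
            return "manual";
        }
        return "skip";
    }

    // Flipped order: the manual call is recognized before the elapsed time is looked at.
    static String reasonManualFirst(final long now,
                                    final long lastModifiedMs,
                                    final long cleanupDelayMs,
                                    final boolean manualUserCall) {
        if (manualUserCall) {
            return "manual";
        } else if (now > lastModifiedMs + cleanupDelayMs) {
            return "obsolete";
        }
        return "skip";
    }

    public static void main(final String[] args) {
        // clean() calls with a delay of 0 and manualUserCall == true.
        System.out.println(reasonTimeFirst(1_000L, 500L, 0L, true));   // prints "obsolete"
        System.out.println(reasonManualFirst(1_000L, 500L, 0L, true)); // prints "manual"
    }
}
```

With a delay of 0, the time-based condition holds for any directory modified in the past, which is why the manual branch was never reached in the original order.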

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -306,59 +305,69 @@ public synchronized void clean() {
      */
     public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
         try {
-            cleanRemovedTasks(cleanupDelayMs, false);
+            cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs);
         } catch (final Exception cannotHappen) {
             throw new IllegalStateException("Should have swallowed exception.", cannotHappen);
         }
     }
 
-    private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
-                                                final boolean manualUserCall) throws Exception {
-        final File[] taskDirs = listAllTaskDirectories();
-        if (taskDirs == null || taskDirs.length == 0) {
-            return; // nothing to do
-        }
-
-        for (final File taskDir : taskDirs) {
+    private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
+        for (final File taskDir : listAllTaskDirectories()) {
             final String dirName = taskDir.getName();
             final TaskId id = TaskId.parse(dirName);
             if (!locks.containsKey(id)) {
-                Exception exception = null;
                 try {
                     if (lock(id)) {
                         final long now = time.milliseconds();
                         final long lastModifiedMs = taskDir.lastModified();
                         if (now > lastModifiedMs + cleanupDelayMs) {
                             log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).",
                                 logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
-
-                            Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
-                        } else if (manualUserCall) {
-                            log.info("{} Deleting state directory {} for task {} as user calling cleanup.",
-                                logPrefix(), dirName, id);
-
                             Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
                         }
                     }
-                } catch (final OverlappingFileLockException | IOException e) {
-                    exception = e;
+                } catch (final OverlappingFileLockException | IOException exception) {
+                    log.warn("{} Swallowed the following exception during deletion of obsolete state directory {} for task {}: {}",
+                        logPrefix(), dirName, id, exception);
                 } finally {
                     try {
                         unlock(id);
-
-                        // for manual user call, stream threads are not running so it is safe to delete
-                        // the whole directory
-                        if (manualUserCall) {
-                            Utils.delete(taskDir);
-                        }
-                    } catch (final IOException e) {
-                        exception = e;
+                    } catch (final IOException exception) {
+                        log.warn("{} Swallowed the following exception during unlocking after " +
+                                "deletion of obsolete state directory for task {}: {}",
+                            logPrefix(), dirName, exception);
                     }
                 }
+            }
+        }
+    }
 
-                if (exception != null && manualUserCall) {
-                    log.error("{} Failed to release the state directory lock.", logPrefix());
+    private void cleanRemovedTasksCalledByUser() throws Exception {
+        for (final File taskDir : listAllTaskDirectories()) {

Review comment:
       As above.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -306,59 +305,69 @@ public synchronized void clean() {
      */
     public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
         try {
-            cleanRemovedTasks(cleanupDelayMs, false);
+            cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs);
         } catch (final Exception cannotHappen) {
             throw new IllegalStateException("Should have swallowed exception.", cannotHappen);
         }
     }
 
-    private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
-                                                final boolean manualUserCall) throws Exception {
-        final File[] taskDirs = listAllTaskDirectories();
-        if (taskDirs == null || taskDirs.length == 0) {
-            return; // nothing to do
-        }
-
-        for (final File taskDir : taskDirs) {
+    private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
+        for (final File taskDir : listAllTaskDirectories()) {
             final String dirName = taskDir.getName();
             final TaskId id = TaskId.parse(dirName);
             if (!locks.containsKey(id)) {
-                Exception exception = null;
                 try {
                     if (lock(id)) {
                         final long now = time.milliseconds();
                         final long lastModifiedMs = taskDir.lastModified();
                         if (now > lastModifiedMs + cleanupDelayMs) {
                             log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).",
                                 logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
-
-                            Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
-                        } else if (manualUserCall) {
-                            log.info("{} Deleting state directory {} for task {} as user calling cleanup.",
-                                logPrefix(), dirName, id);
-
                             Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
                         }
                     }
-                } catch (final OverlappingFileLockException | IOException e) {
-                    exception = e;
+                } catch (final OverlappingFileLockException | IOException exception) {
+                    log.warn("{} Swallowed the following exception during deletion of obsolete state directory {} for task {}: {}",
+                        logPrefix(), dirName, id, exception);
                 } finally {
                     try {
                         unlock(id);
-
-                        // for manual user call, stream threads are not running so it is safe to delete
-                        // the whole directory
-                        if (manualUserCall) {
-                            Utils.delete(taskDir);
-                        }
-                    } catch (final IOException e) {
-                        exception = e;
+                    } catch (final IOException exception) {
+                        log.warn("{} Swallowed the following exception during unlocking after " +
+                                "deletion of obsolete state directory for task {}: {}",
+                            logPrefix(), dirName, exception);
                     }
                 }
+            }
+        }
+    }
 
-                if (exception != null && manualUserCall) {
-                    log.error("{} Failed to release the state directory lock.", logPrefix());
+    private void cleanRemovedTasksCalledByUser() throws Exception {
+        for (final File taskDir : listAllTaskDirectories()) {
+            final String dirName = taskDir.getName();
+            final TaskId id = TaskId.parse(dirName);
+            if (!locks.containsKey(id)) {
+                try {
+                    if (lock(id)) {
+                        log.info("{} Deleting state directory {} for task {} as user calling cleanup.",
+                            logPrefix(), dirName, id);
+                        Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
+                    }

Review comment:
       Should we log a message that we could not get the lock? -- For a manual user call, this might be good to know?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -306,59 +305,69 @@ public synchronized void clean() {
      */
     public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
         try {
-            cleanRemovedTasks(cleanupDelayMs, false);
+            cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs);
         } catch (final Exception cannotHappen) {
             throw new IllegalStateException("Should have swallowed exception.", cannotHappen);
         }
     }
 
-    private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
-                                                final boolean manualUserCall) throws Exception {
-        final File[] taskDirs = listAllTaskDirectories();
-        if (taskDirs == null || taskDirs.length == 0) {
-            return; // nothing to do
-        }
-
-        for (final File taskDir : taskDirs) {
+    private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
+        for (final File taskDir : listAllTaskDirectories()) {
             final String dirName = taskDir.getName();
             final TaskId id = TaskId.parse(dirName);
             if (!locks.containsKey(id)) {
-                Exception exception = null;
                 try {
                     if (lock(id)) {
                         final long now = time.milliseconds();
                         final long lastModifiedMs = taskDir.lastModified();
                         if (now > lastModifiedMs + cleanupDelayMs) {
                             log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).",
                                 logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
-
-                            Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
-                        } else if (manualUserCall) {
-                            log.info("{} Deleting state directory {} for task {} as user calling cleanup.",
-                                logPrefix(), dirName, id);
-
                             Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
                         }
                     }
-                } catch (final OverlappingFileLockException | IOException e) {
-                    exception = e;
+                } catch (final OverlappingFileLockException | IOException exception) {
+                    log.warn("{} Swallowed the following exception during deletion of obsolete state directory {} for task {}: {}",
+                        logPrefix(), dirName, id, exception);
                 } finally {
                     try {
                         unlock(id);
-
-                        // for manual user call, stream threads are not running so it is safe to delete
-                        // the whole directory
-                        if (manualUserCall) {
-                            Utils.delete(taskDir);
-                        }
-                    } catch (final IOException e) {
-                        exception = e;
+                    } catch (final IOException exception) {
+                        log.warn("{} Swallowed the following exception during unlocking after " +
+                                "deletion of obsolete state directory for task {}: {}",
+                            logPrefix(), dirName, exception);
                     }
                 }
+            }
+        }
+    }
 
-                if (exception != null && manualUserCall) {
-                    log.error("{} Failed to release the state directory lock.", logPrefix());
+    private void cleanRemovedTasksCalledByUser() throws Exception {
+        for (final File taskDir : listAllTaskDirectories()) {
+            final String dirName = taskDir.getName();
+            final TaskId id = TaskId.parse(dirName);
+            if (!locks.containsKey(id)) {
+                try {
+                    if (lock(id)) {
+                        log.info("{} Deleting state directory {} for task {} as user calling cleanup.",
+                            logPrefix(), dirName, id);
+                        Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
+                    }
+                } catch (final OverlappingFileLockException | IOException exception) {
+                    log.error("{} Failed to delete state directory {} for task {} with exception: {}",
+                        logPrefix(), dirName, id, exception);
                     throw exception;
+                } finally {
+                    try {
+                        unlock(id);
+                        // for manual user call, stream threads are not running so it is safe to delete
+                        // the whole directory
+                        Utils.delete(taskDir);
+                    } catch (final IOException exception) {
+                        log.error("{} Failed to release lock on state directory {} for task {} with exception: {}",
+                            logPrefix(), dirName, id, exception);
+                        throw exception;

Review comment:
       It might be better to use an `Exception` variable `firstException` and rethrow it at the end if it is not `null` -- IIRC, if we throw a second exception from `finally`, the first one is lost (i.e., `finally` is executed after the first (outer) `catch` block, and its exception is the one that propagates).
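
A minimal sketch of the `firstException` idea, under the assumption that both the delete and the unlock step can fail with an `IOException`. The helpers `delete` and `unlock` below are hypothetical stand-ins for `Utils.delete` and `StateDirectory#unlock`. As far as the Java Language Specification goes, an exception thrown from `finally` replaces the one already in flight, which the first method illustrates.

```java
import java.io.IOException;

// Hypothetical sketch; delete() and unlock() only simulate failures.
class FirstExceptionSketch {

    static void delete(final boolean fail) throws IOException {
        if (fail) {
            throw new IOException("delete failed");
        }
    }

    static void unlock(final boolean fail) throws IOException {
        if (fail) {
            throw new IOException("unlock failed");
        }
    }

    // Rethrowing from both catch and finally: the exception thrown inside
    // finally is the one that reaches the caller.
    static void rethrowInBoth(final boolean deleteFails, final boolean unlockFails) throws IOException {
        try {
            delete(deleteFails);
        } catch (final IOException exception) {
            throw exception;
        } finally {
            unlock(unlockFails);
        }
    }

    // Remember the first failure and rethrow it once both steps have run.
    static void rethrowFirst(final boolean deleteFails, final boolean unlockFails) throws IOException {
        IOException firstException = null;
        try {
            delete(deleteFails);
        } catch (final IOException exception) {
            firstException = exception;
        } finally {
            try {
                unlock(unlockFails);
            } catch (final IOException exception) {
                if (firstException == null) {
                    firstException = exception;
                }
            }
        }
        if (firstException != null) {
            throw firstException;
        }
    }

    // Small helpers that report which message reached the caller.
    static String failureOfRethrowInBoth() {
        try {
            rethrowInBoth(true, true);
            return "none";
        } catch (final IOException exception) {
            return exception.getMessage();
        }
    }

    static String failureOfRethrowFirst() {
        try {
            rethrowFirst(true, true);
            return "none";
        } catch (final IOException exception) {
            return exception.getMessage();
        }
    }

    public static void main(final String[] args) {
        System.out.println(failureOfRethrowInBoth()); // prints "unlock failed"
        System.out.println(failureOfRethrowFirst());  // prints "delete failed"
    }
}
```

Whether the first or the last failure should win is a design choice; the sketch keeps the first one because that is what the comment above proposes.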







[GitHub] [kafka] vvcephei commented on a change in pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9262:
URL: https://github.com/apache/kafka/pull/9262#discussion_r491085819



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -292,7 +291,7 @@ public synchronized void clean() {
             }
         } catch (final IOException e) {
             log.error("{} Failed to delete global state directory of {} due to an unexpected exception",
-                appId, logPrefix(), e);
+                logPrefix(), appId, e);

Review comment:
       Just noticed something else wrong with this message. `e` will get ignored. We should instead use `log.error(String.format("...", logPrefix(), appId), e)`

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -306,59 +305,69 @@ public synchronized void clean() {
      */
     public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
         try {
-            cleanRemovedTasks(cleanupDelayMs, false);
+            cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs);
         } catch (final Exception cannotHappen) {
             throw new IllegalStateException("Should have swallowed exception.", cannotHappen);
         }
     }
 
-    private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
-                                                final boolean manualUserCall) throws Exception {
-        final File[] taskDirs = listAllTaskDirectories();
-        if (taskDirs == null || taskDirs.length == 0) {
-            return; // nothing to do
-        }
-
-        for (final File taskDir : taskDirs) {
+    private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
+        for (final File taskDir : listAllTaskDirectories()) {
             final String dirName = taskDir.getName();
             final TaskId id = TaskId.parse(dirName);
             if (!locks.containsKey(id)) {
-                Exception exception = null;
                 try {
                     if (lock(id)) {
                         final long now = time.milliseconds();
                         final long lastModifiedMs = taskDir.lastModified();
                         if (now > lastModifiedMs + cleanupDelayMs) {
                             log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).",
                                 logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
-
-                            Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
-                        } else if (manualUserCall) {
-                            log.info("{} Deleting state directory {} for task {} as user calling cleanup.",
-                                logPrefix(), dirName, id);
-
                             Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
                         }
                     }
-                } catch (final OverlappingFileLockException | IOException e) {
-                    exception = e;
+                } catch (final OverlappingFileLockException | IOException exception) {
+                    log.warn("{} Swallowed the following exception during deletion of obsolete state directory {} for task {}: {}",
+                        logPrefix(), dirName, id, exception);

Review comment:
       This log will be incomplete. We should pass the exception as the cause instead:
   
   ```suggestion
                       log.warn(String.format("%s Swallowed the following exception during deletion of obsolete state directory %s for task %s",
                           logPrefix(), dirName, id), exception);
   ```
   
   This feedback applies to pretty much all the warn/err logs in this PR.







[GitHub] [kafka] cadonna commented on a change in pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9262:
URL: https://github.com/apache/kafka/pull/9262#discussion_r488578475



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -328,15 +328,15 @@ private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
                     if (lock(id)) {
                         final long now = time.milliseconds();
                         final long lastModifiedMs = taskDir.lastModified();
-                        if (now > lastModifiedMs + cleanupDelayMs) {
-                            log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).",
-                                logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
-
-                            Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
-                        } else if (manualUserCall) {
+                        if (manualUserCall) {

Review comment:
       I think this is a good idea. I will split the method into two.







[GitHub] [kafka] ableegoldman commented on a change in pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9262:
URL: https://github.com/apache/kafka/pull/9262#discussion_r491107777



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -306,59 +305,69 @@ public synchronized void clean() {
      */
     public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
         try {
-            cleanRemovedTasks(cleanupDelayMs, false);
+            cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs);
         } catch (final Exception cannotHappen) {
             throw new IllegalStateException("Should have swallowed exception.", cannotHappen);
         }
     }
 
-    private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
-                                                final boolean manualUserCall) throws Exception {
-        final File[] taskDirs = listAllTaskDirectories();
-        if (taskDirs == null || taskDirs.length == 0) {
-            return; // nothing to do
-        }
-
-        for (final File taskDir : taskDirs) {
+    private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
+        for (final File taskDir : listAllTaskDirectories()) {

Review comment:
       We recently "fixed" `listAllTaskDirectories` to guarantee that it never returns null. We just forgot to update all the null checks when we did that.
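
For illustration only, a sketch of how such a guarantee can be made. `java.io.File#listFiles` returns `null` when the path is not a directory or an I/O error occurs, so a wrapper can map that case to an empty array. The class `TaskDirListingSketch` and the method `listTaskDirectories` are hypothetical and are not the actual body of `listAllTaskDirectories`.

```java
import java.io.File;

// Hypothetical sketch of a listing helper that never returns null.
class TaskDirListingSketch {

    static File[] listTaskDirectories(final File stateDir) {
        // listFiles returns null if stateDir does not exist or is not a directory.
        final File[] taskDirs = stateDir.listFiles(pathname -> pathname.isDirectory());
        return taskDirs == null ? new File[0] : taskDirs;
    }

    public static void main(final String[] args) {
        // Assumes this path does not exist in the working directory.
        final File missing = new File("no-such-state-dir-for-this-sketch");
        // The for-each loop is safe without a null check.
        for (final File taskDir : listTaskDirectories(missing)) {
            System.out.println(taskDir.getName());
        }
        System.out.println(listTaskDirectories(missing).length);
    }
}
```

With a helper of this shape, callers can iterate over the result directly, as the new `for` loops in this PR do.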







[GitHub] [kafka] cadonna commented on a change in pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9262:
URL: https://github.com/apache/kafka/pull/9262#discussion_r491880515



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -306,59 +305,69 @@ public synchronized void clean() {
      */
     public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
         try {
-            cleanRemovedTasks(cleanupDelayMs, false);
+            cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs);
         } catch (final Exception cannotHappen) {
             throw new IllegalStateException("Should have swallowed exception.", cannotHappen);
         }
     }
 
-    private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
-                                                final boolean manualUserCall) throws Exception {
-        final File[] taskDirs = listAllTaskDirectories();
-        if (taskDirs == null || taskDirs.length == 0) {
-            return; // nothing to do
-        }
-
-        for (final File taskDir : taskDirs) {
+    private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
+        for (final File taskDir : listAllTaskDirectories()) {
             final String dirName = taskDir.getName();
             final TaskId id = TaskId.parse(dirName);
             if (!locks.containsKey(id)) {
-                Exception exception = null;
                 try {
                     if (lock(id)) {
                         final long now = time.milliseconds();
                         final long lastModifiedMs = taskDir.lastModified();
                         if (now > lastModifiedMs + cleanupDelayMs) {
                             log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).",
                                 logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
-
-                            Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
-                        } else if (manualUserCall) {
-                            log.info("{} Deleting state directory {} for task {} as user calling cleanup.",
-                                logPrefix(), dirName, id);
-
                             Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
                         }
                     }
-                } catch (final OverlappingFileLockException | IOException e) {
-                    exception = e;
+                } catch (final OverlappingFileLockException | IOException exception) {
+                    log.warn("{} Swallowed the following exception during deletion of obsolete state directory {} for task {}: {}",
+                        logPrefix(), dirName, id, exception);
                 } finally {
                     try {
                         unlock(id);
-
-                        // for manual user call, stream threads are not running so it is safe to delete
-                        // the whole directory
-                        if (manualUserCall) {
-                            Utils.delete(taskDir);
-                        }
-                    } catch (final IOException e) {
-                        exception = e;
+                    } catch (final IOException exception) {
+                        log.warn("{} Swallowed the following exception during unlocking after " +
+                                "deletion of obsolete state directory for task {}: {}",
+                            logPrefix(), dirName, exception);
                     }
                 }
+            }
+        }
+    }
 
-                if (exception != null && manualUserCall) {
-                    log.error("{} Failed to release the state directory lock.", logPrefix());
+    private void cleanRemovedTasksCalledByUser() throws Exception {
+        for (final File taskDir : listAllTaskDirectories()) {
+            final String dirName = taskDir.getName();
+            final TaskId id = TaskId.parse(dirName);
+            if (!locks.containsKey(id)) {
+                try {
+                    if (lock(id)) {
+                        log.info("{} Deleting state directory {} for task {} as user calling cleanup.",
+                            logPrefix(), dirName, id);
+                        Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
+                    }
+                } catch (final OverlappingFileLockException | IOException exception) {
+                    log.error("{} Failed to delete state directory {} for task {} with exception: {}",
+                        logPrefix(), dirName, id, exception);
                     throw exception;
+                } finally {
+                    try {
+                        unlock(id);
+                        // for manual user call, stream threads are not running so it is safe to delete
+                        // the whole directory
+                        Utils.delete(taskDir);
+                    } catch (final IOException exception) {
+                        log.error("{} Failed to release lock on state directory {} for task {} with exception: {}",
+                            logPrefix(), dirName, id, exception);
+                        throw exception;

Review comment:
       According to the Java Language Specification, it is not undefined. See https://docs.oracle.com/javase/specs/jls/se8/html/jls-14.html#jls-14.20.2. There it says:
   
   > If the catch block completes abruptly for reason R, then the finally block is executed. Then there is a choice:
   > 
   >    - If the finally block completes normally, then the try statement completes abruptly for reason R.
   > 
   >    - If the finally block completes abruptly for reason S, then the try statement completes abruptly for reason S (and reason R is discarded).
   > 
   
   That means the last thrown exception is propagated. That was also the behavior of the refactored code. Since the goal of this PR is actually just to fix the log message without changing the behavior, I would leave this change to a future PR if we feel that the behavior should change.
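
The JLS rule quoted above can be illustrated with a small standalone sketch. The class, method names, and exception messages are made up for illustration and are not taken from `StateDirectory`; only the try/catch/finally shape mirrors the code under review.

```java
import java.io.IOException;

class FinallyOverridesCatch {

    // The catch block rethrows (reason R); the finally block may throw
    // its own exception (reason S), in which case R is discarded.
    static void cleanUp(final boolean failInFinally) throws IOException {
        try {
            throw new IOException("delete failed");
        } catch (final IOException exception) {
            throw exception; // reason R
        } finally {
            if (failInFinally) {
                throw new IOException("unlock failed"); // reason S
            }
        }
    }

    // Returns the message of whichever exception reaches the caller.
    static String propagatedMessage(final boolean failInFinally) {
        try {
            cleanUp(failInFinally);
            return "none";
        } catch (final IOException e) {
            return e.getMessage();
        }
    }

    public static void main(final String[] args) {
        System.out.println(propagatedMessage(false)); // prints "delete failed"
        System.out.println(propagatedMessage(true));  // prints "unlock failed"
    }
}
```

When the finally block completes normally, the exception from the catch block reaches the caller; when the finally block throws, only its exception does.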







[GitHub] [kafka] chia7712 commented on pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually

Posted by GitBox <gi...@apache.org>.
chia7712 commented on pull request #9262:
URL: https://github.com/apache/kafka/pull/9262#issuecomment-697177819


   @cadonna 
   
   ```
   org.apache.kafka.streams.processor.internals.StateDirectoryTest.shouldLogStateDirCleanerMessage failed, log available in /home/chia7712/kafka/streams/build/reports/testOutput/org.apache.kafka.streams.processor.internals.StateDirectoryTest.shouldLogStateDirCleanerMessage.test.stdout
   
   org.apache.kafka.streams.processor.internals.StateDirectoryTest > shouldLogStateDirCleanerMessage FAILED
       java.lang.AssertionError:
       Expected: a collection containing a string ending with "ms has elapsed (cleanup delay is 0ms)."
            but: was empty
           at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
           at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
           at org.apache.kafka.streams.processor.internals.StateDirectoryTest.shouldLogStateDirCleanerMessage(StateDirectoryTest.java:569)
   ```
   
   The new test introduced by this PR is failing. The failing test does not advance the current time of ```MockTime```, so the state directory can't be cleaned up. I have filed a ticket for the failing test (https://issues.apache.org/jira/browse/KAFKA-10514).
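
A rough sketch of one way the cleanup condition can fail when the clock is not advanced. `FakeClock` is a hypothetical stand-in and not Kafka's `MockTime`, and the timestamps are assumed values; only the strict comparison `now > lastModifiedMs + cleanupDelayMs` is taken from the diff in this thread.

```java
class CleanupDelayCheck {

    // Hypothetical stand-in for a mock clock; this is not Kafka's MockTime.
    static final class FakeClock {
        private long nowMs;

        FakeClock(final long startMs) {
            this.nowMs = startMs;
        }

        long milliseconds() {
            return nowMs;
        }

        void sleep(final long ms) {
            nowMs += ms;
        }
    }

    // Same strict comparison as the cleaner-thread branch in the diff.
    static boolean isObsolete(final long nowMs, final long lastModifiedMs, final long cleanupDelayMs) {
        return nowMs > lastModifiedMs + cleanupDelayMs;
    }

    public static void main(final String[] args) {
        final long lastModifiedMs = 1_000L; // assumed timestamp of the task directory
        final FakeClock clock = new FakeClock(lastModifiedMs);

        // Clock not advanced: now == lastModifiedMs, so even a 0 ms delay
        // does not satisfy the strict comparison.
        System.out.println(isObsolete(clock.milliseconds(), lastModifiedMs, 0L));

        // Advancing the clock by 1 ms makes the comparison succeed.
        clock.sleep(1L);
        System.out.println(isObsolete(clock.milliseconds(), lastModifiedMs, 0L));
    }
}
```

Under these assumptions, the directory is skipped and no "has elapsed" message is logged unless the clock has moved past the directory's last-modified time.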





[GitHub] [kafka] cadonna commented on a change in pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9262:
URL: https://github.com/apache/kafka/pull/9262#discussion_r488578142



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -328,15 +328,15 @@ private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
                     if (lock(id)) {
                         final long now = time.milliseconds();
                         final long lastModifiedMs = taskDir.lastModified();
-                        if (now > lastModifiedMs + cleanupDelayMs) {
-                            log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).",
-                                logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
-
-                            Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
-                        } else if (manualUserCall) {
+                        if (manualUserCall) {
                             log.info("{} Deleting state directory {} for task {} as user calling cleanup.",
                                 logPrefix(), dirName, id);
 
+                            Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));

Review comment:
       That is not entirely true, because the exception handling differs between the user call and the cleaner thread call. The cleaner thread call swallows all exceptions, whereas the user call throws all exceptions.
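
The difference between the two call paths can be sketched as follows. All names here (`SwallowVersusThrow`, `Deletion`, the `warnings` list standing in for a logger) are hypothetical; this is a simplified illustration of swallow-and-log versus log-and-rethrow, not the `StateDirectory` code.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

class SwallowVersusThrow {

    // Hypothetical abstraction over "delete one task directory".
    interface Deletion {
        void run() throws IOException;
    }

    // Stand-in for a logger so the sketch stays dependency-free.
    static final List<String> warnings = new ArrayList<>();

    // Cleaner-thread style: record the failure and carry on.
    static void cleanByCleanerThread(final Deletion deletion) {
        try {
            deletion.run();
        } catch (final IOException exception) {
            warnings.add("Swallowed: " + exception.getMessage());
        }
    }

    // User-call style: record the failure and rethrow it to the caller.
    static void cleanByUser(final Deletion deletion) throws IOException {
        try {
            deletion.run();
        } catch (final IOException exception) {
            warnings.add("Failed: " + exception.getMessage());
            throw exception;
        }
    }

    public static void main(final String[] args) {
        final Deletion failing = () -> {
            throw new IOException("disk error");
        };

        cleanByCleanerThread(failing); // returns normally
        try {
            cleanByUser(failing);
        } catch (final IOException e) {
            System.out.println("user call threw: " + e.getMessage());
        }
        System.out.println(warnings);
    }
}
```

Because the two paths differ in what happens after the log statement, folding them behind a single boolean flag means branching at every failure point.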







[GitHub] [kafka] ableegoldman commented on a change in pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9262:
URL: https://github.com/apache/kafka/pull/9262#discussion_r491108548



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -306,59 +305,69 @@ public synchronized void clean() {
      */
     public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
         try {
-            cleanRemovedTasks(cleanupDelayMs, false);
+            cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs);
         } catch (final Exception cannotHappen) {
             throw new IllegalStateException("Should have swallowed exception.", cannotHappen);
         }
     }
 
-    private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
-                                                final boolean manualUserCall) throws Exception {
-        final File[] taskDirs = listAllTaskDirectories();
-        if (taskDirs == null || taskDirs.length == 0) {
-            return; // nothing to do
-        }
-
-        for (final File taskDir : taskDirs) {
+    private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
+        for (final File taskDir : listAllTaskDirectories()) {
             final String dirName = taskDir.getName();
             final TaskId id = TaskId.parse(dirName);
             if (!locks.containsKey(id)) {
-                Exception exception = null;
                 try {
                     if (lock(id)) {
                         final long now = time.milliseconds();
                         final long lastModifiedMs = taskDir.lastModified();
                         if (now > lastModifiedMs + cleanupDelayMs) {
                             log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).",
                                 logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
-
-                            Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
-                        } else if (manualUserCall) {

Review comment:
       That's what Bruno originally did, but the original `cleanRemovedTasks` method branched on the `manualUserCall` flag in several places and was pretty difficult to follow (imo). So (also imo) it's cleaner to split it up into two methods that make it clear what the expected behavior is in each case. Just my 2 cents







[GitHub] [kafka] cadonna commented on pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #9262:
URL: https://github.com/apache/kafka/pull/9262#issuecomment-688768558


   Call for review: @guozhangwang @ableegoldman @mjsax @vvcephei @wcarlson5 





[GitHub] [kafka] cadonna commented on pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #9262:
URL: https://github.com/apache/kafka/pull/9262#issuecomment-694720082


   Retest this please





[GitHub] [kafka] ableegoldman commented on a change in pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9262:
URL: https://github.com/apache/kafka/pull/9262#discussion_r485073167



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -328,15 +328,15 @@ private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
                     if (lock(id)) {
                         final long now = time.milliseconds();
                         final long lastModifiedMs = taskDir.lastModified();
-                        if (now > lastModifiedMs + cleanupDelayMs) {
-                            log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).",
-                                logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
-
-                            Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
-                        } else if (manualUserCall) {
+                        if (manualUserCall) {

Review comment:
       Honestly it kind of seems like there is enough divergent logic to merit splitting this up into separate methods for the manual vs cleanup-delay cases. 







[GitHub] [kafka] cadonna commented on a change in pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9262:
URL: https://github.com/apache/kafka/pull/9262#discussion_r491866714



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -306,59 +305,69 @@ public synchronized void clean() {
      */
     public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
         try {
-            cleanRemovedTasks(cleanupDelayMs, false);
+            cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs);
         } catch (final Exception cannotHappen) {
             throw new IllegalStateException("Should have swallowed exception.", cannotHappen);
         }
     }
 
-    private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
-                                                final boolean manualUserCall) throws Exception {
-        final File[] taskDirs = listAllTaskDirectories();
-        if (taskDirs == null || taskDirs.length == 0) {
-            return; // nothing to do
-        }
-
-        for (final File taskDir : taskDirs) {
+    private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
+        for (final File taskDir : listAllTaskDirectories()) {
             final String dirName = taskDir.getName();
             final TaskId id = TaskId.parse(dirName);
             if (!locks.containsKey(id)) {
-                Exception exception = null;
                 try {
                     if (lock(id)) {
                         final long now = time.milliseconds();
                         final long lastModifiedMs = taskDir.lastModified();
                         if (now > lastModifiedMs + cleanupDelayMs) {
                             log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).",
                                 logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
-
-                            Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
-                        } else if (manualUserCall) {

Review comment:
       I agree with @ableegoldman. Now the code is easier to follow. 







[GitHub] [kafka] vvcephei commented on pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #9262:
URL: https://github.com/apache/kafka/pull/9262#issuecomment-703867931


   Cherry-picked to 2.6





[GitHub] [kafka] vvcephei merged pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #9262:
URL: https://github.com/apache/kafka/pull/9262


   





[GitHub] [kafka] cadonna commented on a change in pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9262:
URL: https://github.com/apache/kafka/pull/9262#discussion_r491864886



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -306,59 +305,69 @@ public synchronized void clean() {
      */
     public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
         try {
-            cleanRemovedTasks(cleanupDelayMs, false);
+            cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs);
         } catch (final Exception cannotHappen) {
             throw new IllegalStateException("Should have swallowed exception.", cannotHappen);
         }
     }
 
-    private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
-                                                final boolean manualUserCall) throws Exception {
-        final File[] taskDirs = listAllTaskDirectories();
-        if (taskDirs == null || taskDirs.length == 0) {
-            return; // nothing to do
-        }
-
-        for (final File taskDir : taskDirs) {
+    private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
+        for (final File taskDir : listAllTaskDirectories()) {

Review comment:
       I saw the behavior @ableegoldman mentions and then I changed the code. So it is safe.



