Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/09/18 17:57:44 UTC

[GitHub] [kafka] mjsax commented on a change in pull request #9262: MINOR: Fix log message when tasks directory is cleaned manually

mjsax commented on a change in pull request #9262:
URL: https://github.com/apache/kafka/pull/9262#discussion_r491102161



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -292,7 +291,7 @@ public synchronized void clean() {
             }
         } catch (final IOException e) {
             log.error("{} Failed to delete global state directory of {} due to an unexpected exception",
-                appId, logPrefix(), e);
+                logPrefix(), appId, e);

Review comment:
       Nice catch!

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -306,59 +305,69 @@ public synchronized void clean() {
      */
     public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
         try {
-            cleanRemovedTasks(cleanupDelayMs, false);
+            cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs);
         } catch (final Exception cannotHappen) {
             throw new IllegalStateException("Should have swallowed exception.", cannotHappen);
         }
     }
 
-    private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
-                                                final boolean manualUserCall) throws Exception {
-        final File[] taskDirs = listAllTaskDirectories();
-        if (taskDirs == null || taskDirs.length == 0) {
-            return; // nothing to do
-        }
-
-        for (final File taskDir : taskDirs) {
+    private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
+        for (final File taskDir : listAllTaskDirectories()) {

Review comment:
       Is this safe? I _think_ that if `listAllTaskDirectories()` returns `null`, we would get an NPE.
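
       To illustrate the concern: `java.io.File#listFiles` returns `null` when the path is not a directory or an I/O error occurs, and an enhanced `for` loop over a `null` array throws a `NullPointerException`. The following is a minimal sketch of a null-safe wrapper. The class name `NullSafeTaskDirs` and the helper `listOrEmpty` are hypothetical and not part of the PR; whether `listAllTaskDirectories()` can actually return `null` depends on its implementation, which is not shown in this diff.

```java
import java.io.File;

public class NullSafeTaskDirs {

    // Hypothetical helper (not from the PR): never returns null, so callers
    // can iterate over the result without an explicit null check.
    static File[] listOrEmpty(final File stateDir) {
        // listFiles returns null if stateDir is not a directory
        // or if an I/O error occurs.
        final File[] dirs = stateDir.listFiles(File::isDirectory);
        return dirs == null ? new File[0] : dirs;
    }

    public static void main(final String[] args) {
        // Assumed to be a path that does not exist.
        final File missing = new File("no-such-state-dir");
        for (final File taskDir : listOrEmpty(missing)) {
            System.out.println(taskDir.getName());
        }
        System.out.println("iteration finished without an NPE");
    }
}
```

       Returning an empty array keeps the loop body unchanged; the alternative is to keep the original early `return` on `null` inside each caller.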

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -306,59 +305,69 @@ public synchronized void clean() {
      */
     public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
         try {
-            cleanRemovedTasks(cleanupDelayMs, false);
+            cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs);
         } catch (final Exception cannotHappen) {
             throw new IllegalStateException("Should have swallowed exception.", cannotHappen);
         }
     }
 
-    private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
-                                                final boolean manualUserCall) throws Exception {
-        final File[] taskDirs = listAllTaskDirectories();
-        if (taskDirs == null || taskDirs.length == 0) {
-            return; // nothing to do
-        }
-
-        for (final File taskDir : taskDirs) {
+    private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
+        for (final File taskDir : listAllTaskDirectories()) {
             final String dirName = taskDir.getName();
             final TaskId id = TaskId.parse(dirName);
             if (!locks.containsKey(id)) {
-                Exception exception = null;
                 try {
                     if (lock(id)) {
                         final long now = time.milliseconds();
                         final long lastModifiedMs = taskDir.lastModified();
                         if (now > lastModifiedMs + cleanupDelayMs) {
                             log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).",
                                 logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
-
-                            Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
-                        } else if (manualUserCall) {

Review comment:
       Could we not fix this by flipping the logic to:
   ```
   if (manualUserCall) {
   } else if (now > lastModifiedMs + cleanupDelayMs) {
   }
   ```
   
   To avoid code duplication?
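
       A minimal sketch of the flipped condition, reduced to the decision alone. The class `CleanupDecision`, the enum `Action`, and the method `decide` are hypothetical names used only for illustration. With the original ordering, a manual call on a directory that had already expired took the first branch and logged the "obsolete" message; checking `manualUserCall` first would avoid that.

```java
public class CleanupDecision {

    // Hypothetical result type: which branch (and log message) applies.
    enum Action { DELETE_BY_USER, DELETE_OBSOLETE, KEEP }

    static Action decide(final boolean manualUserCall,
                         final long now,
                         final long lastModifiedMs,
                         final long cleanupDelayMs) {
        if (manualUserCall) {
            // A manual call always deletes, regardless of the directory's age.
            return Action.DELETE_BY_USER;
        } else if (now > lastModifiedMs + cleanupDelayMs) {
            // The cleaner thread deletes only after the cleanup delay has elapsed.
            return Action.DELETE_OBSOLETE;
        }
        return Action.KEEP;
    }

    public static void main(final String[] args) {
        // Expired directory, manual call: the user branch is taken.
        System.out.println(decide(true, 10_000L, 1_000L, 5_000L));
        // Expired directory, cleaner thread.
        System.out.println(decide(false, 10_000L, 1_000L, 5_000L));
        // Recently modified directory, cleaner thread.
        System.out.println(decide(false, 10_000L, 9_000L, 5_000L));
    }
}
```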

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -306,59 +305,69 @@ public synchronized void clean() {
      */
     public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
         try {
-            cleanRemovedTasks(cleanupDelayMs, false);
+            cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs);
         } catch (final Exception cannotHappen) {
             throw new IllegalStateException("Should have swallowed exception.", cannotHappen);
         }
     }
 
-    private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
-                                                final boolean manualUserCall) throws Exception {
-        final File[] taskDirs = listAllTaskDirectories();
-        if (taskDirs == null || taskDirs.length == 0) {
-            return; // nothing to do
-        }
-
-        for (final File taskDir : taskDirs) {
+    private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
+        for (final File taskDir : listAllTaskDirectories()) {
             final String dirName = taskDir.getName();
             final TaskId id = TaskId.parse(dirName);
             if (!locks.containsKey(id)) {
-                Exception exception = null;
                 try {
                     if (lock(id)) {
                         final long now = time.milliseconds();
                         final long lastModifiedMs = taskDir.lastModified();
                         if (now > lastModifiedMs + cleanupDelayMs) {
                             log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).",
                                 logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
-
-                            Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
-                        } else if (manualUserCall) {
-                            log.info("{} Deleting state directory {} for task {} as user calling cleanup.",
-                                logPrefix(), dirName, id);
-
                             Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
                         }
                     }
-                } catch (final OverlappingFileLockException | IOException e) {
-                    exception = e;
+                } catch (final OverlappingFileLockException | IOException exception) {
+                    log.warn("{} Swallowed the following exception during deletion of obsolete state directory {} for task {}: {}",
+                        logPrefix(), dirName, id, exception);
                 } finally {
                     try {
                         unlock(id);
-
-                        // for manual user call, stream threads are not running so it is safe to delete
-                        // the whole directory
-                        if (manualUserCall) {
-                            Utils.delete(taskDir);
-                        }
-                    } catch (final IOException e) {
-                        exception = e;
+                    } catch (final IOException exception) {
+                        log.warn("{} Swallowed the following exception during unlocking after " +
+                                "deletion of obsolete state directory for task {}: {}",
+                            logPrefix(), dirName, exception);
                     }
                 }
+            }
+        }
+    }
 
-                if (exception != null && manualUserCall) {
-                    log.error("{} Failed to release the state directory lock.", logPrefix());
+    private void cleanRemovedTasksCalledByUser() throws Exception {
+        for (final File taskDir : listAllTaskDirectories()) {

Review comment:
       As above: this could throw an NPE if `listAllTaskDirectories()` returns `null`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -306,59 +305,69 @@ public synchronized void clean() {
      */
     public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
         try {
-            cleanRemovedTasks(cleanupDelayMs, false);
+            cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs);
         } catch (final Exception cannotHappen) {
             throw new IllegalStateException("Should have swallowed exception.", cannotHappen);
         }
     }
 
-    private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
-                                                final boolean manualUserCall) throws Exception {
-        final File[] taskDirs = listAllTaskDirectories();
-        if (taskDirs == null || taskDirs.length == 0) {
-            return; // nothing to do
-        }
-
-        for (final File taskDir : taskDirs) {
+    private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
+        for (final File taskDir : listAllTaskDirectories()) {
             final String dirName = taskDir.getName();
             final TaskId id = TaskId.parse(dirName);
             if (!locks.containsKey(id)) {
-                Exception exception = null;
                 try {
                     if (lock(id)) {
                         final long now = time.milliseconds();
                         final long lastModifiedMs = taskDir.lastModified();
                         if (now > lastModifiedMs + cleanupDelayMs) {
                             log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).",
                                 logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
-
-                            Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
-                        } else if (manualUserCall) {
-                            log.info("{} Deleting state directory {} for task {} as user calling cleanup.",
-                                logPrefix(), dirName, id);
-
                             Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
                         }
                     }
-                } catch (final OverlappingFileLockException | IOException e) {
-                    exception = e;
+                } catch (final OverlappingFileLockException | IOException exception) {
+                    log.warn("{} Swallowed the following exception during deletion of obsolete state directory {} for task {}: {}",
+                        logPrefix(), dirName, id, exception);
                 } finally {
                     try {
                         unlock(id);
-
-                        // for manual user call, stream threads are not running so it is safe to delete
-                        // the whole directory
-                        if (manualUserCall) {
-                            Utils.delete(taskDir);
-                        }
-                    } catch (final IOException e) {
-                        exception = e;
+                    } catch (final IOException exception) {
+                        log.warn("{} Swallowed the following exception during unlocking after " +
+                                "deletion of obsolete state directory for task {}: {}",
+                            logPrefix(), dirName, exception);
                     }
                 }
+            }
+        }
+    }
 
-                if (exception != null && manualUserCall) {
-                    log.error("{} Failed to release the state directory lock.", logPrefix());
+    private void cleanRemovedTasksCalledByUser() throws Exception {
+        for (final File taskDir : listAllTaskDirectories()) {
+            final String dirName = taskDir.getName();
+            final TaskId id = TaskId.parse(dirName);
+            if (!locks.containsKey(id)) {
+                try {
+                    if (lock(id)) {
+                        log.info("{} Deleting state directory {} for task {} as user calling cleanup.",
+                            logPrefix(), dirName, id);
+                        Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
+                    }

Review comment:
       Should we log a message when we cannot get the lock? For a manual user call, this might be good to know.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -306,59 +305,69 @@ public synchronized void clean() {
      */
     public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
         try {
-            cleanRemovedTasks(cleanupDelayMs, false);
+            cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs);
         } catch (final Exception cannotHappen) {
             throw new IllegalStateException("Should have swallowed exception.", cannotHappen);
         }
     }
 
-    private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
-                                                final boolean manualUserCall) throws Exception {
-        final File[] taskDirs = listAllTaskDirectories();
-        if (taskDirs == null || taskDirs.length == 0) {
-            return; // nothing to do
-        }
-
-        for (final File taskDir : taskDirs) {
+    private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
+        for (final File taskDir : listAllTaskDirectories()) {
             final String dirName = taskDir.getName();
             final TaskId id = TaskId.parse(dirName);
             if (!locks.containsKey(id)) {
-                Exception exception = null;
                 try {
                     if (lock(id)) {
                         final long now = time.milliseconds();
                         final long lastModifiedMs = taskDir.lastModified();
                         if (now > lastModifiedMs + cleanupDelayMs) {
                             log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).",
                                 logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
-
-                            Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
-                        } else if (manualUserCall) {
-                            log.info("{} Deleting state directory {} for task {} as user calling cleanup.",
-                                logPrefix(), dirName, id);
-
                             Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
                         }
                     }
-                } catch (final OverlappingFileLockException | IOException e) {
-                    exception = e;
+                } catch (final OverlappingFileLockException | IOException exception) {
+                    log.warn("{} Swallowed the following exception during deletion of obsolete state directory {} for task {}: {}",
+                        logPrefix(), dirName, id, exception);
                 } finally {
                     try {
                         unlock(id);
-
-                        // for manual user call, stream threads are not running so it is safe to delete
-                        // the whole directory
-                        if (manualUserCall) {
-                            Utils.delete(taskDir);
-                        }
-                    } catch (final IOException e) {
-                        exception = e;
+                    } catch (final IOException exception) {
+                        log.warn("{} Swallowed the following exception during unlocking after " +
+                                "deletion of obsolete state directory for task {}: {}",
+                            logPrefix(), dirName, exception);
                     }
                 }
+            }
+        }
+    }
 
-                if (exception != null && manualUserCall) {
-                    log.error("{} Failed to release the state directory lock.", logPrefix());
+    private void cleanRemovedTasksCalledByUser() throws Exception {
+        for (final File taskDir : listAllTaskDirectories()) {
+            final String dirName = taskDir.getName();
+            final TaskId id = TaskId.parse(dirName);
+            if (!locks.containsKey(id)) {
+                try {
+                    if (lock(id)) {
+                        log.info("{} Deleting state directory {} for task {} as user calling cleanup.",
+                            logPrefix(), dirName, id);
+                        Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
+                    }
+                } catch (final OverlappingFileLockException | IOException exception) {
+                    log.error("{} Failed to delete state directory {} for task {} with exception: {}",
+                        logPrefix(), dirName, id, exception);
                     throw exception;
+                } finally {
+                    try {
+                        unlock(id);
+                        // for manual user call, stream threads are not running so it is safe to delete
+                        // the whole directory
+                        Utils.delete(taskDir);
+                    } catch (final IOException exception) {
+                        log.error("{} Failed to release lock on state directory {} for task {} with exception: {}",
+                            logPrefix(), dirName, id, exception);
+                        throw exception;

Review comment:
       It might be better to use an `Exception` variable `firstException` and rethrow it at the end if it is not `null`. IIRC, if we throw a second exception from `finally`, it replaces the first one (i.e., `finally` is executed after the first (outer) `catch` block), so the original failure would be lost.
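
       A minimal sketch of the `firstException` pattern, assuming stand-in methods `failingDelete` and `failingUnlock` that always throw (both hypothetical, as is the class name `FirstExceptionDemo`). In Java, an exception thrown from a `finally` block supersedes the one propagating from the `try` or `catch` block, so the original cause disappears. Attaching the later failure via `addSuppressed` is an optional addition that is not part of the suggestion above.

```java
import java.io.IOException;

public class FirstExceptionDemo {

    // Stand-ins for the delete and unlock steps; both always fail here.
    static void failingDelete() throws IOException {
        throw new IOException("delete failed");
    }

    static void failingUnlock() throws IOException {
        throw new IOException("unlock failed");
    }

    // Throwing from finally: the exception rethrown by catch is discarded.
    static void naive() throws IOException {
        try {
            failingDelete();
        } catch (final IOException e) {
            throw e;
        } finally {
            failingUnlock();
        }
    }

    // Remember the first failure and rethrow it once all cleanup has run.
    static void keepFirst() throws IOException {
        IOException firstException = null;
        try {
            failingDelete();
        } catch (final IOException e) {
            firstException = e;
        } finally {
            try {
                failingUnlock();
            } catch (final IOException e) {
                if (firstException == null) {
                    firstException = e;
                } else {
                    // Optional: keep the later failure visible in the stack trace.
                    firstException.addSuppressed(e);
                }
            }
        }
        if (firstException != null) {
            throw firstException;
        }
    }

    public static void main(final String[] args) {
        try {
            naive();
        } catch (final IOException e) {
            System.out.println("naive: " + e.getMessage());     // naive: unlock failed
        }
        try {
            keepFirst();
        } catch (final IOException e) {
            System.out.println("keepFirst: " + e.getMessage()); // keepFirst: delete failed
        }
    }
}
```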



