Posted to commits@kafka.apache.org by vv...@apache.org on 2020/10/05 20:15:17 UTC

[kafka] branch 2.6 updated (4576fdc -> 8d5c15a)

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a change to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git.


    from 4576fdc  KAFKA-10531: Check for negative values to Thread.sleep call (#9347)
     new 7566deb  MINOR: Fix log message when tasks directory is cleaned manually (#9262)
     new 8d5c15a  KAFKA-10514: Advance mock time for state directory cleanup (#9323)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../processor/internals/StateDirectory.java        | 94 ++++++++++++++--------
 .../processor/internals/StateDirectoryTest.java    | 38 +++++++++
 2 files changed, 98 insertions(+), 34 deletions(-)


[kafka] 01/02: MINOR: Fix log message when tasks directory is cleaned manually (#9262)

Posted by vv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 7566deb19932509d5be82672e426dec6a75894c5
Author: Bruno Cadonna <br...@confluent.io>
AuthorDate: Tue Sep 22 18:12:25 2020 +0200

    MINOR: Fix log message when tasks directory is cleaned manually (#9262)
    
    Currently, when a task directory is cleaned manually, the message
    for the state dir cleaner is logged instead of the message for
    the manual cleanup. This happens because the code checks the elapsed
    time since the last update before it checks whether the cleanup
    is a manual call. This commit changes the order of the checks.
    
    Reviewers: A. Sophie Blee-Goldman <so...@confluent.io>, Chia-Ping Tsai <ch...@gmail.com>, Matthias J. Sax <mj...@apache.org>, Walker Carlson <wc...@confluent.io>, John Roesler <vv...@apache.org>
---
 .../processor/internals/StateDirectory.java        | 94 ++++++++++++++--------
 .../processor/internals/StateDirectoryTest.java    | 37 +++++++++
 2 files changed, 97 insertions(+), 34 deletions(-)
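
For readers skimming the patch, here is a minimal, self-contained sketch of the pre-fix branch order that produced the wrong log line. The class and variable names below are illustrative, not taken from the Kafka code base; only the if/else-if ordering and the 0ms delay passed by clean() mirror the removed lines further down in the diff.

    // Illustrative sketch: StateDirectory#clean() used to call the shared helper with
    // cleanupDelayMs = 0, so the elapsed-time branch was evaluated before the manual-call branch.
    public class LogOrderSketch {
        public static void main(final String[] args) {
            final boolean manualUserCall = true;                        // manual cleanup path
            final long cleanupDelayMs = 0L;                             // value passed by clean()
            final long lastModifiedMs = System.currentTimeMillis() - 5; // directory mtime a moment ago
            final long now = System.currentTimeMillis();

            if (now > lastModifiedMs + cleanupDelayMs) {
                // With a 0ms delay this branch wins, so the state dir cleaner message
                // was logged even though the cleanup was triggered manually.
                System.out.println("Deleting obsolete state directory ... ms has elapsed (cleanup delay is 0ms).");
            } else if (manualUserCall) {
                // The message intended for manual cleanup was effectively unreachable.
                System.out.println("Deleting state directory ... as user calling cleanup.");
            }
        }
    }

Splitting the helper into cleanRemovedTasksCalledByCleanerThread and cleanRemovedTasksCalledByUser, as the diff below does, removes the elapsed-time check from the manual path entirely, so the manual-cleanup message is always the one logged.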

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 30dd2ca..de9fbea 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -280,9 +280,8 @@ public class StateDirectory {
     public synchronized void clean() {
         // remove task dirs
         try {
-            cleanRemovedTasks(0, true);
+            cleanRemovedTasksCalledByUser();
         } catch (final Exception e) {
-            // this is already logged within cleanRemovedTasks
             throw new StreamsException(e);
         }
         // remove global dir
@@ -290,10 +289,13 @@ public class StateDirectory {
             if (stateDir.exists()) {
                 Utils.delete(globalStateDir().getAbsoluteFile());
             }
-        } catch (final IOException e) {
-            log.error("{} Failed to delete global state directory of {} due to an unexpected exception",
-                appId, logPrefix(), e);
-            throw new StreamsException(e);
+        } catch (final IOException exception) {
+            log.error(
+                String.format("%s Failed to delete global state directory of %s due to an unexpected exception",
+                    logPrefix(), appId),
+                exception
+            );
+            throw new StreamsException(exception);
         }
     }
 
@@ -306,24 +308,17 @@ public class StateDirectory {
      */
     public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
         try {
-            cleanRemovedTasks(cleanupDelayMs, false);
+            cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs);
         } catch (final Exception cannotHappen) {
             throw new IllegalStateException("Should have swallowed exception.", cannotHappen);
         }
     }
 
-    private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
-                                                final boolean manualUserCall) throws Exception {
-        final File[] taskDirs = listAllTaskDirectories();
-        if (taskDirs == null || taskDirs.length == 0) {
-            return; // nothing to do
-        }
-
-        for (final File taskDir : taskDirs) {
+    private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
+        for (final File taskDir : listAllTaskDirectories()) {
             final String dirName = taskDir.getName();
             final TaskId id = TaskId.parse(dirName);
             if (!locks.containsKey(id)) {
-                Exception exception = null;
                 try {
                     if (lock(id)) {
                         final long now = time.milliseconds();
@@ -331,34 +326,65 @@ public class StateDirectory {
                         if (now > lastModifiedMs + cleanupDelayMs) {
                             log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).",
                                 logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
-
-                            Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
-                        } else if (manualUserCall) {
-                            log.info("{} Deleting state directory {} for task {} as user calling cleanup.",
-                                logPrefix(), dirName, id);
-
                             Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
                         }
                     }
-                } catch (final OverlappingFileLockException | IOException e) {
-                    exception = e;
+                } catch (final OverlappingFileLockException | IOException exception) {
+                    log.warn(
+                        String.format("%s Swallowed the following exception during deletion of obsolete state directory %s for task %s:",
+                            logPrefix(), dirName, id),
+                        exception
+                    );
                 } finally {
                     try {
                         unlock(id);
-
-                        // for manual user call, stream threads are not running so it is safe to delete
-                        // the whole directory
-                        if (manualUserCall) {
-                            Utils.delete(taskDir);
-                        }
-                    } catch (final IOException e) {
-                        exception = e;
+                    } catch (final IOException exception) {
+                        log.warn(
+                            String.format("%s Swallowed the following exception during unlocking after deletion of obsolete " +
+                                "state directory %s for task %s:", logPrefix(), dirName, id),
+                            exception
+                        );
                     }
                 }
+            }
+        }
+    }
 
-                if (exception != null && manualUserCall) {
-                    log.error("{} Failed to release the state directory lock.", logPrefix());
+    private void cleanRemovedTasksCalledByUser() throws Exception {
+        for (final File taskDir : listAllTaskDirectories()) {
+            final String dirName = taskDir.getName();
+            final TaskId id = TaskId.parse(dirName);
+            if (!locks.containsKey(id)) {
+                try {
+                    if (lock(id)) {
+                        log.info("{} Deleting state directory {} for task {} as user calling cleanup.",
+                            logPrefix(), dirName, id);
+                        Utils.delete(taskDir, Collections.singletonList(new File(taskDir, LOCK_FILE_NAME)));
+                    } else {
+                        log.warn("{} Could not get lock for state directory {} for task {} as user calling cleanup.",
+                            logPrefix(), dirName, id);
+                    }
+                } catch (final OverlappingFileLockException | IOException exception) {
+                    log.error(
+                        String.format("%s Failed to delete state directory %s for task %s with exception:",
+                            logPrefix(), dirName, id),
+                        exception
+                    );
                     throw exception;
+                } finally {
+                    try {
+                        unlock(id);
+                        // for manual user call, stream threads are not running so it is safe to delete
+                        // the whole directory
+                        Utils.delete(taskDir);
+                    } catch (final IOException exception) {
+                        log.error(
+                            String.format("%s Failed to release lock on state directory %s for task %s with exception:",
+                                logPrefix(), dirName, id),
+                            exception
+                        );
+                        throw exception;
+                    }
                 }
             }
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index 1645ea8..5c109dc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -49,6 +50,10 @@ import java.util.stream.Collectors;
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.apache.kafka.streams.processor.internals.StateDirectory.LOCK_FILE_NAME;
 import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
+import static org.hamcrest.CoreMatchers.endsWith;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -533,6 +538,38 @@ public class StateDirectoryTest {
         assertTrue(runner.taskDirectory.isDirectory());
     }
 
+    @Test
+    public void shouldLogManualUserCallMessage() {
+        final TaskId taskId = new TaskId(0, 0);
+        final File taskDirectory = directory.directoryForTask(taskId);
+        final File testFile = new File(taskDirectory, "testFile");
+        assertThat(testFile.mkdir(), is(true));
+        assertThat(directory.directoryForTaskIsEmpty(taskId), is(false));
+
+        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StateDirectory.class)) {
+            directory.clean();
+            assertThat(
+                appender.getMessages(),
+                hasItem(endsWith("as user calling cleanup."))
+            );
+        }
+    }
+
+    @Test
+    public void shouldLogStateDirCleanerMessage() {
+        final TaskId taskId = new TaskId(0, 0);
+        final File taskDirectory = directory.directoryForTask(taskId);
+        final File testFile = new File(taskDirectory, "testFile");
+        assertThat(testFile.mkdir(), is(true));
+        assertThat(directory.directoryForTaskIsEmpty(taskId), is(false));
+
+        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StateDirectory.class)) {
+            final long cleanupDelayMs = 0;
+            directory.cleanRemovedTasks(cleanupDelayMs);
+            assertThat(appender.getMessages(), hasItem(endsWith("ms has elapsed (cleanup delay is " +  cleanupDelayMs + "ms).")));
+        }
+    }
+
     private static class CreateTaskDirRunner implements Runnable {
         private final StateDirectory directory;
         private final TaskId taskId;


[kafka] 02/02: KAFKA-10514: Advance mock time for state directory cleanup (#9323)

Posted by vv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 8d5c15aff6398e3a91a2070404267e4a2b0ab37c
Author: Bruno Cadonna <br...@confluent.io>
AuthorDate: Wed Sep 23 17:02:32 2020 +0200

    KAFKA-10514: Advance mock time for state directory cleanup (#9323)
    
    The test did not update the MockTime, so the state directory cleaner couldn't run.
    
    Reviewers: Chia-Ping Tsai <ch...@gmail.com>, John Roesler <vv...@apache.org>
---
 .../org/apache/kafka/streams/processor/internals/StateDirectoryTest.java | 1 +
 1 file changed, 1 insertion(+)
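
As a minimal sketch of why this one-line change matters: the cleaner thread only logs and deletes once the clock has moved strictly past the directory's last-modified time plus the delay (the strict ">" comparison mirrors the check shown in the previous patch), so with the MockTime left unadvanced the message asserted by shouldLogStateDirCleanerMessage may never be logged. The values and class name below are illustrative.

    // Illustrative sketch of the cleaner's elapsed-time condition:
    //     now > lastModifiedMs + cleanupDelayMs
    public class CleanerConditionSketch {
        public static void main(final String[] args) {
            final long cleanupDelayMs = 0L;
            final long lastModifiedMs = 1_000L; // directory mtime captured when the task dir is created
            long mockNowMs = 1_000L;            // stands in for MockTime#milliseconds(), frozen by default

            // Frozen clock: the strict comparison can fail, so no cleanup log line is produced.
            System.out.println(mockNowMs > lastModifiedMs + cleanupDelayMs); // false

            mockNowMs += 5_000L;                // analogous to time.sleep(5000) added in the test below
            System.out.println(mockNowMs > lastModifiedMs + cleanupDelayMs); // true
        }
    }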

diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index 5c109dc..cb98789 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -565,6 +565,7 @@ public class StateDirectoryTest {
 
         try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StateDirectory.class)) {
             final long cleanupDelayMs = 0;
+            time.sleep(5000);
             directory.cleanRemovedTasks(cleanupDelayMs);
             assertThat(appender.getMessages(), hasItem(endsWith("ms has elapsed (cleanup delay is " +  cleanupDelayMs + "ms).")));
         }