You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/11/29 19:20:38 UTC

kafka git commit: KAFKA-6259: Make KafkaStreams.cleanup() clean global state directory

Repository: kafka
Updated Branches:
  refs/heads/trunk 0cc32abc1 -> b512cd474


KAFKA-6259: Make KafkaStreams.cleanup() clean global state directory

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>

Closes #4255 from mjsax/kafka-6259-clean-global-state-dir

add log4j entry


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b512cd47
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b512cd47
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b512cd47

Branch: refs/heads/trunk
Commit: b512cd474c6ebfbbbe0192764cec3413a94baba0
Parents: 0cc32ab
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Wed Nov 29 11:18:31 2017 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 29 11:20:29 2017 -0800

----------------------------------------------------------------------
 .../org/apache/kafka/streams/KafkaStreams.java  |  3 +-
 .../processor/internals/StateDirectory.java     | 73 ++++++++++++++++----
 .../processor/internals/StateDirectoryTest.java | 19 ++++-
 3 files changed, 81 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b512cd47/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index c7dfe71..1844cde 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -892,12 +892,13 @@ public class KafkaStreams {
      * Calling this method triggers a restore of local {@link StateStore}s on the next {@link #start() application start}.
      *
      * @throws IllegalStateException if this {@code KafkaStreams} instance is currently {@link State#RUNNING running}
+     * @throws StreamsException if cleanup failed
      */
     public void cleanUp() {
         if (isRunning()) {
             throw new IllegalStateException("Cannot clean up while running.");
         }
-        stateDirectory.cleanRemovedTasks(0);
+        stateDirectory.clean();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/b512cd47/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 1bfe98c..c33ade6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -222,6 +223,21 @@ public class StateDirectory {
         }
     }
 
+    public synchronized void clean() {
+        try {
+            cleanRemovedTasks(0, true);
+        } catch (final Exception e) {
+            // this is already logged within cleanRemovedTasks
+            throw new StreamsException(e);
+        }
+        try {
+            Utils.delete(globalStateDir().getAbsoluteFile());
+        } catch (final IOException e) {
+            log.error("{} Failed to delete global state directory due to an unexpected exception", logPrefix(), e);
+            throw new StreamsException(e);
+        }
+    }
+
     /**
      * Remove the directories for any {@link TaskId}s that are no-longer
      * owned by this {@link StreamThread} and aren't locked by either
@@ -230,37 +246,70 @@ public class StateDirectory {
      *                       this amount of time (milliseconds)
      */
     public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
+        try {
+            cleanRemovedTasks(cleanupDelayMs, false);
+        } catch (final Exception cannotHappen) {
+            throw new IllegalStateException("Should have swallowed exception.", cannotHappen);
+        }
+    }
+
+    private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
+                                                final boolean manualUserCall) throws Exception {
         final File[] taskDirs = listTaskDirectories();
         if (taskDirs == null || taskDirs.length == 0) {
             return; // nothing to do
         }
-        for (File taskDir : taskDirs) {
+
+        for (final File taskDir : taskDirs) {
             final String dirName = taskDir.getName();
-            TaskId id = TaskId.parse(dirName);
+            final TaskId id = TaskId.parse(dirName);
             if (!locks.containsKey(id)) {
                 try {
                     if (lock(id)) {
-                        long now = time.milliseconds();
-                        long lastModifiedMs = taskDir.lastModified();
-                        if (now > lastModifiedMs + cleanupDelayMs) {
-                            log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms)", logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
+                        final long now = time.milliseconds();
+                        final long lastModifiedMs = taskDir.lastModified();
+                        if (now > lastModifiedMs + cleanupDelayMs || manualUserCall) {
+                            if (!manualUserCall) {
+                                log.info(
+                                    "{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).",
+                                    logPrefix(),
+                                    dirName,
+                                    id,
+                                    now - lastModifiedMs,
+                                    cleanupDelayMs);
+                            } else {
+                                log.info(
+                                        "{} Deleting state directory {} for task {} as user calling cleanup.",
+                                        logPrefix(),
+                                        dirName,
+                                        id);
+                            }
                             Utils.delete(taskDir);
                         }
                     }
-                } catch (OverlappingFileLockException e) {
+                } catch (final OverlappingFileLockException e) {
                     // locked by another thread
-                } catch (IOException e) {
-                    log.error("{} Failed to lock the state directory due to an unexpected exception", logPrefix(), e);
+                    if (manualUserCall) {
+                        log.error("{} Failed to get the state directory lock.", logPrefix(), e);
+                        throw e;
+                    }
+                } catch (final IOException e) {
+                    log.error("{} Failed to delete the state directory.", logPrefix(), e);
+                    if (manualUserCall) {
+                        throw e;
+                    }
                 } finally {
                     try {
                         unlock(id);
-                    } catch (IOException e) {
-                        log.error("{} Failed to release the state directory lock", logPrefix());
+                    } catch (final IOException e) {
+                        log.error("{} Failed to release the state directory lock.", logPrefix());
+                        if (manualUserCall) {
+                            throw e;
+                        }
                     }
                 }
             }
         }
-
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/b512cd47/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index 1a5d46d..e14d010 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -195,9 +195,13 @@ public class StateDirectoryTest {
             directory.lock(task1);
             directory.directoryForTask(new TaskId(2, 0));
 
+            List<File> files = Arrays.asList(appDir.listFiles());
+            assertEquals(3, files.size());
+
             time.sleep(1000);
             directory.cleanRemovedTasks(0);
-            final List<File> files = Arrays.asList(appDir.listFiles());
+
+            files = Arrays.asList(appDir.listFiles());
             assertEquals(2, files.size());
             assertTrue(files.contains(new File(appDir, task0.toString())));
             assertTrue(files.contains(new File(appDir, task1.toString())));
@@ -341,4 +345,17 @@ public class StateDirectoryTest {
         assertTrue(directory.lock(taskId));
     }
 
+    @Test
+    public void shouldCleanupAllTaskDirectoriesIncludingGlobalOne() {
+        directory.directoryForTask(new TaskId(1, 0));
+        directory.globalStateDir();
+
+        List<File> files = Arrays.asList(appDir.listFiles());
+        assertEquals(2, files.size());
+
+        directory.clean();
+
+        files = Arrays.asList(appDir.listFiles());
+        assertEquals(0, files.size());
+    }
 }
\ No newline at end of file