You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/11/29 19:20:38 UTC
kafka git commit: KAFKA-6259: Make KafkaStreams.cleanup() clean global state directory
Repository: kafka
Updated Branches:
refs/heads/trunk 0cc32abc1 -> b512cd474
KAFKA-6259: Make KafkaStreams.cleanup() clean global state directory
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>
Closes #4255 from mjsax/kafka-6259-clean-global-state-dir
add log4j entry
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b512cd47
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b512cd47
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b512cd47
Branch: refs/heads/trunk
Commit: b512cd474c6ebfbbbe0192764cec3413a94baba0
Parents: 0cc32ab
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Wed Nov 29 11:18:31 2017 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 29 11:20:29 2017 -0800
----------------------------------------------------------------------
.../org/apache/kafka/streams/KafkaStreams.java | 3 +-
.../processor/internals/StateDirectory.java | 73 ++++++++++++++++----
.../processor/internals/StateDirectoryTest.java | 19 ++++-
3 files changed, 81 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b512cd47/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index c7dfe71..1844cde 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -892,12 +892,13 @@ public class KafkaStreams {
* Calling this method triggers a restore of local {@link StateStore}s on the next {@link #start() application start}.
*
* @throws IllegalStateException if this {@code KafkaStreams} instance is currently {@link State#RUNNING running}
+ * @throws StreamsException if cleanup failed
*/
public void cleanUp() {
if (isRunning()) {
throw new IllegalStateException("Cannot clean up while running.");
}
- stateDirectory.cleanRemovedTasks(0);
+ stateDirectory.clean();
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/b512cd47/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 1bfe98c..c33ade6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -222,6 +223,21 @@ public class StateDirectory {
}
}
+ public synchronized void clean() {
+ try {
+ cleanRemovedTasks(0, true);
+ } catch (final Exception e) {
+ // this is already logged within cleanRemovedTasks
+ throw new StreamsException(e);
+ }
+ try {
+ Utils.delete(globalStateDir().getAbsoluteFile());
+ } catch (final IOException e) {
+ log.error("{} Failed to delete global state directory due to an unexpected exception", logPrefix(), e);
+ throw new StreamsException(e);
+ }
+ }
+
/**
* Remove the directories for any {@link TaskId}s that are no-longer
* owned by this {@link StreamThread} and aren't locked by either
@@ -230,37 +246,70 @@ public class StateDirectory {
* this amount of time (milliseconds)
*/
public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
+ try {
+ cleanRemovedTasks(cleanupDelayMs, false);
+ } catch (final Exception cannotHappen) {
+ throw new IllegalStateException("Should have swallowed exception.", cannotHappen);
+ }
+ }
+
+ private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
+ final boolean manualUserCall) throws Exception {
final File[] taskDirs = listTaskDirectories();
if (taskDirs == null || taskDirs.length == 0) {
return; // nothing to do
}
- for (File taskDir : taskDirs) {
+
+ for (final File taskDir : taskDirs) {
final String dirName = taskDir.getName();
- TaskId id = TaskId.parse(dirName);
+ final TaskId id = TaskId.parse(dirName);
if (!locks.containsKey(id)) {
try {
if (lock(id)) {
- long now = time.milliseconds();
- long lastModifiedMs = taskDir.lastModified();
- if (now > lastModifiedMs + cleanupDelayMs) {
- log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms)", logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
+ final long now = time.milliseconds();
+ final long lastModifiedMs = taskDir.lastModified();
+ if (now > lastModifiedMs + cleanupDelayMs || manualUserCall) {
+ if (!manualUserCall) {
+ log.info(
+ "{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).",
+ logPrefix(),
+ dirName,
+ id,
+ now - lastModifiedMs,
+ cleanupDelayMs);
+ } else {
+ log.info(
+ "{} Deleting state directory {} for task {} as user calling cleanup.",
+ logPrefix(),
+ dirName,
+ id);
+ }
Utils.delete(taskDir);
}
}
- } catch (OverlappingFileLockException e) {
+ } catch (final OverlappingFileLockException e) {
// locked by another thread
- } catch (IOException e) {
- log.error("{} Failed to lock the state directory due to an unexpected exception", logPrefix(), e);
+ if (manualUserCall) {
+ log.error("{} Failed to get the state directory lock.", logPrefix(), e);
+ throw e;
+ }
+ } catch (final IOException e) {
+ log.error("{} Failed to delete the state directory.", logPrefix(), e);
+ if (manualUserCall) {
+ throw e;
+ }
} finally {
try {
unlock(id);
- } catch (IOException e) {
- log.error("{} Failed to release the state directory lock", logPrefix());
+ } catch (final IOException e) {
+ log.error("{} Failed to release the state directory lock.", logPrefix());
+ if (manualUserCall) {
+ throw e;
+ }
}
}
}
}
-
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/b512cd47/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index 1a5d46d..e14d010 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -195,9 +195,13 @@ public class StateDirectoryTest {
directory.lock(task1);
directory.directoryForTask(new TaskId(2, 0));
+ List<File> files = Arrays.asList(appDir.listFiles());
+ assertEquals(3, files.size());
+
time.sleep(1000);
directory.cleanRemovedTasks(0);
- final List<File> files = Arrays.asList(appDir.listFiles());
+
+ files = Arrays.asList(appDir.listFiles());
assertEquals(2, files.size());
assertTrue(files.contains(new File(appDir, task0.toString())));
assertTrue(files.contains(new File(appDir, task1.toString())));
@@ -341,4 +345,17 @@ public class StateDirectoryTest {
assertTrue(directory.lock(taskId));
}
+ @Test
+ public void shouldCleanupAllTaskDirectoriesIncludingGlobalOne() {
+ directory.directoryForTask(new TaskId(1, 0));
+ directory.globalStateDir();
+
+ List<File> files = Arrays.asList(appDir.listFiles());
+ assertEquals(2, files.size());
+
+ directory.clean();
+
+ files = Arrays.asList(appDir.listFiles());
+ assertEquals(0, files.size());
+ }
}
\ No newline at end of file