You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2020/10/08 19:42:56 UTC
[kafka] branch 2.7 updated: KAFKA-7334: Suggest changing config for
state.dir in case of FileNotFoundException (#9380)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new 3d399a5 KAFKA-7334: Suggest changing config for state.dir in case of FileNotFoundException (#9380)
3d399a5 is described below
commit 3d399a51da3415ea3a58760402ba665950390732
Author: voffcheg109 <vo...@yandex.ru>
AuthorDate: Thu Oct 8 23:20:21 2020 +0400
KAFKA-7334: Suggest changing config for state.dir in case of FileNotFoundException (#9380)
Add warning logs and improve existing log messages for the case where a `FileNotFoundException` occurs while writing the checkpoint file, and log a warning when /tmp is used as the state directory.
Reviewers: A. Sophie Blee-Goldman <so...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
.../kafka/streams/processor/internals/GlobalStateManagerImpl.java | 4 +++-
.../kafka/streams/processor/internals/ProcessorStateManager.java | 6 +++++-
.../apache/kafka/streams/processor/internals/StateDirectory.java | 4 ++++
.../streams/processor/internals/ProcessorStateManagerTest.java | 5 ++++-
4 files changed, 16 insertions(+), 3 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index bd3aa3a..2c57a0a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -446,7 +446,9 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
try {
checkpointFile.write(filteredOffsets);
} catch (final IOException e) {
- log.warn("Failed to write offset checkpoint file to {} for global stores: {}", checkpointFile, e);
+ log.warn("Failed to write offset checkpoint file to {} for global stores: {}." +
+ " This may occur if OS cleaned the state.dir in case when it is located in the (default) /tmp/kafka-streams directory." +
+ " Changing the location of state.dir may resolve the problem", checkpointFile, e);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 58a21fc..3dcdaaf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -603,7 +603,11 @@ public class ProcessorStateManager implements StateManager {
try {
checkpointFile.write(checkpointingOffsets);
} catch (final IOException e) {
- log.warn("Failed to write offset checkpoint file to [{}]", checkpointFile, e);
+ log.warn("Failed to write offset checkpoint file to [{}]." +
+ " This may occur if OS cleaned the state.dir in case when it located in /tmp directory." +
+ " This may also occur due to running multiple instances on the same machine using the same state dir." +
+ " Changing the location of state.dir may resolve the problem.",
+ checkpointFile, e);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 35f937a..861a971 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -98,6 +98,10 @@ public class StateDirectory {
throw new ProcessorStateException(
String.format("state directory [%s] doesn't exist and couldn't be created", stateDir.getPath()));
}
+ if (hasPersistentStores && stateDirName.startsWith("/tmp")) {
+ log.warn("Using /tmp directory in the state.dir property can cause failures with writing the checkpoint file" +
+ " due to the fact that this directory can be cleared by the OS");
+ }
}
/**
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index a652b17..4f077a0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -776,7 +776,10 @@ public class ProcessorStateManagerTest {
for (final LogCaptureAppender.Event event : appender.getEvents()) {
if ("WARN".equals(event.getLevel())
&& event.getMessage().startsWith("process-state-manager-test Failed to write offset checkpoint file to [")
- && event.getMessage().endsWith(".checkpoint]")
+ && event.getMessage().endsWith(".checkpoint]." +
+ " This may occur if OS cleaned the state.dir in case when it located in /tmp directory." +
+ " This may also occur due to running multiple instances on the same machine using the same state dir." +
+ " Changing the location of state.dir may resolve the problem.")
&& event.getThrowableInfo().get().startsWith("java.io.FileNotFoundException: ")) {
foundExpectedLogMessage = true;