You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2020/10/08 19:42:56 UTC

[kafka] branch 2.7 updated: KAFKA-7334: Suggest changing config for state.dir in case of FileNotFoundException (#9380)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new 3d399a5  KAFKA-7334: Suggest changing config for state.dir in case of FileNotFoundException (#9380)
3d399a5 is described below

commit 3d399a51da3415ea3a58760402ba665950390732
Author: voffcheg109 <vo...@yandex.ru>
AuthorDate: Thu Oct 8 23:20:21 2020 +0400

    KAFKA-7334: Suggest changing config for state.dir in case of FileNotFoundException (#9380)
    
    Improve existing warning log messages for `FileNotFoundException` when writing checkpoint files, and add a new warning log when /tmp is used as the state directory.
    
    Reviewers: A. Sophie Blee-Goldman <so...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
 .../kafka/streams/processor/internals/GlobalStateManagerImpl.java   | 4 +++-
 .../kafka/streams/processor/internals/ProcessorStateManager.java    | 6 +++++-
 .../apache/kafka/streams/processor/internals/StateDirectory.java    | 4 ++++
 .../streams/processor/internals/ProcessorStateManagerTest.java      | 5 ++++-
 4 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index bd3aa3a..2c57a0a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -446,7 +446,9 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
         try {
             checkpointFile.write(filteredOffsets);
         } catch (final IOException e) {
-            log.warn("Failed to write offset checkpoint file to {} for global stores: {}", checkpointFile, e);
+            log.warn("Failed to write offset checkpoint file to {} for global stores: {}." +
+                " This may occur if OS cleaned the state.dir in case when it is located in the (default) /tmp/kafka-streams directory." +
+                " Changing the location of state.dir may resolve the problem", checkpointFile, e);
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 58a21fc..3dcdaaf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -603,7 +603,11 @@ public class ProcessorStateManager implements StateManager {
         try {
             checkpointFile.write(checkpointingOffsets);
         } catch (final IOException e) {
-            log.warn("Failed to write offset checkpoint file to [{}]", checkpointFile, e);
+            log.warn("Failed to write offset checkpoint file to [{}]." +
+                " This may occur if OS cleaned the state.dir in case when it located in /tmp directory." +
+                " This may also occur due to running multiple instances on the same machine using the same state dir." +
+                " Changing the location of state.dir may resolve the problem.",
+                checkpointFile, e);
         }
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 35f937a..861a971 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -98,6 +98,10 @@ public class StateDirectory {
             throw new ProcessorStateException(
                 String.format("state directory [%s] doesn't exist and couldn't be created", stateDir.getPath()));
         }
+        if (hasPersistentStores && stateDirName.startsWith("/tmp")) {
+            log.warn("Using /tmp directory in the state.dir property can cause failures with writing the checkpoint file" +
+                " due to the fact that this directory can be cleared by the OS");
+        }
     }
 
     /**
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index a652b17..4f077a0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -776,7 +776,10 @@ public class ProcessorStateManagerTest {
             for (final LogCaptureAppender.Event event : appender.getEvents()) {
                 if ("WARN".equals(event.getLevel())
                     && event.getMessage().startsWith("process-state-manager-test Failed to write offset checkpoint file to [")
-                    && event.getMessage().endsWith(".checkpoint]")
+                    && event.getMessage().endsWith(".checkpoint]." +
+                        " This may occur if OS cleaned the state.dir in case when it located in /tmp directory." +
+                        " This may also occur due to running multiple instances on the same machine using the same state dir." +
+                        " Changing the location of state.dir may resolve the problem.")
                     && event.getThrowableInfo().get().startsWith("java.io.FileNotFoundException: ")) {
 
                     foundExpectedLogMessage = true;