You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2021/02/23 23:52:17 UTC

[kafka] branch 2.6 updated: KAFKA-12211: don't change perm for base/state dir when no persistent store

This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 7944758  KAFKA-12211: don't change perm for base/state dir when no persistent store
7944758 is described below

commit 794475884d117aedefed1485567aa7821f7a3bfc
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Tue Feb 23 15:50:47 2021 -0800

    KAFKA-12211: don't change perm for base/state dir when no persistent store
    
    If an application has no persistent stores, the base dir and state dir are never created, so we should not try to set permissions on them.
    
    Reviewers: Walker Carlson <wc...@confluent.io>
---
 .../processor/internals/StateDirectory.java        | 32 ++++++++++++----------
 .../processor/internals/StateDirectoryTest.java    | 10 +++++--
 2 files changed, 25 insertions(+), 17 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index fd6cf96..eca86c5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -119,22 +119,26 @@ public class StateDirectory {
         this.appId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
         final String stateDirName = config.getString(StreamsConfig.STATE_DIR_CONFIG);
         final File baseDir = new File(stateDirName);
-        if (this.hasPersistentStores && !baseDir.exists() && !baseDir.mkdirs()) {
-            throw new ProcessorStateException(
-                String.format("base state directory [%s] doesn't exist and couldn't be created", stateDirName));
-        }
         stateDir = new File(baseDir, appId);
-        if (this.hasPersistentStores && !stateDir.exists() && !stateDir.mkdir()) {
-            throw new ProcessorStateException(
-                String.format("state directory [%s] doesn't exist and couldn't be created", stateDir.getPath()));
-        }
-        
-        if (hasPersistentStores && stateDirName.startsWith("/tmp")) {
-            log.warn("Using /tmp directory in the state.dir property can cause failures with writing the checkpoint file" +
-                " due to the fact that this directory can be cleared by the OS");
+
+        if (this.hasPersistentStores) {
+            if (!baseDir.exists() && !baseDir.mkdirs()) {
+                throw new ProcessorStateException(
+                    String.format("base state directory [%s] doesn't exist and couldn't be created", stateDirName));
+            }
+
+            if (!stateDir.exists() && !stateDir.mkdir()) {
+                throw new ProcessorStateException(
+                    String.format("state directory [%s] doesn't exist and couldn't be created", stateDir.getPath()));
+            }
+
+            if (stateDirName.startsWith("/tmp")) {
+                log.warn("Using /tmp directory in the state.dir property can cause failures with writing the checkpoint file" +
+                             " due to the fact that this directory can be cleared by the OS");
+            }
+            configurePermissions(baseDir);
+            configurePermissions(stateDir);
         }
-        configurePermissions(baseDir);
-        configurePermissions(stateDir);
     }
     
     private void configurePermissions(final File file) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index 200a2f7..2439981 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -560,9 +560,13 @@ public class StateDirectoryTest {
 
     @Test
     public void shouldNotCreateBaseDirectory() throws IOException {
-        initializeStateDirectory(false);
-        assertFalse(stateDir.exists());
-        assertFalse(appDir.exists());
+        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StateDirectory.class)) {
+            initializeStateDirectory(false);
+            assertThat(stateDir.exists(), is(false));
+            assertThat(appDir.exists(), is(false));
+            assertThat(appender.getMessages(),
+                       not(hasItem(containsString("Error changing permissions for the state or base directory"))));
+        }
     }
 
     @Test