You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2021/02/23 23:52:17 UTC
[kafka] branch 2.6 updated: KAFKA-12211: don't change perm for base/state dir when no persistent store
This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push:
new 7944758 KAFKA-12211: don't change perm for base/state dir when no persistent store
7944758 is described below
commit 794475884d117aedefed1485567aa7821f7a3bfc
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Tue Feb 23 15:50:47 2021 -0800
KAFKA-12211: don't change perm for base/state dir when no persistent store
If an application has no persistent stores, the base directory and the state directory are never created, so we should not try to set permissions on them.
Reviewers: Walker Carlson <wc...@confluent.io>
---
.../processor/internals/StateDirectory.java | 32 ++++++++++++----------
.../processor/internals/StateDirectoryTest.java | 10 +++++--
2 files changed, 25 insertions(+), 17 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index fd6cf96..eca86c5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -119,22 +119,26 @@ public class StateDirectory {
this.appId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
final String stateDirName = config.getString(StreamsConfig.STATE_DIR_CONFIG);
final File baseDir = new File(stateDirName);
- if (this.hasPersistentStores && !baseDir.exists() && !baseDir.mkdirs()) {
- throw new ProcessorStateException(
- String.format("base state directory [%s] doesn't exist and couldn't be created", stateDirName));
- }
stateDir = new File(baseDir, appId);
- if (this.hasPersistentStores && !stateDir.exists() && !stateDir.mkdir()) {
- throw new ProcessorStateException(
- String.format("state directory [%s] doesn't exist and couldn't be created", stateDir.getPath()));
- }
-
- if (hasPersistentStores && stateDirName.startsWith("/tmp")) {
- log.warn("Using /tmp directory in the state.dir property can cause failures with writing the checkpoint file" +
- " due to the fact that this directory can be cleared by the OS");
+
+ if (this.hasPersistentStores) {
+ if (!baseDir.exists() && !baseDir.mkdirs()) {
+ throw new ProcessorStateException(
+ String.format("base state directory [%s] doesn't exist and couldn't be created", stateDirName));
+ }
+
+ if (!stateDir.exists() && !stateDir.mkdir()) {
+ throw new ProcessorStateException(
+ String.format("state directory [%s] doesn't exist and couldn't be created", stateDir.getPath()));
+ }
+
+ if (stateDirName.startsWith("/tmp")) {
+ log.warn("Using /tmp directory in the state.dir property can cause failures with writing the checkpoint file" +
+ " due to the fact that this directory can be cleared by the OS");
+ }
+ configurePermissions(baseDir);
+ configurePermissions(stateDir);
}
- configurePermissions(baseDir);
- configurePermissions(stateDir);
}
private void configurePermissions(final File file) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index 200a2f7..2439981 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -560,9 +560,13 @@ public class StateDirectoryTest {
@Test
public void shouldNotCreateBaseDirectory() throws IOException {
- initializeStateDirectory(false);
- assertFalse(stateDir.exists());
- assertFalse(appDir.exists());
+ try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StateDirectory.class)) {
+ initializeStateDirectory(false);
+ assertThat(stateDir.exists(), is(false));
+ assertThat(appDir.exists(), is(false));
+ assertThat(appender.getMessages(),
+ not(hasItem(containsString("Error changing permissions for the state or base directory"))));
+ }
}
@Test