You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2020/11/13 19:04:46 UTC

[kafka] branch 2.6 updated: KAFKA-10705: Make state stores not readable by others (#9583)

This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 8441f0e  KAFKA-10705: Make state stores not readable by others (#9583)
8441f0e is described below

commit 8441f0e5182de60df6e4b89612f2397e4770a0e3
Author: leah <lt...@confluent.io>
AuthorDate: Fri Nov 13 12:44:09 2020 -0600

    KAFKA-10705: Make state stores not readable by others (#9583)
    
    Change permissions on the folders for the state store so they're not readable or writable by "others", but still accessible by owner and group members.
    
    Reviewers: Bruno Cadonna <br...@confluent.io>, Walker Carlson <wc...@confluent.io>, Anna Sophie Blee-Goldman <ab...@apache.org>
---
 .../processor/internals/StateDirectory.java        | 19 +++++++++++++++
 .../processor/internals/StateDirectoryTest.java    | 27 ++++++++++++++++++++++
 2 files changed, 46 insertions(+)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 35f937a..11b582f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -31,10 +31,15 @@ import java.nio.channels.FileChannel;
 import java.nio.channels.FileLock;
 import java.nio.channels.OverlappingFileLockException;
 import java.nio.file.NoSuchFileException;
+import java.nio.file.Paths;
 import java.nio.file.Path;
+import java.nio.file.Files;
 import java.nio.file.StandardOpenOption;
+import java.nio.file.attribute.PosixFilePermission;
+import java.nio.file.attribute.PosixFilePermissions;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Set;
 import java.util.regex.Pattern;
 
 import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
@@ -98,6 +103,20 @@ public class StateDirectory {
             throw new ProcessorStateException(
                 String.format("state directory [%s] doesn't exist and couldn't be created", stateDir.getPath()));
         }
+        
+        if (hasPersistentStores && stateDirName.startsWith("/tmp")) {
+            log.warn("Using /tmp directory in the state.dir property can cause failures with writing the checkpoint file" +
+                " due to the fact that this directory can be cleared by the OS");
+        }
+        final Path basePath = Paths.get(baseDir.getPath());
+        final Path statePath = Paths.get(stateDir.getPath());
+        final Set<PosixFilePermission> perms = PosixFilePermissions.fromString("rwxr-x---");
+        try {
+            Files.setPosixFilePermissions(basePath, perms);
+            Files.setPosixFilePermissions(statePath, perms);
+        } catch (final IOException e) {
+            log.error("Error changing permissions for the state or base directory {} ", stateDir.getPath(), e);
+        }
     }
 
     /**
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index 5c7e678..93e81c0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -34,10 +34,14 @@ import java.io.IOException;
 import java.nio.channels.FileChannel;
 import java.nio.channels.OverlappingFileLockException;
 import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.nio.file.StandardOpenOption;
+import java.nio.file.attribute.PosixFilePermission;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
@@ -62,6 +66,8 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 
 public class StateDirectoryTest {
 
@@ -107,6 +113,27 @@ public class StateDirectoryTest {
     }
 
     @Test
+    public void shouldHaveSecurePermissions() {
+        final Set<PosixFilePermission> expectedPermissions = EnumSet.of(
+            PosixFilePermission.OWNER_EXECUTE,
+            PosixFilePermission.GROUP_READ,
+            PosixFilePermission.OWNER_WRITE,
+            PosixFilePermission.GROUP_EXECUTE,
+            PosixFilePermission.OWNER_READ);
+
+        final Path statePath = Paths.get(stateDir.getPath());
+        final Path basePath = Paths.get(appDir.getPath());
+        try {
+            final Set<PosixFilePermission> baseFilePermissions = Files.getPosixFilePermissions(statePath);
+            final Set<PosixFilePermission> appFilePermissions = Files.getPosixFilePermissions(basePath);
+            assertThat(expectedPermissions.equals(baseFilePermissions), is(true));
+            assertThat(expectedPermissions.equals(appFilePermissions), is(true));
+        } catch (final IOException e) {
+            fail("Should create correct files and set correct permissions");
+        }
+    }
+
+    @Test
     public void shouldCreateTaskStateDirectory() {
         final TaskId taskId = new TaskId(0, 0);
         final File taskDirectory = directory.directoryForTask(taskId);