You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2020/08/22 15:11:57 UTC

[kafka] branch trunk updated: KAFKA-10211: Add DirectoryConfigProvider (#9136)

This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8b94e62  KAFKA-10211: Add DirectoryConfigProvider (#9136)
8b94e62 is described below

commit 8b94e6229548cc4fc015b2dc1e756ce86b5e84bb
Author: Tom Bentley <to...@users.noreply.github.com>
AuthorDate: Sat Aug 22 16:10:48 2020 +0100

    KAFKA-10211: Add DirectoryConfigProvider (#9136)
    
    See KIP-632: https://cwiki.apache.org/confluence/display/KAFKA/KIP-632%3A+Add+DirectoryConfigProvider
    
    Reviewers: Mickael Maison <mi...@gmail.com>, David Jacot <da...@gmail.com>
---
 .../config/provider/DirectoryConfigProvider.java   | 106 +++++++++++++++
 .../provider/DirectoryConfigProviderTest.java      | 149 +++++++++++++++++++++
 2 files changed, 255 insertions(+)

diff --git a/clients/src/main/java/org/apache/kafka/common/config/provider/DirectoryConfigProvider.java b/clients/src/main/java/org/apache/kafka/common/config/provider/DirectoryConfigProvider.java
new file mode 100644
index 0000000..adf2774
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/config/provider/DirectoryConfigProvider.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.provider;
+
+import org.apache.kafka.common.config.ConfigData;
+import org.apache.kafka.common.config.ConfigException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.util.Collections.emptyMap;
+
+/**
+ * An implementation of {@link ConfigProvider} based on a directory of files.
+ * Property keys correspond to the names of the regular (i.e. non-directory)
+ * files in a directory given by the path parameter.
+ * Property values are taken from the file contents corresponding to each key.
+ * File contents are decoded as UTF-8 and returned as-is; no trimming is performed.
+ */
+public class DirectoryConfigProvider implements ConfigProvider {
+
+    private static final Logger log = LoggerFactory.getLogger(DirectoryConfigProvider.class);
+
+    /**
+     * Does nothing: this provider does not read any settings from {@code configs}.
+     * @param configs ignored.
+     */
+    @Override
+    public void configure(Map<String, ?> configs) { }
+
+    /**
+     * Does nothing: this provider keeps no state or open resources between calls.
+     */
+    @Override
+    public void close() throws IOException { }
+
+    /**
+     * Retrieves the data contained in regular files in the directory given by {@code path}.
+     * Non-regular files (such as directories) in the given directory are silently ignored.
+     * If {@code path} is null or empty, or does not name a directory, the returned data is empty
+     * (a warning is logged in the not-a-directory case).
+     * @param path the directory where data files reside.
+     * @return the configuration data.
+     * @throws ConfigException if the directory cannot be listed or one of its files cannot be read.
+     */
+    @Override
+    public ConfigData get(String path) {
+        return get(path, Files::isRegularFile);
+    }
+
+    /**
+     * Retrieves the data contained in the regular files named by {@code keys} in the directory given by {@code path}.
+     * Non-regular files (such as directories) in the given directory are silently ignored.
+     * Keys are compared against the bare file names of the directory's direct children only, so a key
+     * containing a path separator never matches anything.
+     * If {@code path} is null or empty, or does not name a directory, the returned data is empty
+     * (a warning is logged in the not-a-directory case).
+     * @param path the directory where data files reside.
+     * @param keys the keys whose values will be retrieved.
+     * @return the configuration data.
+     * @throws ConfigException if the directory cannot be listed or one of the selected files cannot be read.
+     */
+    @Override
+    public ConfigData get(String path, Set<String> keys) {
+        return get(path, pathname ->
+                Files.isRegularFile(pathname)
+                        && keys.contains(pathname.getFileName().toString()));
+    }
+
+    /**
+     * Lists the direct children of {@code path}, keeps those accepted by {@code fileFilter}, and maps
+     * each kept file's name to its contents.
+     * @param path the directory to list; null, empty or non-directory paths yield empty data.
+     * @param fileFilter selects which directory entries become properties.
+     * @return the configuration data built from the selected files.
+     */
+    private static ConfigData get(String path, Predicate<Path> fileFilter) {
+        Map<String, String> map = emptyMap();
+        if (path != null && !path.isEmpty()) {
+            Path dir = new File(path).toPath();
+            if (!Files.isDirectory(dir)) {
+                log.warn("The path {} is not a directory", path);
+            } else {
+                // The stream returned by Files.list wraps an open directory handle, so it must be
+                // closed explicitly; try-with-resources also covers the case where read() throws.
+                try (Stream<Path> entries = Files.list(dir)) {
+                    map = entries
+                        .filter(fileFilter)
+                        .collect(Collectors.toMap(
+                            p -> p.getFileName().toString(),
+                            p -> read(p)));
+                } catch (IOException e) {
+                    throw new ConfigException("Could not list directory " + dir, e);
+                }
+            }
+        }
+        return new ConfigData(map);
+    }
+
+    /**
+     * Reads the whole of the file at {@code path} and decodes it as UTF-8.
+     * @param path the file to read.
+     * @return the file contents.
+     * @throws ConfigException if the file cannot be read.
+     */
+    private static String read(Path path) {
+        try {
+            return new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
+        } catch (IOException e) {
+            throw new ConfigException("Could not read file " + path + " for property " + path.getFileName(), e);
+        }
+    }
+
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/config/provider/DirectoryConfigProviderTest.java b/clients/src/test/java/org/apache/kafka/common/config/provider/DirectoryConfigProviderTest.java
new file mode 100644
index 0000000..9d7139b
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/config/provider/DirectoryConfigProviderTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.provider;
+
+import org.apache.kafka.common.config.ConfigData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Set;
+
+import static java.util.Arrays.asList;
+import static org.apache.kafka.test.TestUtils.toSet;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link DirectoryConfigProvider}.
+ *
+ * Each test runs against a fresh temporary directory tree laid out as follows:
+ * <pre>
+ * parent/
+ *   dir/                  (the directory handed to the provider)
+ *     foo
+ *     bar
+ *     subdir/
+ *       subdirFile
+ *   siblingdir/
+ *     siblingdirFile
+ *   siblingFile
+ * </pre>
+ * Every file contains its own name in upper case.
+ */
+public class DirectoryConfigProviderTest {
+
+    private DirectoryConfigProvider provider;
+    private File parent;
+    private File dir;
+    private File bar;
+    private File foo;
+    private File subdir;
+    private File subdirFile;
+    private File siblingDir;
+    private File siblingDirFile;
+    private File siblingFile;
+
+    /**
+     * Writes the upper-cased name of {@code file} into it as UTF-8 and returns the same file.
+     */
+    private static File writeFile(File file) throws IOException {
+        Files.write(file.toPath(), file.getName().toUpperCase(Locale.ENGLISH).getBytes(StandardCharsets.UTF_8));
+        return file;
+    }
+
+    @Before
+    public void setup() throws IOException {
+        provider = new DirectoryConfigProvider();
+        provider.configure(Collections.emptyMap());
+        parent = TestUtils.tempDirectory();
+        dir = new File(parent, "dir");
+        // Fail fast if a fixture directory cannot be created, rather than in a later assertion
+        assertTrue(dir.mkdir());
+        foo = writeFile(new File(dir, "foo"));
+        bar = writeFile(new File(dir, "bar"));
+        subdir = new File(dir, "subdir");
+        assertTrue(subdir.mkdir());
+        subdirFile = writeFile(new File(subdir, "subdirFile"));
+        siblingDir = new File(parent, "siblingdir");
+        assertTrue(siblingDir.mkdir());
+        siblingDirFile = writeFile(new File(siblingDir, "siblingdirFile"));
+        siblingFile = writeFile(new File(parent, "siblingFile"));
+    }
+
+    @After
+    public void close() throws IOException {
+        provider.close();
+        Utils.delete(parent);
+    }
+
+    @Test
+    public void testGetAllKeysAtPath() {
+        // Only the two regular files are returned; the subdirectory is not a key
+        ConfigData configData = provider.get(dir.getAbsolutePath());
+        assertEquals(toSet(asList(foo.getName(), bar.getName())), configData.data().keySet());
+        assertEquals("FOO", configData.data().get(foo.getName()));
+        assertEquals("BAR", configData.data().get(bar.getName()));
+        assertNull(configData.ttl());
+    }
+
+    @Test
+    public void testGetSetOfKeysAtPath() {
+        // "baz" has no corresponding file, so it is simply absent from the result
+        Set<String> keys = toSet(asList(foo.getName(), "baz"));
+        ConfigData configData = provider.get(dir.getAbsolutePath(), keys);
+        assertEquals(Collections.singleton(foo.getName()), configData.data().keySet());
+        assertEquals("FOO", configData.data().get(foo.getName()));
+        assertNull(configData.ttl());
+    }
+
+    @Test
+    public void testNoSubdirs() {
+        // Only regular files directly in the path directory are allowed, not in subdirs
+        Set<String> keys = toSet(asList(subdir.getName(), String.join(File.separator, subdir.getName(), subdirFile.getName())));
+        ConfigData configData = provider.get(dir.getAbsolutePath(), keys);
+        assertTrue(configData.data().isEmpty());
+        assertNull(configData.ttl());
+    }
+
+    @Test
+    public void testNoTraversal() {
+        // Check we can't escape outside the path directory
+        Set<String> keys = toSet(asList(
+                String.join(File.separator, "..", siblingFile.getName()),
+                String.join(File.separator, "..", siblingDir.getName()),
+                String.join(File.separator, "..", siblingDir.getName(), siblingDirFile.getName())));
+        ConfigData configData = provider.get(dir.getAbsolutePath(), keys);
+        assertTrue(configData.data().isEmpty());
+        assertNull(configData.ttl());
+    }
+
+    @Test
+    public void testEmptyPath() {
+        ConfigData configData = provider.get("");
+        assertTrue(configData.data().isEmpty());
+        assertNull(configData.ttl());
+    }
+
+    @Test
+    public void testEmptyPathWithKey() {
+        ConfigData configData = provider.get("", Collections.singleton("foo"));
+        assertTrue(configData.data().isEmpty());
+        assertNull(configData.ttl());
+    }
+
+    @Test
+    public void testNullPath() {
+        ConfigData configData = provider.get(null);
+        assertTrue(configData.data().isEmpty());
+        assertNull(configData.ttl());
+    }
+
+    @Test
+    public void testNullPathWithKey() {
+        ConfigData configData = provider.get(null, Collections.singleton("foo"));
+        assertTrue(configData.data().isEmpty());
+        assertNull(configData.ttl());
+    }
+}
+