You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2020/08/22 15:11:57 UTC
[kafka] branch trunk updated: KAFKA-10211: Add
DirectoryConfigProvider (#9136)
This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8b94e62 KAFKA-10211: Add DirectoryConfigProvider (#9136)
8b94e62 is described below
commit 8b94e6229548cc4fc015b2dc1e756ce86b5e84bb
Author: Tom Bentley <to...@users.noreply.github.com>
AuthorDate: Sat Aug 22 16:10:48 2020 +0100
KAFKA-10211: Add DirectoryConfigProvider (#9136)
See KIP-632: https://cwiki.apache.org/confluence/display/KAFKA/KIP-632%3A+Add+DirectoryConfigProvider
Reviewers: Mickael Maison <mi...@gmail.com>, David Jacot <da...@gmail.com>
---
.../config/provider/DirectoryConfigProvider.java | 106 +++++++++++++++
.../provider/DirectoryConfigProviderTest.java | 149 +++++++++++++++++++++
2 files changed, 255 insertions(+)
diff --git a/clients/src/main/java/org/apache/kafka/common/config/provider/DirectoryConfigProvider.java b/clients/src/main/java/org/apache/kafka/common/config/provider/DirectoryConfigProvider.java
new file mode 100644
index 0000000..adf2774
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/config/provider/DirectoryConfigProvider.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.provider;
+
+import org.apache.kafka.common.config.ConfigData;
+import org.apache.kafka.common.config.ConfigException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.util.Collections.emptyMap;
+
+/**
+ * An implementation of {@link ConfigProvider} based on a directory of files.
+ * Property keys correspond to the names of the regular (i.e. non-directory)
+ * files in a directory given by the path parameter.
+ * Property values are taken from the file contents corresponding to each key,
+ * decoded as UTF-8. Only files directly inside the directory are considered;
+ * the directory is not traversed recursively.
+ */
+public class DirectoryConfigProvider implements ConfigProvider {
+
+    private static final Logger log = LoggerFactory.getLogger(DirectoryConfigProvider.class);
+
+    /** This provider takes no configuration; the supplied map is ignored. */
+    @Override
+    public void configure(Map<String, ?> configs) { }
+
+    /** This provider holds no resources between calls, so there is nothing to release. */
+    @Override
+    public void close() throws IOException { }
+
+    /**
+     * Retrieves the data contained in regular files in the directory given by {@code path}.
+     * Non-regular files (such as directories) in the given directory are silently ignored.
+     * A {@code null} or empty {@code path}, or a path that is not a directory, yields empty data.
+     * @param path the directory where data files reside.
+     * @return the configuration data.
+     * @throws ConfigException if the directory cannot be listed or one of its files cannot be read.
+     */
+    @Override
+    public ConfigData get(String path) {
+        return get(path, Files::isRegularFile);
+    }
+
+    /**
+     * Retrieves the data contained in the regular files named by {@code keys} in the directory given by {@code path}.
+     * Non-regular files (such as directories) in the given directory are silently ignored.
+     * Keys are compared against the plain file names of the directory entries, so a key that does not
+     * name a regular file directly inside the directory is simply absent from the result.
+     * A {@code null} or empty {@code path}, or a path that is not a directory, yields empty data.
+     * @param path the directory where data files reside.
+     * @param keys the keys whose values will be retrieved.
+     * @return the configuration data.
+     * @throws ConfigException if the directory cannot be listed or one of the selected files cannot be read.
+     */
+    @Override
+    public ConfigData get(String path, Set<String> keys) {
+        return get(path, pathname ->
+            Files.isRegularFile(pathname)
+                && keys.contains(pathname.getFileName().toString()));
+    }
+
+    /**
+     * Lists {@code path} and reads every entry accepted by {@code fileFilter} into a
+     * file-name to file-content map.
+     * @param path the directory to list; {@code null}, empty or non-directory paths produce empty data.
+     * @param fileFilter selects which directory entries become properties.
+     * @return the configuration data built from the selected files.
+     */
+    private static ConfigData get(String path, Predicate<Path> fileFilter) {
+        Map<String, String> map = emptyMap();
+        if (path != null && !path.isEmpty()) {
+            Path dir = new File(path).toPath();
+            if (!Files.isDirectory(dir)) {
+                log.warn("The path {} is not a directory", path);
+            } else {
+                // Files.list keeps the directory open until the returned stream is closed,
+                // so consume it inside try-with-resources to avoid leaking the handle.
+                try (Stream<Path> stream = Files.list(dir)) {
+                    map = stream
+                        .filter(fileFilter)
+                        .collect(Collectors.toMap(
+                            p -> p.getFileName().toString(),
+                            p -> read(p)));
+                } catch (IOException e) {
+                    throw new ConfigException("Could not list directory " + dir, e);
+                }
+            }
+        }
+        return new ConfigData(map);
+    }
+
+    /**
+     * Reads the whole file at {@code path} and decodes it as UTF-8.
+     * @param path the file to read.
+     * @return the file contents.
+     * @throws ConfigException if the file cannot be read.
+     */
+    private static String read(Path path) {
+        try {
+            return new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
+        } catch (IOException e) {
+            throw new ConfigException("Could not read file " + path + " for property " + path.getFileName(), e);
+        }
+    }
+
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/config/provider/DirectoryConfigProviderTest.java b/clients/src/test/java/org/apache/kafka/common/config/provider/DirectoryConfigProviderTest.java
new file mode 100644
index 0000000..9d7139b
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/config/provider/DirectoryConfigProviderTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.provider;
+
+import org.apache.kafka.common.config.ConfigData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Set;
+
+import static java.util.Arrays.asList;
+import static org.apache.kafka.test.TestUtils.toSet;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link DirectoryConfigProvider}.
+ *
+ * Each test runs against a fresh fixture created under a temporary directory:
+ * <pre>
+ * parent/
+ *   dir/
+ *     foo
+ *     bar
+ *     subdir/
+ *       subdirFile
+ *   siblingdir/
+ *     siblingdirFile
+ *   siblingFile
+ * </pre>
+ * Every file contains its own name in upper case, so the expected value for key
+ * {@code foo} is {@code FOO}.
+ */
+public class DirectoryConfigProviderTest {
+
+    private DirectoryConfigProvider provider;
+    private File parent;
+    private File dir;
+    private File bar;
+    private File foo;
+    private File subdir;
+    private File subdirFile;
+    private File siblingDir;
+    private File siblingDirFile;
+    private File siblingFile;
+
+    /** Writes the upper-cased file name into {@code file} as UTF-8 and returns the same file. */
+    private static File writeFile(File file) throws IOException {
+        Files.write(file.toPath(), file.getName().toUpperCase(Locale.ENGLISH).getBytes(StandardCharsets.UTF_8));
+        return file;
+    }
+
+    /** Creates the provider and the directory layout described in the class comment. */
+    @Before
+    public void setup() throws IOException {
+        provider = new DirectoryConfigProvider();
+        provider.configure(Collections.emptyMap());
+        parent = TestUtils.tempDirectory();
+        dir = new File(parent, "dir");
+        dir.mkdir();
+        foo = writeFile(new File(dir, "foo"));
+        bar = writeFile(new File(dir, "bar"));
+        subdir = new File(dir, "subdir");
+        subdir.mkdir();
+        subdirFile = writeFile(new File(subdir, "subdirFile"));
+        siblingDir = new File(parent, "siblingdir");
+        siblingDir.mkdir();
+        siblingDirFile = writeFile(new File(siblingDir, "siblingdirFile"));
+        siblingFile = writeFile(new File(parent, "siblingFile"));
+    }
+
+    /** Closes the provider and removes the temporary fixture. */
+    @After
+    public void close() throws IOException {
+        provider.close();
+        Utils.delete(parent);
+    }
+
+    /** Without a key set, every regular file directly in the directory becomes a property. */
+    @Test
+    public void testGetAllKeysAtPath() {
+        ConfigData configData = provider.get(dir.getAbsolutePath());
+        assertEquals(toSet(asList(foo.getName(), bar.getName())), configData.data().keySet());
+        assertEquals("FOO", configData.data().get(foo.getName()));
+        assertEquals("BAR", configData.data().get(bar.getName()));
+        assertNull(configData.ttl());
+    }
+
+    /** Requested keys that have no matching file ("baz") are simply absent from the result. */
+    @Test
+    public void testGetSetOfKeysAtPath() {
+        Set<String> keys = toSet(asList(foo.getName(), "baz"));
+        ConfigData configData = provider.get(dir.getAbsolutePath(), keys);
+        assertEquals(Collections.singleton(foo.getName()), configData.data().keySet());
+        assertEquals("FOO", configData.data().get(foo.getName()));
+        assertNull(configData.ttl());
+    }
+
+    @Test
+    public void testNoSubdirs() {
+        // Only regular files directly in the path directory are allowed, not in subdirs
+        Set<String> keys = toSet(asList(subdir.getName(), String.join(File.separator, subdir.getName(), subdirFile.getName())));
+        ConfigData configData = provider.get(dir.getAbsolutePath(), keys);
+        assertTrue(configData.data().isEmpty());
+        assertNull(configData.ttl());
+    }
+
+    @Test
+    public void testNoTraversal() {
+        // Check we can't escape outside the path directory
+        Set<String> keys = toSet(asList(
+            String.join(File.separator, "..", siblingFile.getName()),
+            String.join(File.separator, "..", siblingDir.getName()),
+            String.join(File.separator, "..", siblingDir.getName(), siblingDirFile.getName())));
+        ConfigData configData = provider.get(dir.getAbsolutePath(), keys);
+        assertTrue(configData.data().isEmpty());
+        assertNull(configData.ttl());
+    }
+
+    /** An empty path yields empty data rather than an error. */
+    @Test
+    public void testEmptyPath() {
+        ConfigData configData = provider.get("");
+        assertTrue(configData.data().isEmpty());
+        assertNull(configData.ttl());
+    }
+
+    /** An empty path yields empty data even when keys are requested. */
+    @Test
+    public void testEmptyPathWithKey() {
+        ConfigData configData = provider.get("", Collections.singleton("foo"));
+        assertTrue(configData.data().isEmpty());
+        assertNull(configData.ttl());
+    }
+
+    /** A null path yields empty data rather than an error. */
+    @Test
+    public void testNullPath() {
+        ConfigData configData = provider.get(null);
+        assertTrue(configData.data().isEmpty());
+        assertNull(configData.ttl());
+    }
+
+    /** A null path yields empty data even when keys are requested. */
+    @Test
+    public void testNullPathWithKey() {
+        ConfigData configData = provider.get(null, Collections.singleton("foo"));
+        assertTrue(configData.data().isEmpty());
+        assertNull(configData.ttl());
+    }
+}
+