You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/09/24 21:56:50 UTC
[flink] 02/05: [FLINK-9061] [fs] Add entropy injector for file
systems
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit df02ed4d905c46c36f37b8c95e0413d461150458
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Sep 24 20:40:57 2018 +0200
[FLINK-9061] [fs] Add entropy injector for file systems
---
.../flink/core/fs/EntropyInjectingFileSystem.java | 49 +++++++
.../org/apache/flink/core/fs/EntropyInjector.java | 135 +++++++++++++++++++
.../apache/flink/core/fs/OutputStreamAndPath.java | 47 +++++++
.../java/org/apache/flink/util/StringUtils.java | 33 +++++
.../apache/flink/core/fs/EntropyInjectorTest.java | 148 +++++++++++++++++++++
.../org/apache/flink/util/StringUtilsTest.java | 12 ++
6 files changed, 424 insertions(+)
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjectingFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjectingFileSystem.java
new file mode 100644
index 0000000..14a15be
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjectingFileSystem.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * An interface to be implemented by a {@link FileSystem} that is aware of entropy injection.
+ *
+ * <p>Entropy injection is a technique to spread files/objects across more parallel shards of
+ * a distributed storage (typically object store) by adding random characters to the beginning
+ * of the path/key and hence spreading the keys across a wider domain of prefixes.
+ *
+ * <p>Entropy injection typically works by having a recognized marker string in paths
+ * and replacing that marker with random characters.
+ *
+ * <p>This interface is used in conjunction with the {@link EntropyInjector} (as a poor man's
+ * way to build a mix-in in Java).
+ */
+@PublicEvolving
+public interface EntropyInjectingFileSystem {
+
+ /**
+ * Gets the marker string that represents the substring of a path to be replaced
+ * by the entropy characters.
+ */
+ String getEntropyInjectionKey();
+
+ /**
+ * Creates a string with random entropy to be injected into a path.
+ */
+ String generateEntropy();
+}
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java b/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java
new file mode 100644
index 0000000..5bcb618
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**
+ * This class offers utilities for entropy injection for FileSystems that implement
+ * {@link EntropyInjectingFileSystem}.
+ */
+@PublicEvolving
+public class EntropyInjector {
+
+ /**
+ * Handles entropy injection across regular and entropy-aware file systems.
+ *
+ * <p>If the given file system is entropy-aware (implements {@link EntropyInjectingFileSystem}),
+ * then this method replaces the entropy marker in the path with random characters.
+ * The entropy marker is defined by {@link EntropyInjectingFileSystem#getEntropyInjectionKey()}.
+ *
+ * <p>If the given file system does not implement {@code EntropyInjectingFileSystem},
+ * then this method delegates to {@link FileSystem#create(Path, WriteMode)} and
+ * returns the same path in the resulting {@code OutputStreamAndPath}.
+ */
+ public static OutputStreamAndPath createEntropyAware(
+ FileSystem fs,
+ Path path,
+ WriteMode writeMode) throws IOException {
+
+ if (!(fs instanceof EntropyInjectingFileSystem)) {
+ return new OutputStreamAndPath(fs.create(path, writeMode), path);
+ }
+
+ final EntropyInjectingFileSystem efs = (EntropyInjectingFileSystem) fs;
+ final Path pathWithEntropy = resolveEntropy(path, efs, true);
+
+ final FSDataOutputStream out = fs.create(pathWithEntropy, writeMode);
+ return new OutputStreamAndPath(out, pathWithEntropy);
+ }
+
+ /**
+ * Removes the entropy marker string from the path, if the given file system is an
+ * entropy-injecting file system (implements {@link EntropyInjectingFileSystem}) and
+ * the entropy marker key is present. Otherwise, this returns the path as is.
+ *
+ * @param path The path to filter.
+ * @return The path without the marker string.
+ */
+ public static Path removeEntropyMarkerIfPresent(FileSystem fs, Path path) {
+ if (fs instanceof EntropyInjectingFileSystem) {
+ try {
+ return resolveEntropy(path, (EntropyInjectingFileSystem) fs, false);
+ }
+ catch (IOException e) {
+ // this should never happen, because the path was valid before. rethrow to silence the compiler
+ throw new FlinkRuntimeException(e.getMessage(), e);
+ }
+ }
+ else {
+ return path;
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ @VisibleForTesting
+ static Path resolveEntropy(Path path, EntropyInjectingFileSystem efs, boolean injectEntropy) throws IOException {
+ final String entropyInjectionKey = efs.getEntropyInjectionKey();
+
+ if (entropyInjectionKey == null) {
+ return path;
+ }
+ else {
+ final URI originalUri = path.toUri();
+ final String checkpointPath = originalUri.getPath();
+
+ final int indexOfKey = checkpointPath.indexOf(entropyInjectionKey);
+ if (indexOfKey == -1) {
+ return path;
+ }
+ else {
+ final StringBuilder buffer = new StringBuilder(checkpointPath.length());
+ buffer.append(checkpointPath, 0, indexOfKey);
+
+ if (injectEntropy) {
+ buffer.append(efs.generateEntropy());
+ }
+
+ buffer.append(checkpointPath, indexOfKey + entropyInjectionKey.length(), checkpointPath.length());
+
+ final String rewrittenPath = buffer.toString();
+ try {
+ return new Path(new URI(
+ originalUri.getScheme(),
+ originalUri.getAuthority(),
+ rewrittenPath,
+ originalUri.getQuery(),
+ originalUri.getFragment()).normalize());
+ }
+ catch (URISyntaxException e) {
+ // this could only happen if the injected entropy string contains invalid characters
+ throw new IOException("URI format error while processing path for entropy injection", e);
+ }
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /** This class is not meant to be instantiated. */
+ private EntropyInjector() {}
+}
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/OutputStreamAndPath.java b/flink-core/src/main/java/org/apache/flink/core/fs/OutputStreamAndPath.java
new file mode 100644
index 0000000..62c9479
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/OutputStreamAndPath.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An output stream and a path.
+ */
+public final class OutputStreamAndPath {
+
+ private final FSDataOutputStream stream;
+
+ private final Path path;
+
+ /**
+ * Creates an OutputStreamAndPath.
+ */
+ public OutputStreamAndPath(FSDataOutputStream stream, Path path) {
+ this.stream = checkNotNull(stream);
+ this.path = checkNotNull(path);
+ }
+
+ public FSDataOutputStream stream() {
+ return stream;
+ }
+
+ public Path path() {
+ return path;
+ }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
index b9ff319..a968a8a 100644
--- a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
@@ -32,6 +32,7 @@ import java.util.Objects;
import java.util.Random;
import java.util.stream.Collectors;
+import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -249,6 +250,38 @@ public final class StringUtils {
}
/**
+ * Creates a random alphanumeric string of given length.
+ *
+ * @param rnd The random number generator to use.
+ * @param length The number of alphanumeric characters to generate.
+ */
+ public static String generateRandomAlphanumericString(Random rnd, int length) {
+ checkNotNull(rnd);
+ checkArgument(length >= 0);
+
+ StringBuilder buffer = new StringBuilder(length);
+ for (int i = 0; i < length; i++) {
+ buffer.append(nextAlphanumericChar(rnd));
+ }
+ return buffer.toString();
+ }
+
+ private static char nextAlphanumericChar(Random rnd) {
+ int which = rnd.nextInt(62);
+ char c;
+ if (which < 10) {
+ c = (char) ('0' + which);
+ }
+ else if (which < 36) {
+ c = (char) ('A' - 10 + which);
+ }
+ else {
+ c = (char) ('a' - 36 + which);
+ }
+ return c;
+ }
+
+ /**
* Writes a String to the given output.
* The written string can be read with {@link #readString(DataInputView)}.
*
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/EntropyInjectorTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/EntropyInjectorTest.java
new file mode 100644
index 0000000..a425826
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/EntropyInjectorTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.core.fs.local.LocalFileSystem;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link EntropyInjector}.
+ */
+public class EntropyInjectorTest {
+
+ @ClassRule
+ public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
+
+ @Test
+ public void testEmptyPath() throws Exception {
+ EntropyInjectingFileSystem efs = new TestEntropyInjectingFs("test", "ignored");
+ Path path = new Path("hdfs://localhost:12345");
+
+ assertEquals(path, EntropyInjector.resolveEntropy(path, efs, true));
+ assertEquals(path, EntropyInjector.resolveEntropy(path, efs, false));
+ }
+
+ @Test
+ public void testFullUriNonMatching() throws Exception {
+ EntropyInjectingFileSystem efs = new TestEntropyInjectingFs("_entropy_key_", "ignored");
+ Path path = new Path("s3://hugo@myawesomehost:55522/path/to/the/file");
+
+ assertEquals(path, EntropyInjector.resolveEntropy(path, efs, true));
+ assertEquals(path, EntropyInjector.resolveEntropy(path, efs, false));
+ }
+
+ @Test
+ public void testFullUriMatching() throws Exception {
+ EntropyInjectingFileSystem efs = new TestEntropyInjectingFs("s0mek3y", "12345678");
+ Path path = new Path("s3://hugo@myawesomehost:55522/path/s0mek3y/the/file");
+
+ assertEquals(new Path("s3://hugo@myawesomehost:55522/path/12345678/the/file"), EntropyInjector.resolveEntropy(path, efs, true));
+ assertEquals(new Path("s3://hugo@myawesomehost:55522/path/the/file"), EntropyInjector.resolveEntropy(path, efs, false));
+ }
+
+ @Test
+ public void testPathOnlyNonMatching() throws Exception {
+ EntropyInjectingFileSystem efs = new TestEntropyInjectingFs("_entropy_key_", "ignored");
+ Path path = new Path("/path/file");
+
+ assertEquals(path, EntropyInjector.resolveEntropy(path, efs, true));
+ assertEquals(path, EntropyInjector.resolveEntropy(path, efs, false));
+ }
+
+ @Test
+ public void testPathOnlyMatching() throws Exception {
+ EntropyInjectingFileSystem efs = new TestEntropyInjectingFs("_entropy_key_", "xyzz");
+ Path path = new Path("/path/_entropy_key_/file");
+
+ assertEquals(new Path("/path/xyzz/file"), EntropyInjector.resolveEntropy(path, efs, true));
+ assertEquals(new Path("/path/file"), EntropyInjector.resolveEntropy(path, efs, false));
+ }
+
+ @Test
+ public void testEntropyNotFullSegment() throws Exception {
+ EntropyInjectingFileSystem efs = new TestEntropyInjectingFs("_entropy_key_", "pqr");
+ Path path = new Path("s3://myhost:122/entropy-_entropy_key_-suffix/file");
+
+ assertEquals(new Path("s3://myhost:122/entropy-pqr-suffix/file"), EntropyInjector.resolveEntropy(path, efs, true));
+ assertEquals(new Path("s3://myhost:122/entropy--suffix/file"), EntropyInjector.resolveEntropy(path, efs, false));
+ }
+
+ @Test
+ public void testCreateEntropyAwarePlainFs() throws Exception {
+ File folder = TMP_FOLDER.newFolder();
+ Path path = new Path(Path.fromLocalFile(folder), "_entropy_/file");
+
+ OutputStreamAndPath out = EntropyInjector.createEntropyAware(
+ LocalFileSystem.getSharedInstance(), path, WriteMode.NO_OVERWRITE);
+
+ out.stream().close();
+
+ assertEquals(path, out.path());
+ assertTrue(new File (new File(folder, "_entropy_"), "file").exists());
+ }
+
+ @Test
+ public void testCreateEntropyAwareEntropyFs() throws Exception {
+ File folder = TMP_FOLDER.newFolder();
+ Path path = new Path(Path.fromLocalFile(folder), "_entropy_/file");
+ Path pathWithEntropy = new Path(Path.fromLocalFile(folder), "test-entropy/file");
+
+ FileSystem fs = new TestEntropyInjectingFs("_entropy_", "test-entropy");
+
+ OutputStreamAndPath out = EntropyInjector.createEntropyAware(fs, path, WriteMode.NO_OVERWRITE);
+
+ out.stream().close();
+
+ assertEquals(new Path(Path.fromLocalFile(folder), "test-entropy/file"), out.path());
+ assertTrue(new File (new File(folder, "test-entropy"), "file").exists());
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static final class TestEntropyInjectingFs extends LocalFileSystem implements EntropyInjectingFileSystem {
+
+ private final String key;
+
+ private final String entropy;
+
+ TestEntropyInjectingFs(String key, String entropy) {
+ this.key = key;
+ this.entropy = entropy;
+ }
+
+ @Override
+ public String getEntropyInjectionKey() {
+ return key;
+ }
+
+ @Override
+ public String generateEntropy() {
+ return entropy;
+ }
+ }
+}
diff --git a/flink-core/src/test/java/org/apache/flink/util/StringUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/StringUtilsTest.java
index 5f705b4..7aa8e6e 100644
--- a/flink-core/src/test/java/org/apache/flink/util/StringUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/StringUtilsTest.java
@@ -20,8 +20,11 @@ package org.apache.flink.util;
import org.junit.Test;
+import java.util.Random;
+
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
/**
* Tests for the {@link StringUtils}.
@@ -56,4 +59,13 @@ public class StringUtilsTest extends TestLogger {
String hex = StringUtils.byteToHexString(byteArray);
assertEquals("019f314a", hex);
}
+
+ @Test
+ public void testGenerateAlphanumeric() {
+ String str = StringUtils.generateRandomAlphanumericString(new Random(), 256);
+
+ if (!str.matches("[a-zA-Z0-9]{256}")) {
+ fail("Not alphanumeric: " + str);
+ }
+ }
}