You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/09/25 06:52:01 UTC

[flink] 03/06: [FLINK-9061] [fs] Add entropy injector for file systems

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f2d8633f3d805ebd2900b54e300817d7384d84dd
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Sep 24 20:40:57 2018 +0200

    [FLINK-9061] [fs] Add entropy injector for file systems
---
 .../flink/core/fs/EntropyInjectingFileSystem.java  |  49 +++++++
 .../org/apache/flink/core/fs/EntropyInjector.java  | 135 +++++++++++++++++++
 .../apache/flink/core/fs/OutputStreamAndPath.java  |  47 +++++++
 .../java/org/apache/flink/util/StringUtils.java    |  33 +++++
 .../apache/flink/core/fs/EntropyInjectorTest.java  | 148 +++++++++++++++++++++
 .../org/apache/flink/util/StringUtilsTest.java     |  12 ++
 6 files changed, 424 insertions(+)

diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjectingFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjectingFileSystem.java
new file mode 100644
index 0000000..14a15be
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjectingFileSystem.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * An interface to be implemented by a {@link FileSystem} that is aware of entropy injection.
+ *
+ * <p>Entropy injection is a technique to spread files/objects across more parallel shards of
+ * a distributed storage (typically an object store) by adding random characters to the beginning
+ * of the path/key and hence spreading the keys across a wider domain of prefixes.
+ *
+ * <p>Entropy injection typically works by having a recognized marker string in paths
+ * and replacing that marker with random characters.
+ *
+ * <p>This interface is used in conjunction with the {@link EntropyInjector} (as a poor man's
+ * way to build a mix-in in Java).
+ */
+@PublicEvolving
+public interface EntropyInjectingFileSystem {
+
+	/**
+	 * Gets the marker string that represents the substring of a path to be replaced
+	 * by the entropy characters.
+	 *
+	 * <p>If this returns {@code null}, the {@link EntropyInjector} leaves paths unchanged,
+	 * i.e., entropy injection is effectively disabled.
+	 *
+	 * @return The entropy marker string, or {@code null} if no marker is configured.
+	 */
+	String getEntropyInjectionKey();
+
+	/**
+	 * Creates a string with random entropy to be injected into a path.
+	 *
+	 * <p>The returned string takes the place of the marker inside the path component of
+	 * the file's URI, so it should consist of characters that are suitable for a path.
+	 *
+	 * @return The entropy string that replaces the marker.
+	 */
+	String generateEntropy();
+}
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java b/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java
new file mode 100644
index 0000000..5bcb618
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**
+ * This class offers utilities for entropy injection for FileSystems that implement
+ * {@link EntropyInjectingFileSystem}.
+ */
+@PublicEvolving
+public final class EntropyInjector {
+
+	/**
+	 * Handles entropy injection across regular and entropy-aware file systems.
+	 *
+	 * <p>If the given file system is entropy-aware (implements {@link EntropyInjectingFileSystem}),
+	 * then this method replaces the entropy marker in the path with random characters.
+	 * The entropy marker is defined by {@link EntropyInjectingFileSystem#getEntropyInjectionKey()}.
+	 *
+	 * <p>If the given file system does not implement {@code EntropyInjectingFileSystem},
+	 * then this method delegates to {@link FileSystem#create(Path, WriteMode)} and
+	 * returns the same path in the resulting {@code OutputStreamAndPath}.
+	 *
+	 * @param fs The file system to create the file in.
+	 * @param path The requested path, which may contain the entropy marker.
+	 * @param writeMode The write mode passed on to {@link FileSystem#create(Path, WriteMode)}.
+	 * @return The created stream, together with the path the stream actually writes to.
+	 * @throws IOException Thrown if the file cannot be created, or if the path with injected
+	 *                     entropy does not form a valid URI.
+	 */
+	public static OutputStreamAndPath createEntropyAware(
+			FileSystem fs,
+			Path path,
+			WriteMode writeMode) throws IOException {
+
+		if (!(fs instanceof EntropyInjectingFileSystem)) {
+			return new OutputStreamAndPath(fs.create(path, writeMode), path);
+		}
+
+		final EntropyInjectingFileSystem efs = (EntropyInjectingFileSystem) fs;
+		final Path pathWithEntropy = resolveEntropy(path, efs, true);
+
+		final FSDataOutputStream out = fs.create(pathWithEntropy, writeMode);
+		return new OutputStreamAndPath(out, pathWithEntropy);
+	}
+
+	/**
+	 * Removes the entropy marker string from the path, if the given file system is an
+	 * entropy-injecting file system (implements {@link EntropyInjectingFileSystem}) and
+	 * the entropy marker key is present. Otherwise, this returns the path as is.
+	 *
+	 * @param fs The file system that the path belongs to.
+	 * @param path The path to filter.
+	 * @return The path without the marker string.
+	 * @throws FlinkRuntimeException Thrown if the path without the marker does not form a valid URI.
+	 */
+	public static Path removeEntropyMarkerIfPresent(FileSystem fs, Path path) {
+		if (fs instanceof EntropyInjectingFileSystem) {
+			try {
+				return resolveEntropy(path, (EntropyInjectingFileSystem) fs, false);
+			}
+			catch (IOException e) {
+				// this should never happen, because the path was valid before;
+				// rethrow unchecked so that this method does not need to declare IOException
+				throw new FlinkRuntimeException(e.getMessage(), e);
+			}
+		}
+		else {
+			return path;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Rewrites the given path by either replacing the first occurrence of the entropy marker
+	 * with freshly generated entropy, or by removing the marker altogether.
+	 *
+	 * <p>Only the path component of the URI is searched for the marker. Scheme, authority,
+	 * query, and fragment are carried over unchanged. If the file system defines no marker
+	 * ({@code null} or empty), or the marker does not occur in the path, the path is returned as is.
+	 *
+	 * @param path The path to process.
+	 * @param efs The file system that defines the marker and generates the entropy.
+	 * @param injectEntropy If true, the marker is replaced by entropy; if false, it is removed.
+	 * @return The rewritten path, or the original path if there was nothing to rewrite.
+	 * @throws IOException Thrown if the rewritten path does not form a valid URI.
+	 */
+	@VisibleForTesting
+	static Path resolveEntropy(Path path, EntropyInjectingFileSystem efs, boolean injectEntropy) throws IOException {
+		final String entropyInjectionKey = efs.getEntropyInjectionKey();
+
+		// an empty marker would "match" at index 0 of every path, so treat it like no marker at all
+		if (entropyInjectionKey == null || entropyInjectionKey.isEmpty()) {
+			return path;
+		}
+
+		final URI originalUri = path.toUri();
+		final String originalPath = originalUri.getPath();
+
+		final int indexOfKey = originalPath.indexOf(entropyInjectionKey);
+		if (indexOfKey == -1) {
+			return path;
+		}
+
+		final StringBuilder buffer = new StringBuilder(originalPath.length());
+		buffer.append(originalPath, 0, indexOfKey);
+
+		if (injectEntropy) {
+			buffer.append(efs.generateEntropy());
+		}
+
+		buffer.append(originalPath, indexOfKey + entropyInjectionKey.length(), originalPath.length());
+
+		final String rewrittenPath = buffer.toString();
+		try {
+			// removing a marker that formed a whole path segment leaves an empty segment ("//")
+			// behind, which is why the rewritten URI is normalized
+			return new Path(new URI(
+					originalUri.getScheme(),
+					originalUri.getAuthority(),
+					rewrittenPath,
+					originalUri.getQuery(),
+					originalUri.getFragment()).normalize());
+		}
+		catch (URISyntaxException e) {
+			// this could only happen if the injected entropy string contains invalid characters
+			throw new IOException("URI format error while processing path for entropy injection", e);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/** This class is not meant to be instantiated. */
+	private EntropyInjector() {}
+}
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/OutputStreamAndPath.java b/flink-core/src/main/java/org/apache/flink/core/fs/OutputStreamAndPath.java
new file mode 100644
index 0000000..62c9479
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/OutputStreamAndPath.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A simple immutable holder of an output stream and the path of the file that the stream writes to.
+ *
+ * <p>This is the result type of {@link EntropyInjector}'s entropy-aware file creation, where the
+ * path the stream writes to may differ from the requested path because entropy was injected.
+ */
+public final class OutputStreamAndPath {
+
+	/** The stream to write the file's contents to. */
+	private final FSDataOutputStream stream;
+
+	/** The path of the file that {@link #stream} writes to. */
+	private final Path path;
+
+	/**
+	 * Creates an OutputStreamAndPath.
+	 *
+	 * @param stream The output stream; must not be null.
+	 * @param path The path of the file the stream writes to; must not be null.
+	 */
+	public OutputStreamAndPath(FSDataOutputStream stream, Path path) {
+		this.stream = checkNotNull(stream);
+		this.path = checkNotNull(path);
+	}
+
+	/**
+	 * Gets the output stream.
+	 *
+	 * @return The output stream, never null.
+	 */
+	public FSDataOutputStream stream() {
+		return stream;
+	}
+
+	/**
+	 * Gets the path of the file that the stream writes to.
+	 *
+	 * @return The path, never null.
+	 */
+	public Path path() {
+		return path;
+	}
+}
diff --git a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
index 208a301..a5fff4c 100644
--- a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
@@ -30,6 +30,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Random;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -247,6 +248,38 @@ public final class StringUtils {
 	}
 
 	/**
+	 * Creates a random alphanumeric string of given length.
+	 *
+	 * @param rnd The random number generator to use.
+	 * @param length The number of alphanumeric characters to append.
+	 */
+	public static String generateRandomAlphanumericString(Random rnd, int length) {
+		checkNotNull(rnd);
+		checkArgument(length >= 0);
+
+		StringBuilder buffer = new StringBuilder(length);
+		for (int i = 0; i < length; i++) {
+			buffer.append(nextAlphanumericChar(rnd));
+		}
+		return buffer.toString();
+	}
+
+	private static char nextAlphanumericChar(Random rnd) {
+		int which = rnd.nextInt(62);
+		char c;
+		if (which < 10) {
+			c = (char) ('0' + which);
+		}
+		else if (which < 36) {
+			c = (char) ('A' - 10 + which);
+		}
+		else {
+			c = (char) ('a' - 36 + which);
+		}
+		return c;
+	}
+
+	/**
 	 * Writes a String to the given output.
 	 * The written string can be read with {@link #readString(DataInputView)}.
 	 *
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/EntropyInjectorTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/EntropyInjectorTest.java
new file mode 100644
index 0000000..a425826
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/EntropyInjectorTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.core.fs.local.LocalFileSystem;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link EntropyInjector}.
+ */
+public class EntropyInjectorTest {
+
+	@ClassRule
+	public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
+
+	@Test
+	public void testEmptyPath() throws Exception {
+		EntropyInjectingFileSystem efs = new TestEntropyInjectingFs("test", "ignored");
+		Path path = new Path("hdfs://localhost:12345");
+
+		assertEquals(path, EntropyInjector.resolveEntropy(path, efs, true));
+		assertEquals(path, EntropyInjector.resolveEntropy(path, efs, false));
+	}
+
+	@Test
+	public void testFullUriNonMatching() throws Exception {
+		EntropyInjectingFileSystem efs = new TestEntropyInjectingFs("_entropy_key_", "ignored");
+		Path path = new Path("s3://hugo@myawesomehost:55522/path/to/the/file");
+
+		assertEquals(path, EntropyInjector.resolveEntropy(path, efs, true));
+		assertEquals(path, EntropyInjector.resolveEntropy(path, efs, false));
+	}
+
+	@Test
+	public void testFullUriMatching() throws Exception {
+		EntropyInjectingFileSystem efs = new TestEntropyInjectingFs("s0mek3y", "12345678");
+		Path path = new Path("s3://hugo@myawesomehost:55522/path/s0mek3y/the/file");
+
+		assertEquals(new Path("s3://hugo@myawesomehost:55522/path/12345678/the/file"), EntropyInjector.resolveEntropy(path, efs, true));
+		assertEquals(new Path("s3://hugo@myawesomehost:55522/path/the/file"), EntropyInjector.resolveEntropy(path, efs, false));
+	}
+
+	@Test
+	public void testPathOnlyNonMatching() throws Exception {
+		EntropyInjectingFileSystem efs = new TestEntropyInjectingFs("_entropy_key_", "ignored");
+		Path path = new Path("/path/file");
+
+		assertEquals(path, EntropyInjector.resolveEntropy(path, efs, true));
+		assertEquals(path, EntropyInjector.resolveEntropy(path, efs, false));
+	}
+
+	@Test
+	public void testPathOnlyMatching() throws Exception {
+		EntropyInjectingFileSystem efs = new TestEntropyInjectingFs("_entropy_key_", "xyzz");
+		Path path = new Path("/path/_entropy_key_/file");
+
+		assertEquals(new Path("/path/xyzz/file"), EntropyInjector.resolveEntropy(path, efs, true));
+		assertEquals(new Path("/path/file"), EntropyInjector.resolveEntropy(path, efs, false));
+	}
+
+	@Test
+	public void testEntropyNotFullSegment() throws Exception {
+		EntropyInjectingFileSystem efs = new TestEntropyInjectingFs("_entropy_key_", "pqr");
+		Path path = new Path("s3://myhost:122/entropy-_entropy_key_-suffix/file");
+
+		assertEquals(new Path("s3://myhost:122/entropy-pqr-suffix/file"), EntropyInjector.resolveEntropy(path, efs, true));
+		assertEquals(new Path("s3://myhost:122/entropy--suffix/file"), EntropyInjector.resolveEntropy(path, efs, false));
+	}
+
+	@Test
+	public void testCreateEntropyAwarePlainFs() throws Exception {
+		File folder = TMP_FOLDER.newFolder();
+		Path path = new Path(Path.fromLocalFile(folder), "_entropy_/file");
+
+		OutputStreamAndPath out = EntropyInjector.createEntropyAware(
+				LocalFileSystem.getSharedInstance(), path, WriteMode.NO_OVERWRITE);
+
+		out.stream().close();
+
+		// a file system that is not entropy-aware must leave the marker in place
+		assertEquals(path, out.path());
+		assertTrue(new File(new File(folder, "_entropy_"), "file").exists());
+	}
+
+	@Test
+	public void testCreateEntropyAwareEntropyFs() throws Exception {
+		File folder = TMP_FOLDER.newFolder();
+		Path path = new Path(Path.fromLocalFile(folder), "_entropy_/file");
+		Path pathWithEntropy = new Path(Path.fromLocalFile(folder), "test-entropy/file");
+
+		FileSystem fs = new TestEntropyInjectingFs("_entropy_", "test-entropy");
+
+		OutputStreamAndPath out = EntropyInjector.createEntropyAware(fs, path, WriteMode.NO_OVERWRITE);
+
+		out.stream().close();
+
+		assertEquals(pathWithEntropy, out.path());
+		assertTrue(new File(new File(folder, "test-entropy"), "file").exists());
+	}
+
+	@Test
+	public void testRemoveEntropyMarkerIfPresent() {
+		FileSystem efs = new TestEntropyInjectingFs("_entropy_key_", "ignored");
+		Path path = new Path("/path/_entropy_key_/file");
+
+		assertEquals(new Path("/path/file"), EntropyInjector.removeEntropyMarkerIfPresent(efs, path));
+
+		// file systems that are not entropy-aware get the path back unchanged
+		assertEquals(path, EntropyInjector.removeEntropyMarkerIfPresent(LocalFileSystem.getSharedInstance(), path));
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A local file system that injects a fixed entropy string in place of a fixed marker.
+	 */
+	private static final class TestEntropyInjectingFs extends LocalFileSystem implements EntropyInjectingFileSystem {
+
+		private final String key;
+
+		private final String entropy;
+
+		TestEntropyInjectingFs(String key, String entropy) {
+			this.key = key;
+			this.entropy = entropy;
+		}
+
+		@Override
+		public String getEntropyInjectionKey() {
+			return key;
+		}
+
+		@Override
+		public String generateEntropy() {
+			return entropy;
+		}
+	}
+}
diff --git a/flink-core/src/test/java/org/apache/flink/util/StringUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/StringUtilsTest.java
index 5f705b4..7aa8e6e 100644
--- a/flink-core/src/test/java/org/apache/flink/util/StringUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/StringUtilsTest.java
@@ -20,8 +20,11 @@ package org.apache.flink.util;
 
 import org.junit.Test;
 
+import java.util.Random;
+
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for the {@link StringUtils}.
@@ -56,4 +59,13 @@ public class StringUtilsTest extends TestLogger {
 		String hex = StringUtils.byteToHexString(byteArray);
 		assertEquals("019f314a", hex);
 	}
+
+	@Test
+	public void testGenerateAlphanumeric() {
+		String str = StringUtils.generateRandomAlphanumericString(new Random(), 256);
+
+		if (!str.matches("[a-zA-Z0-9]{256}")) {
+			fail("Not alphanumeric: " + str);
+		}
+
+		// a length of zero is valid and must yield the empty string
+		assertEquals("", StringUtils.generateRandomAlphanumericString(new Random(), 0));
+	}
 }