You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/09/26 20:03:23 UTC

[flink] branch master updated: [FLINK-10444] [core] Make entropy injection work across FileSystem SafetyNet.

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new cc9b376  [FLINK-10444] [core] Make entropy injection work across FileSystem SafetyNet.
cc9b376 is described below

commit cc9b3769084634dc660f7a90aed0090cb46b3342
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed Sep 26 18:26:18 2018 +0200

    [FLINK-10444] [core] Make entropy injection work across FileSystem SafetyNet.
---
 .../org/apache/flink/core/fs/EntropyInjector.java  | 50 ++++++++++++++++------
 .../apache/flink/core/fs/EntropyInjectorTest.java  | 38 ++++++++++++++++
 2 files changed, 74 insertions(+), 14 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java b/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java
index 5bcb618..0fba138 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java
@@ -23,6 +23,8 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.util.FlinkRuntimeException;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -50,15 +52,14 @@ public class EntropyInjector {
 			Path path,
 			WriteMode writeMode) throws IOException {
 
-		if (!(fs instanceof EntropyInjectingFileSystem)) {
-			return new OutputStreamAndPath(fs.create(path, writeMode), path);
-		}
-
-		final EntropyInjectingFileSystem efs = (EntropyInjectingFileSystem) fs;
-		final Path pathWithEntropy = resolveEntropy(path, efs, true);
+		// check and possibly inject entropy into the path
+		final EntropyInjectingFileSystem efs = getEntropyFs(fs);
+		final Path processedPath = efs == null ? path : resolveEntropy(path, efs, true);
 
-		final FSDataOutputStream out = fs.create(pathWithEntropy, writeMode);
-		return new OutputStreamAndPath(out, pathWithEntropy);
+		// create the stream on the original file system to let the safety net
+		// take its effect
+		final FSDataOutputStream out = fs.create(processedPath, writeMode);
+		return new OutputStreamAndPath(out, processedPath);
 	}
 
 	/**
@@ -70,22 +71,43 @@ public class EntropyInjector {
 	 * @return The path without the marker string.
 	 */
 	public static Path removeEntropyMarkerIfPresent(FileSystem fs, Path path) {
-		if (fs instanceof EntropyInjectingFileSystem) {
+		final EntropyInjectingFileSystem efs = getEntropyFs(fs);
+		if (efs == null) {
+			return path;
+		}
+		else  {
 			try {
-				return resolveEntropy(path, (EntropyInjectingFileSystem) fs, false);
+				return resolveEntropy(path, efs, false);
 			}
 			catch (IOException e) {
-				// this should never happen, because the path was valid before. rethrow to silence the compiler
+				// this should never happen, because the path was valid before and we only remove characters.
+				// rethrow to silence the compiler
 				throw new FlinkRuntimeException(e.getMessage(), e);
 			}
 		}
-		else {
-			return path;
-		}
 	}
 
 	// ------------------------------------------------------------------------
 
+	@Nullable
+	private static EntropyInjectingFileSystem getEntropyFs(FileSystem fs) {
+		if (fs instanceof EntropyInjectingFileSystem) {
+			return (EntropyInjectingFileSystem) fs;
+		}
+		else if (fs instanceof SafetyNetWrapperFileSystem) {
+			FileSystem delegate = ((SafetyNetWrapperFileSystem) fs).getWrappedDelegate();
+			if (delegate instanceof EntropyInjectingFileSystem) {
+				return (EntropyInjectingFileSystem) delegate;
+			}
+			else {
+				return null;
+			}
+		}
+		else {
+			return null;
+		}
+	}
+
 	@VisibleForTesting
 	static Path resolveEntropy(Path path, EntropyInjectingFileSystem efs, boolean injectEntropy) throws IOException {
 		final String entropyInjectionKey = efs.getEntropyInjectionKey();
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/EntropyInjectorTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/EntropyInjectorTest.java
index a425826..e7c8911 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/EntropyInjectorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/EntropyInjectorTest.java
@@ -26,9 +26,11 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
+import java.io.IOException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for the {@link EntropyInjector}.
@@ -122,6 +124,42 @@ public class EntropyInjectorTest {
 		assertTrue(new File (new File(folder, "test-entropy"), "file").exists());
 	}
 
+	@Test
+	public void testWithSafetyNet() throws Exception {
+		final String entropyKey = "__ekey__";
+		final String entropyValue = "abc";
+
+		final File folder = TMP_FOLDER.newFolder();
+
+		final Path path = new Path(Path.fromLocalFile(folder), entropyKey + "/path/");
+		final Path pathWithEntropy = new Path(Path.fromLocalFile(folder), entropyValue + "/path/");
+
+		TestEntropyInjectingFs efs = new TestEntropyInjectingFs(entropyKey, entropyValue);
+
+		FSDataOutputStream out;
+
+		FileSystemSafetyNet.initializeSafetyNetForThread();
+		FileSystem fs = FileSystemSafetyNet.wrapWithSafetyNetWhenActivated(efs);
+		try  {
+			OutputStreamAndPath streamAndPath = EntropyInjector.createEntropyAware(
+					fs, path, WriteMode.NO_OVERWRITE);
+
+			out = streamAndPath.stream();
+
+			assertEquals(pathWithEntropy, streamAndPath.path());
+		}
+		finally {
+			FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
+		}
+
+		// check that the safety net closed the stream
+		try {
+			out.write(42);
+			out.flush();
+			fail("stream should be already close and hence fail with an exception");
+		} catch (IOException ignored) {}
+	}
+
 	// ------------------------------------------------------------------------
 
 	private static final class TestEntropyInjectingFs extends LocalFileSystem implements EntropyInjectingFileSystem {