You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/09/26 20:03:23 UTC
[flink] branch master updated: [FLINK-10444] [core] Make entropy
injection work across FileSystem SafetyNet.
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new cc9b376 [FLINK-10444] [core] Make entropy injection work across FileSystem SafetyNet.
cc9b376 is described below
commit cc9b3769084634dc660f7a90aed0090cb46b3342
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed Sep 26 18:26:18 2018 +0200
[FLINK-10444] [core] Make entropy injection work across FileSystem SafetyNet.
---
.../org/apache/flink/core/fs/EntropyInjector.java | 50 ++++++++++++++++------
.../apache/flink/core/fs/EntropyInjectorTest.java | 38 ++++++++++++++++
2 files changed, 74 insertions(+), 14 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java b/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java
index 5bcb618..0fba138 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java
@@ -23,6 +23,8 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.util.FlinkRuntimeException;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
@@ -50,15 +52,14 @@ public class EntropyInjector {
Path path,
WriteMode writeMode) throws IOException {
- if (!(fs instanceof EntropyInjectingFileSystem)) {
- return new OutputStreamAndPath(fs.create(path, writeMode), path);
- }
-
- final EntropyInjectingFileSystem efs = (EntropyInjectingFileSystem) fs;
- final Path pathWithEntropy = resolveEntropy(path, efs, true);
+ // check and possibly inject entropy into the path
+ final EntropyInjectingFileSystem efs = getEntropyFs(fs);
+ final Path processedPath = efs == null ? path : resolveEntropy(path, efs, true);
- final FSDataOutputStream out = fs.create(pathWithEntropy, writeMode);
- return new OutputStreamAndPath(out, pathWithEntropy);
+ // create the stream on the original file system to let the safety net
+ // take its effect
+ final FSDataOutputStream out = fs.create(processedPath, writeMode);
+ return new OutputStreamAndPath(out, processedPath);
}
/**
@@ -70,22 +71,43 @@ public class EntropyInjector {
* @return The path without the marker string.
*/
public static Path removeEntropyMarkerIfPresent(FileSystem fs, Path path) {
- if (fs instanceof EntropyInjectingFileSystem) {
+ final EntropyInjectingFileSystem efs = getEntropyFs(fs);
+ if (efs == null) {
+ return path;
+ }
+ else {
try {
- return resolveEntropy(path, (EntropyInjectingFileSystem) fs, false);
+ return resolveEntropy(path, efs, false);
}
catch (IOException e) {
- // this should never happen, because the path was valid before. rethrow to silence the compiler
+ // this should never happen, because the path was valid before and we only remove characters.
+ // rethrow to silence the compiler
throw new FlinkRuntimeException(e.getMessage(), e);
}
}
- else {
- return path;
- }
}
// ------------------------------------------------------------------------
+ @Nullable
+ private static EntropyInjectingFileSystem getEntropyFs(FileSystem fs) {
+ if (fs instanceof EntropyInjectingFileSystem) {
+ return (EntropyInjectingFileSystem) fs;
+ }
+ else if (fs instanceof SafetyNetWrapperFileSystem) {
+ FileSystem delegate = ((SafetyNetWrapperFileSystem) fs).getWrappedDelegate();
+ if (delegate instanceof EntropyInjectingFileSystem) {
+ return (EntropyInjectingFileSystem) delegate;
+ }
+ else {
+ return null;
+ }
+ }
+ else {
+ return null;
+ }
+ }
+
@VisibleForTesting
static Path resolveEntropy(Path path, EntropyInjectingFileSystem efs, boolean injectEntropy) throws IOException {
final String entropyInjectionKey = efs.getEntropyInjectionKey();
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/EntropyInjectorTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/EntropyInjectorTest.java
index a425826..e7c8911 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/EntropyInjectorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/EntropyInjectorTest.java
@@ -26,9 +26,11 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
+import java.io.IOException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* Tests for the {@link EntropyInjector}.
@@ -122,6 +124,42 @@ public class EntropyInjectorTest {
assertTrue(new File (new File(folder, "test-entropy"), "file").exists());
}
+ @Test
+ public void testWithSafetyNet() throws Exception {
+ final String entropyKey = "__ekey__";
+ final String entropyValue = "abc";
+
+ final File folder = TMP_FOLDER.newFolder();
+
+ final Path path = new Path(Path.fromLocalFile(folder), entropyKey + "/path/");
+ final Path pathWithEntropy = new Path(Path.fromLocalFile(folder), entropyValue + "/path/");
+
+ TestEntropyInjectingFs efs = new TestEntropyInjectingFs(entropyKey, entropyValue);
+
+ FSDataOutputStream out;
+
+ FileSystemSafetyNet.initializeSafetyNetForThread();
+ FileSystem fs = FileSystemSafetyNet.wrapWithSafetyNetWhenActivated(efs);
+ try {
+ OutputStreamAndPath streamAndPath = EntropyInjector.createEntropyAware(
+ fs, path, WriteMode.NO_OVERWRITE);
+
+ out = streamAndPath.stream();
+
+ assertEquals(pathWithEntropy, streamAndPath.path());
+ }
+ finally {
+ FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
+ }
+
+ // check that the safety net closed the stream
+ try {
+ out.write(42);
+ out.flush();
+ fail("stream should be already close and hence fail with an exception");
+ } catch (IOException ignored) {}
+ }
+
// ------------------------------------------------------------------------
private static final class TestEntropyInjectingFs extends LocalFileSystem implements EntropyInjectingFileSystem {