You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/16 15:25:08 UTC

[flink] 04/14: [FLINK-5763][state backends] (follow-up) Pull scope and relative/absolute path decision out of FsCheckpointStateOutputStream

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5bd71c3c484517f138ffa771102eed28a062c47d
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri May 15 14:58:58 2020 +0200

    [FLINK-5763][state backends] (follow-up) Pull scope and relative/absolute path decision out of FsCheckpointStateOutputStream
    
    The FsCheckpointStateOutputStream should not have to be aware of the notion of checkpoint
    state scopes (exclusive / shared), or which one supports which path types.
    
    (There is also pre-existing entropy handling code in the FsCheckpointStateOutputStream,
    which arguably should not be there either.)
---
 .../org/apache/flink/core/fs/EntropyInjector.java  |   4 +
 .../apache/flink/core/fs/EntropyInjectorTest.java  |   7 ++
 .../state/filesystem/FsCheckpointStorage.java      |   4 +-
 .../filesystem/FsCheckpointStreamFactory.java      |  41 ++++----
 .../FsCheckpointStateOutputStreamTest.java         |  33 +++---
 .../filesystem/FsCheckpointStreamFactoryTest.java  | 113 +++++++++++++++++++++
 .../filesystem/FsStateBackendEntropyTest.java      |   2 +-
 7 files changed, 163 insertions(+), 41 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java b/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java
index b0cd9a6..1f67b34 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java
@@ -89,6 +89,10 @@ public class EntropyInjector {
 
 	// ------------------------------------------------------------------------
 
+	public static boolean isEntropyInjecting(FileSystem fs) {
+		return getEntropyFs(fs) != null;
+	}
+
 	@Nullable
 	public static EntropyInjectingFileSystem getEntropyFs(FileSystem fs) {
 		if (fs instanceof EntropyInjectingFileSystem) {
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/EntropyInjectorTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/EntropyInjectorTest.java
index 5c3d253..ce62cbc 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/EntropyInjectorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/EntropyInjectorTest.java
@@ -188,6 +188,13 @@ public class EntropyInjectorTest {
 		}
 	}
 
+	@Test
+	public void testIsEntropyFs() {
+		final FileSystem efs = new TestEntropyInjectingFs("test", "ignored");
+
+		assertTrue(EntropyInjector.isEntropyInjecting(efs));
+	}
+
 	// ------------------------------------------------------------------------
 
 	private static final class TestEntropyInjectingFs extends LocalFileSystem implements EntropyInjectingFileSystem {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
index 143a077..a7bf331 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.state.CheckpointStorageLocation;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
-import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream;
 
 import javax.annotation.Nullable;
@@ -176,8 +175,7 @@ public class FsCheckpointStorage extends AbstractFsCheckpointStorage {
 				taskOwnedStateDirectory,
 				fileSystem,
 				writeBufferSize,
-				fileSizeThreshold,
-				CheckpointedStateScope.SHARED);
+				fileSizeThreshold);
 	}
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
index 90a4faf..4a4db0e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.state.filesystem;
 
-import org.apache.flink.core.fs.EntropyInjectingFileSystem;
 import org.apache.flink.core.fs.EntropyInjector;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
@@ -85,6 +84,9 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 	/** Cached handle to the file system for file operations. */
 	private final FileSystem filesystem;
 
+	/** Whether the file system dynamically injects entropy into the file paths. */
+	private final boolean entropyInjecting;
+
 	/**
 	 * Creates a new stream factory that stores its checkpoint data in the file system and location
 	 * defined by the given Path.
@@ -124,6 +126,7 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 		this.sharedStateDirectory = checkNotNull(sharedStateDirectory);
 		this.fileStateThreshold = fileStateSizeThreshold;
 		this.writeBufferSize = writeBufferSize;
+		this.entropyInjecting = EntropyInjector.isEntropyInjecting(fileSystem);
 	}
 
 	// ------------------------------------------------------------------------
@@ -133,7 +136,8 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 		Path target = scope == CheckpointedStateScope.EXCLUSIVE ? checkpointDirectory : sharedStateDirectory;
 		int bufferSize = Math.max(writeBufferSize, fileStateThreshold);
 
-		return new FsCheckpointStateOutputStream(target, filesystem, bufferSize, fileStateThreshold, scope);
+		final boolean absolutePath = entropyInjecting || scope == CheckpointedStateScope.SHARED;
+		return new FsCheckpointStateOutputStream(target, filesystem, bufferSize, fileStateThreshold, !absolutePath);
 	}
 
 	// ------------------------------------------------------------------------
@@ -153,7 +157,7 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 	 * A {@link CheckpointStreamFactory.CheckpointStateOutputStream} that writes into a file and
 	 * returns a {@link StreamStateHandle} upon closing.
 	 */
-	public static final class FsCheckpointStateOutputStream
+	public static class FsCheckpointStateOutputStream
 			extends CheckpointStreamFactory.CheckpointStateOutputStream {
 
 		private final byte[] writeBuffer;
@@ -174,14 +178,22 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 
 		private volatile boolean closed;
 
-		private final CheckpointedStateScope scope;
+		private final boolean allowRelativePaths;
+
+		public FsCheckpointStateOutputStream(
+				Path basePath,
+				FileSystem fs,
+				int bufferSize,
+				int localStateThreshold) {
+			this(basePath, fs, bufferSize, localStateThreshold, false);
+		}
 
 		public FsCheckpointStateOutputStream(
 					Path basePath,
 					FileSystem fs,
 					int bufferSize,
 					int localStateThreshold,
-					CheckpointedStateScope scope) {
+					boolean allowRelativePaths) {
 
 			if (bufferSize < localStateThreshold) {
 				throw new IllegalArgumentException();
@@ -191,7 +203,7 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 			this.fs = fs;
 			this.writeBuffer = new byte[bufferSize];
 			this.localStateThreshold = localStateThreshold;
-			this.scope = scope;
+			this.allowRelativePaths = allowRelativePaths;
 		}
 
 		@Override
@@ -328,20 +340,9 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 
 							outStream.close();
 
-							if (CheckpointedStateScope.EXCLUSIVE.equals(scope)) {
-								EntropyInjectingFileSystem efs = EntropyInjector.getEntropyFs(fs);
-								// currently, do not use relative state handle for entropy file system
-								if (efs != null) {
-									return new FileStateHandle(statePath, size);
-								} else {
-									return new RelativeFileStateHandle(
-										statePath,
-										relativeStatePath,
-										size);
-								}
-							} else {
-								return new FileStateHandle(statePath, size);
-							}
+							return allowRelativePaths
+									? new RelativeFileStateHandle(statePath, relativeStatePath, size)
+									: new FileStateHandle(statePath, size);
 						} catch (Exception exception) {
 							try {
 								if (statePath != null) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
index d3c9d15..d713e44 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
-import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
@@ -70,13 +69,13 @@ import static org.mockito.Mockito.when;
 @RunWith(Parameterized.class)
 public class FsCheckpointStateOutputStreamTest {
 
-	@Parameterized.Parameters(name = "scope = {0}")
-	public static List<CheckpointedStateScope> parameters() {
-		return Arrays.asList(CheckpointedStateScope.values());
+	@Parameterized.Parameters(name = "relativePaths = {0}")
+	public static List<Boolean> parameters() {
+		return Arrays.asList(true, false);
 	}
 
 	@Parameterized.Parameter
-	public CheckpointedStateScope scope;
+	public boolean relativePaths;
 
 	@Rule
 	public final TemporaryFolder tempDir = new TemporaryFolder();
@@ -85,17 +84,17 @@ public class FsCheckpointStateOutputStreamTest {
 	public void testWrongParameters() throws Exception {
 		// this should fail
 		new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
-			Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), 4000, 5000, scope);
+			Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), 4000, 5000, relativePaths);
 	}
 
 	@Test
 	public void testEmptyState() throws Exception {
 		FsCheckpointStreamFactory.CheckpointStateOutputStream stream =
 				new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
-						Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), 1024, 512, scope);
+						Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), 1024, 512, relativePaths);
 
 		StreamStateHandle handle = stream.closeAndGetHandle();
-		assertTrue(handle == null);
+		assertNull(handle);
 	}
 
 	@Test
@@ -122,7 +121,7 @@ public class FsCheckpointStateOutputStreamTest {
 	public void testGetPos() throws Exception {
 		FsCheckpointStreamFactory.CheckpointStateOutputStream stream =
 				new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
-						Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), 31, 17, scope);
+						Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), 31, 17, relativePaths);
 
 		for (int i = 0; i < 64; ++i) {
 			Assert.assertEquals(i, stream.getPos());
@@ -134,7 +133,7 @@ public class FsCheckpointStateOutputStreamTest {
 		// ----------------------------------------------------
 
 		stream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
-				Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), 31, 17, scope);
+				Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), 31, 17, relativePaths);
 
 		byte[] data = "testme!".getBytes(ConfigConstants.DEFAULT_CHARSET);
 
@@ -165,7 +164,7 @@ public class FsCheckpointStateOutputStreamTest {
 			fs,
 			4,
 			0,
-			scope);
+			relativePaths);
 
 		// this should create the underlying file stream
 		stream.write(new byte[] {1, 2, 3, 4, 5});
@@ -195,7 +194,7 @@ public class FsCheckpointStateOutputStreamTest {
 			fs,
 			4,
 			0,
-			scope);
+			relativePaths);
 
 		// this should create the underlying file stream
 		stream.write(new byte[] {1, 2, 3, 4, 5});
@@ -215,7 +214,7 @@ public class FsCheckpointStateOutputStreamTest {
 	private void runTest(int numBytes, int bufferSize, int threshold, boolean expectFile) throws Exception {
 		FsCheckpointStreamFactory.CheckpointStateOutputStream stream =
 			new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
-					Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), bufferSize, threshold, scope);
+					Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), bufferSize, threshold, relativePaths);
 
 		Random rnd = new Random();
 		byte[] original = new byte[numBytes];
@@ -264,7 +263,7 @@ public class FsCheckpointStateOutputStreamTest {
 	@Test
 	public void testWriteFailsFastWhenClosed() throws Exception {
 		FsCheckpointStateOutputStream stream = new FsCheckpointStateOutputStream(
-				Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), 1024, 512, scope);
+				Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), 1024, 512, relativePaths);
 
 		assertFalse(stream.isClosed());
 
@@ -303,7 +302,7 @@ public class FsCheckpointStateOutputStreamTest {
 		final Path basePath = Path.fromLocalFile(directory);
 
 		final Supplier<CheckpointStateOutputStream> factory = () ->
-				new FsCheckpointStateOutputStream(basePath, FileSystem.getLocalFileSystem(), 1024, 15, scope);
+				new FsCheckpointStateOutputStream(basePath, FileSystem.getLocalFileSystem(), 1024, 15, relativePaths);
 
 		CheckpointStateOutputStream stream1 = factory.get();
 		CheckpointStateOutputStream stream2 = factory.get();
@@ -372,10 +371,10 @@ public class FsCheckpointStateOutputStreamTest {
 		FileSystem fs = spy(FileSystem.getLocalFileSystem());
 
 		FsCheckpointStateOutputStream stream1 = new FsCheckpointStateOutputStream(
-				Path.fromLocalFile(directory), fs, 1024, 1, scope);
+				Path.fromLocalFile(directory), fs, 1024, 1, relativePaths);
 
 		FsCheckpointStateOutputStream stream2 = new FsCheckpointStateOutputStream(
-				Path.fromLocalFile(directory), fs, 1024, 1, scope);
+				Path.fromLocalFile(directory), fs, 1024, 1, relativePaths);
 
 		stream1.write(new byte[61]);
 		stream2.write(new byte[61]);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactoryTest.java
new file mode 100644
index 0000000..1fa5cc0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactoryTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit tests for the {@link FsCheckpointStreamFactory}.
+ */
+public class FsCheckpointStreamFactoryTest {
+
+	@Rule
+	public final TemporaryFolder TMP = new TemporaryFolder();
+
+	private Path exclusiveStateDir;
+	private Path sharedStateDir;
+
+	@Before
+	public void createStateDirectories() throws IOException {
+		exclusiveStateDir = Path.fromLocalFile(TMP.newFolder("exclusive"));
+		sharedStateDir = Path.fromLocalFile(TMP.newFolder("shared"));
+	}
+
+	// ------------------------------------------------------------------------
+	//  tests
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testExclusiveStateHasRelativePathHandles() throws IOException {
+		final FsCheckpointStreamFactory factory = createFactory(FileSystem.getLocalFileSystem());
+
+		final FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream =
+				factory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
+		stream.write(1657);
+		final StreamStateHandle handle = stream.closeAndGetHandle();
+
+		assertThat(handle, instanceOf(RelativeFileStateHandle.class));
+		assertPathsEqual(exclusiveStateDir, ((RelativeFileStateHandle) handle).getFilePath().getParent());
+	}
+
+	@Test
+	public void testSharedStateHasAbsolutePathHandles() throws IOException {
+		final FsCheckpointStreamFactory factory = createFactory(FileSystem.getLocalFileSystem());
+
+		final FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream =
+			factory.createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);
+		stream.write(0);
+		final StreamStateHandle handle = stream.closeAndGetHandle();
+
+		assertThat(handle, instanceOf(FileStateHandle.class));
+		assertThat(handle, not(instanceOf(RelativeFileStateHandle.class)));
+		assertPathsEqual(sharedStateDir, ((FileStateHandle) handle).getFilePath().getParent());
+	}
+
+	@Test
+	public void testEntropyMakesExclusiveStateAbsolutePaths() throws IOException{
+		final FsCheckpointStreamFactory factory = createFactory(new FsStateBackendEntropyTest.TestEntropyAwareFs());
+
+		final FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream =
+			factory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
+		stream.write(0);
+		final StreamStateHandle handle = stream.closeAndGetHandle();
+
+		assertThat(handle, instanceOf(FileStateHandle.class));
+		assertThat(handle, not(instanceOf(RelativeFileStateHandle.class)));
+		assertPathsEqual(exclusiveStateDir, ((FileStateHandle) handle).getFilePath().getParent());
+	}
+
+	// ------------------------------------------------------------------------
+	//  test utils
+	// ------------------------------------------------------------------------
+
+	private static void assertPathsEqual(Path expected, Path actual) {
+		final Path reNormalizedExpected = new Path(expected.toString());
+		final Path reNormalizedActual = new Path(actual.toString());
+		assertEquals(reNormalizedExpected, reNormalizedActual);
+	}
+
+	private FsCheckpointStreamFactory createFactory(FileSystem fs) {
+		return new FsCheckpointStreamFactory(fs, exclusiveStateDir, sharedStateDir, 0, 4096);
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
index 47955b1..72ed803 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
@@ -109,7 +109,7 @@ public class FsStateBackendEntropyTest {
 		}
 	}
 
-	private static class TestEntropyAwareFs extends LocalFileSystem implements EntropyInjectingFileSystem {
+	static class TestEntropyAwareFs extends LocalFileSystem implements EntropyInjectingFileSystem {
 
 		@Override
 		public String getEntropyInjectionKey() {