You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/05/16 15:25:08 UTC
[flink] 04/14: [FLINK-5763][state backends] (follow-up) Pull scope
and relative/absolute path decision out of FsCheckpointStateOutputStream
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5bd71c3c484517f138ffa771102eed28a062c47d
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri May 15 14:58:58 2020 +0200
[FLINK-5763][state backends] (follow-up) Pull scope and relative/absolute path decision out of FsCheckpointStateOutputStream
The FsCheckpointStateOutputStream should not have to be aware of the notion of checkpoint
state scopes (exclusive / shared), or which one supports which path types.
(There is pre-existing entropy-handling code in the FsCheckpointStateOutputStream, which arguably
should not be there either.)
---
.../org/apache/flink/core/fs/EntropyInjector.java | 4 +
.../apache/flink/core/fs/EntropyInjectorTest.java | 7 ++
.../state/filesystem/FsCheckpointStorage.java | 4 +-
.../filesystem/FsCheckpointStreamFactory.java | 41 ++++----
.../FsCheckpointStateOutputStreamTest.java | 33 +++---
.../filesystem/FsCheckpointStreamFactoryTest.java | 113 +++++++++++++++++++++
.../filesystem/FsStateBackendEntropyTest.java | 2 +-
7 files changed, 163 insertions(+), 41 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java b/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java
index b0cd9a6..1f67b34 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java
@@ -89,6 +89,10 @@ public class EntropyInjector {
// ------------------------------------------------------------------------
+ public static boolean isEntropyInjecting(FileSystem fs) {
+ return getEntropyFs(fs) != null;
+ }
+
@Nullable
public static EntropyInjectingFileSystem getEntropyFs(FileSystem fs) {
if (fs instanceof EntropyInjectingFileSystem) {
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/EntropyInjectorTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/EntropyInjectorTest.java
index 5c3d253..ce62cbc 100644
--- a/flink-core/src/test/java/org/apache/flink/core/fs/EntropyInjectorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/EntropyInjectorTest.java
@@ -188,6 +188,13 @@ public class EntropyInjectorTest {
}
}
+ @Test
+ public void testIsEntropyFs() {
+ final FileSystem efs = new TestEntropyInjectingFs("test", "ignored");
+
+ assertTrue(EntropyInjector.isEntropyInjecting(efs));
+ }
+
// ------------------------------------------------------------------------
private static final class TestEntropyInjectingFs extends LocalFileSystem implements EntropyInjectingFileSystem {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
index 143a077..a7bf331 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
-import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream;
import javax.annotation.Nullable;
@@ -176,8 +175,7 @@ public class FsCheckpointStorage extends AbstractFsCheckpointStorage {
taskOwnedStateDirectory,
fileSystem,
writeBufferSize,
- fileSizeThreshold,
- CheckpointedStateScope.SHARED);
+ fileSizeThreshold);
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
index 90a4faf..4a4db0e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.state.filesystem;
-import org.apache.flink.core.fs.EntropyInjectingFileSystem;
import org.apache.flink.core.fs.EntropyInjector;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
@@ -85,6 +84,9 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
/** Cached handle to the file system for file operations. */
private final FileSystem filesystem;
+ /** Whether the file system dynamically injects entropy into the file paths. */
+ private final boolean entropyInjecting;
+
/**
* Creates a new stream factory that stores its checkpoint data in the file system and location
* defined by the given Path.
@@ -124,6 +126,7 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
this.sharedStateDirectory = checkNotNull(sharedStateDirectory);
this.fileStateThreshold = fileStateSizeThreshold;
this.writeBufferSize = writeBufferSize;
+ this.entropyInjecting = EntropyInjector.isEntropyInjecting(fileSystem);
}
// ------------------------------------------------------------------------
@@ -133,7 +136,8 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
Path target = scope == CheckpointedStateScope.EXCLUSIVE ? checkpointDirectory : sharedStateDirectory;
int bufferSize = Math.max(writeBufferSize, fileStateThreshold);
- return new FsCheckpointStateOutputStream(target, filesystem, bufferSize, fileStateThreshold, scope);
+ final boolean absolutePath = entropyInjecting || scope == CheckpointedStateScope.SHARED;
+ return new FsCheckpointStateOutputStream(target, filesystem, bufferSize, fileStateThreshold, !absolutePath);
}
// ------------------------------------------------------------------------
@@ -153,7 +157,7 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
* A {@link CheckpointStreamFactory.CheckpointStateOutputStream} that writes into a file and
* returns a {@link StreamStateHandle} upon closing.
*/
- public static final class FsCheckpointStateOutputStream
+ public static class FsCheckpointStateOutputStream
extends CheckpointStreamFactory.CheckpointStateOutputStream {
private final byte[] writeBuffer;
@@ -174,14 +178,22 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
private volatile boolean closed;
- private final CheckpointedStateScope scope;
+ private final boolean allowRelativePaths;
+
+ public FsCheckpointStateOutputStream(
+ Path basePath,
+ FileSystem fs,
+ int bufferSize,
+ int localStateThreshold) {
+ this(basePath, fs, bufferSize, localStateThreshold, false);
+ }
public FsCheckpointStateOutputStream(
Path basePath,
FileSystem fs,
int bufferSize,
int localStateThreshold,
- CheckpointedStateScope scope) {
+ boolean allowRelativePaths) {
if (bufferSize < localStateThreshold) {
throw new IllegalArgumentException();
@@ -191,7 +203,7 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
this.fs = fs;
this.writeBuffer = new byte[bufferSize];
this.localStateThreshold = localStateThreshold;
- this.scope = scope;
+ this.allowRelativePaths = allowRelativePaths;
}
@Override
@@ -328,20 +340,9 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
outStream.close();
- if (CheckpointedStateScope.EXCLUSIVE.equals(scope)) {
- EntropyInjectingFileSystem efs = EntropyInjector.getEntropyFs(fs);
- // currently, do not use relative state handle for entropy file system
- if (efs != null) {
- return new FileStateHandle(statePath, size);
- } else {
- return new RelativeFileStateHandle(
- statePath,
- relativeStatePath,
- size);
- }
- } else {
- return new FileStateHandle(statePath, size);
- }
+ return allowRelativePaths
+ ? new RelativeFileStateHandle(statePath, relativeStatePath, size)
+ : new FileStateHandle(statePath, size);
} catch (Exception exception) {
try {
if (statePath != null) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
index d3c9d15..d713e44 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
-import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
@@ -70,13 +69,13 @@ import static org.mockito.Mockito.when;
@RunWith(Parameterized.class)
public class FsCheckpointStateOutputStreamTest {
- @Parameterized.Parameters(name = "scope = {0}")
- public static List<CheckpointedStateScope> parameters() {
- return Arrays.asList(CheckpointedStateScope.values());
+ @Parameterized.Parameters(name = "relativePaths = {0}")
+ public static List<Boolean> parameters() {
+ return Arrays.asList(true, false);
}
@Parameterized.Parameter
- public CheckpointedStateScope scope;
+ public boolean relativePaths;
@Rule
public final TemporaryFolder tempDir = new TemporaryFolder();
@@ -85,17 +84,17 @@ public class FsCheckpointStateOutputStreamTest {
public void testWrongParameters() throws Exception {
// this should fail
new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
- Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), 4000, 5000, scope);
+ Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), 4000, 5000, relativePaths);
}
@Test
public void testEmptyState() throws Exception {
FsCheckpointStreamFactory.CheckpointStateOutputStream stream =
new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
- Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), 1024, 512, scope);
+ Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), 1024, 512, relativePaths);
StreamStateHandle handle = stream.closeAndGetHandle();
- assertTrue(handle == null);
+ assertNull(handle);
}
@Test
@@ -122,7 +121,7 @@ public class FsCheckpointStateOutputStreamTest {
public void testGetPos() throws Exception {
FsCheckpointStreamFactory.CheckpointStateOutputStream stream =
new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
- Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), 31, 17, scope);
+ Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), 31, 17, relativePaths);
for (int i = 0; i < 64; ++i) {
Assert.assertEquals(i, stream.getPos());
@@ -134,7 +133,7 @@ public class FsCheckpointStateOutputStreamTest {
// ----------------------------------------------------
stream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
- Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), 31, 17, scope);
+ Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), 31, 17, relativePaths);
byte[] data = "testme!".getBytes(ConfigConstants.DEFAULT_CHARSET);
@@ -165,7 +164,7 @@ public class FsCheckpointStateOutputStreamTest {
fs,
4,
0,
- scope);
+ relativePaths);
// this should create the underlying file stream
stream.write(new byte[] {1, 2, 3, 4, 5});
@@ -195,7 +194,7 @@ public class FsCheckpointStateOutputStreamTest {
fs,
4,
0,
- scope);
+ relativePaths);
// this should create the underlying file stream
stream.write(new byte[] {1, 2, 3, 4, 5});
@@ -215,7 +214,7 @@ public class FsCheckpointStateOutputStreamTest {
private void runTest(int numBytes, int bufferSize, int threshold, boolean expectFile) throws Exception {
FsCheckpointStreamFactory.CheckpointStateOutputStream stream =
new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
- Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), bufferSize, threshold, scope);
+ Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), bufferSize, threshold, relativePaths);
Random rnd = new Random();
byte[] original = new byte[numBytes];
@@ -264,7 +263,7 @@ public class FsCheckpointStateOutputStreamTest {
@Test
public void testWriteFailsFastWhenClosed() throws Exception {
FsCheckpointStateOutputStream stream = new FsCheckpointStateOutputStream(
- Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), 1024, 512, scope);
+ Path.fromLocalFile(tempDir.newFolder()), FileSystem.getLocalFileSystem(), 1024, 512, relativePaths);
assertFalse(stream.isClosed());
@@ -303,7 +302,7 @@ public class FsCheckpointStateOutputStreamTest {
final Path basePath = Path.fromLocalFile(directory);
final Supplier<CheckpointStateOutputStream> factory = () ->
- new FsCheckpointStateOutputStream(basePath, FileSystem.getLocalFileSystem(), 1024, 15, scope);
+ new FsCheckpointStateOutputStream(basePath, FileSystem.getLocalFileSystem(), 1024, 15, relativePaths);
CheckpointStateOutputStream stream1 = factory.get();
CheckpointStateOutputStream stream2 = factory.get();
@@ -372,10 +371,10 @@ public class FsCheckpointStateOutputStreamTest {
FileSystem fs = spy(FileSystem.getLocalFileSystem());
FsCheckpointStateOutputStream stream1 = new FsCheckpointStateOutputStream(
- Path.fromLocalFile(directory), fs, 1024, 1, scope);
+ Path.fromLocalFile(directory), fs, 1024, 1, relativePaths);
FsCheckpointStateOutputStream stream2 = new FsCheckpointStateOutputStream(
- Path.fromLocalFile(directory), fs, 1024, 1, scope);
+ Path.fromLocalFile(directory), fs, 1024, 1, relativePaths);
stream1.write(new byte[61]);
stream2.write(new byte[61]);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactoryTest.java
new file mode 100644
index 0000000..1fa5cc0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactoryTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Unit tests for the {@link FsCheckpointStreamFactory}.
+ */
+public class FsCheckpointStreamFactoryTest {
+
+ @Rule
+ public final TemporaryFolder TMP = new TemporaryFolder();
+
+ private Path exclusiveStateDir;
+ private Path sharedStateDir;
+
+ @Before
+ public void createStateDirectories() throws IOException {
+ exclusiveStateDir = Path.fromLocalFile(TMP.newFolder("exclusive"));
+ sharedStateDir = Path.fromLocalFile(TMP.newFolder("shared"));
+ }
+
+ // ------------------------------------------------------------------------
+ // tests
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void testExclusiveStateHasRelativePathHandles() throws IOException {
+ final FsCheckpointStreamFactory factory = createFactory(FileSystem.getLocalFileSystem());
+
+ final FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream =
+ factory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
+ stream.write(1657);
+ final StreamStateHandle handle = stream.closeAndGetHandle();
+
+ assertThat(handle, instanceOf(RelativeFileStateHandle.class));
+ assertPathsEqual(exclusiveStateDir, ((RelativeFileStateHandle) handle).getFilePath().getParent());
+ }
+
+ @Test
+ public void testSharedStateHasAbsolutePathHandles() throws IOException {
+ final FsCheckpointStreamFactory factory = createFactory(FileSystem.getLocalFileSystem());
+
+ final FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream =
+ factory.createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);
+ stream.write(0);
+ final StreamStateHandle handle = stream.closeAndGetHandle();
+
+ assertThat(handle, instanceOf(FileStateHandle.class));
+ assertThat(handle, not(instanceOf(RelativeFileStateHandle.class)));
+ assertPathsEqual(sharedStateDir, ((FileStateHandle) handle).getFilePath().getParent());
+ }
+
+ @Test
+ public void testEntropyMakesExclusiveStateAbsolutePaths() throws IOException{
+ final FsCheckpointStreamFactory factory = createFactory(new FsStateBackendEntropyTest.TestEntropyAwareFs());
+
+ final FsCheckpointStreamFactory.FsCheckpointStateOutputStream stream =
+ factory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
+ stream.write(0);
+ final StreamStateHandle handle = stream.closeAndGetHandle();
+
+ assertThat(handle, instanceOf(FileStateHandle.class));
+ assertThat(handle, not(instanceOf(RelativeFileStateHandle.class)));
+ assertPathsEqual(exclusiveStateDir, ((FileStateHandle) handle).getFilePath().getParent());
+ }
+
+ // ------------------------------------------------------------------------
+ // test utils
+ // ------------------------------------------------------------------------
+
+ private static void assertPathsEqual(Path expected, Path actual) {
+ final Path reNormalizedExpected = new Path(expected.toString());
+ final Path reNormalizedActual = new Path(actual.toString());
+ assertEquals(reNormalizedExpected, reNormalizedActual);
+ }
+
+ private FsCheckpointStreamFactory createFactory(FileSystem fs) {
+ return new FsCheckpointStreamFactory(fs, exclusiveStateDir, sharedStateDir, 0, 4096);
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
index 47955b1..72ed803 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
@@ -109,7 +109,7 @@ public class FsStateBackendEntropyTest {
}
}
- private static class TestEntropyAwareFs extends LocalFileSystem implements EntropyInjectingFileSystem {
+ static class TestEntropyAwareFs extends LocalFileSystem implements EntropyInjectingFileSystem {
@Override
public String getEntropyInjectionKey() {