You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/09/24 21:56:51 UTC
[flink] 03/05: [FLINK-9061] [checkpoints] FsStateBackend optionally
injects entropy into state data file paths
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit fc7b9ac7172afcad9c928cbb0d39da49a89b9838
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Sep 24 20:45:48 2018 +0200
[FLINK-9061] [checkpoints] FsStateBackend optionally injects entropy into state data file paths
---
.../FsCheckpointMetadataOutputStream.java | 3 +-
.../state/filesystem/FsCheckpointStorage.java | 17 ++-
.../filesystem/FsCheckpointStorageLocation.java | 6 +-
.../filesystem/FsCheckpointStreamFactory.java | 13 ++-
.../filesystem/FsStateBackendEntropyTest.java | 123 +++++++++++++++++++++
5 files changed, 153 insertions(+), 9 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointMetadataOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointMetadataOutputStream.java
index f18d578..95f78b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointMetadataOutputStream.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointMetadataOutputStream.java
@@ -133,7 +133,8 @@ public final class FsCheckpointMetadataOutputStream extends CheckpointMetadataOu
FileStateHandle metaDataHandle = new FileStateHandle(metadataFilePath, size);
return new FsCompletedCheckpointStorageLocation(
- fileSystem, exclusiveCheckpointDir, metaDataHandle, exclusiveCheckpointDir.toString());
+ fileSystem, exclusiveCheckpointDir, metaDataHandle,
+ metaDataHandle.getFilePath().getParent().toString());
}
catch (Exception e) {
try {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
index af80af7..1549e01 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
@@ -32,6 +32,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* An implementation of durable checkpoint storage to file systems.
@@ -54,11 +55,25 @@ public class FsCheckpointStorage extends AbstractFsCheckpointStorage {
JobID jobId,
int fileSizeThreshold) throws IOException {
+ this(checkpointBaseDirectory.getFileSystem(),
+ checkpointBaseDirectory,
+ defaultSavepointDirectory,
+ jobId,
+ fileSizeThreshold);
+ }
+
+ public FsCheckpointStorage(
+ FileSystem fs,
+ Path checkpointBaseDirectory,
+ @Nullable Path defaultSavepointDirectory,
+ JobID jobId,
+ int fileSizeThreshold) throws IOException {
+
super(jobId, defaultSavepointDirectory);
checkArgument(fileSizeThreshold >= 0);
- this.fileSystem = checkpointBaseDirectory.getFileSystem();
+ this.fileSystem = checkNotNull(fs);
this.checkpointsDirectory = getCheckpointDirectoryForJob(checkpointBaseDirectory, jobId);
this.sharedStateDirectory = new Path(checkpointsDirectory, CHECKPOINT_SHARED_STATE_DIR);
this.taskOwnedStateDirectory = new Path(checkpointsDirectory, CHECKPOINT_TASK_OWNED_STATE_DIR);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java
index 5637b40..360ae2b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.state.filesystem;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.EntropyInjector;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
@@ -67,7 +68,10 @@ public class FsCheckpointStorageLocation extends FsCheckpointStreamFactory imple
this.taskOwnedStateDirectory = checkNotNull(taskOwnedStateDir);
this.reference = checkNotNull(reference);
- this.metadataFilePath = new Path(checkpointDir, AbstractFsCheckpointStorage.METADATA_FILE_NAME);
+ // the metadata file should not have entropy in its path
+ Path metadataDir = EntropyInjector.removeEntropyMarkerIfPresent(fileSystem, checkpointDir);
+
+ this.metadataFilePath = new Path(metadataDir, AbstractFsCheckpointStorage.METADATA_FILE_NAME);
this.fileStateSizeThreshold = fileStateSizeThreshold;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
index 609ef69..665e7b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
@@ -18,8 +18,11 @@
package org.apache.flink.runtime.state.filesystem;
+import org.apache.flink.core.fs.EntropyInjector;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.core.fs.OutputStreamAndPath;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
@@ -344,12 +347,10 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
Exception latestException = null;
for (int attempt = 0; attempt < 10; attempt++) {
try {
- Path statePath = createStatePath();
- FSDataOutputStream outStream = fs.create(statePath, FileSystem.WriteMode.NO_OVERWRITE);
-
- // success, managed to open the stream
- this.statePath = statePath;
- this.outStream = outStream;
+ OutputStreamAndPath streamAndPath = EntropyInjector.createEntropyAware(
+ fs, createStatePath(), WriteMode.NO_OVERWRITE);
+ this.outStream = streamAndPath.stream();
+ this.statePath = streamAndPath.path();
return;
}
catch (Exception e) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
new file mode 100644
index 0000000..12a4c42
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.EntropyInjectingFileSystem;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.local.LocalFileSystem;
+import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
+import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests verifying that the FsStateBackend passes the entropy injection option
+ * to the FileSystem for state payload files, but not for metadata files.
+ */
+public class FsStateBackendEntropyTest {
+
+ static final String ENTROPY_MARKER = "__ENTROPY__";
+ static final String RESOLVED_MARKER = "+RESOLVED+";
+
+ @Rule
+ public final TemporaryFolder tmp = new TemporaryFolder();
+
+ @Test
+ public void testEntropyInjection() throws Exception {
+ final FileSystem fs = new TestEntropyAwareFs();
+
+ final Path checkpointDir = new Path(Path.fromLocalFile(tmp.newFolder()), ENTROPY_MARKER + "/checkpoints");
+ final String checkpointDirStr = checkpointDir.toString();
+
+ FsCheckpointStorage storage = new FsCheckpointStorage(
+ fs, checkpointDir, null, new JobID(), 1024);
+
+ FsCheckpointStorageLocation location = (FsCheckpointStorageLocation)
+ storage.initializeLocationForCheckpoint(96562);
+
+ assertThat(location.getCheckpointDirectory().toString(), startsWith(checkpointDirStr));
+ assertThat(location.getSharedStateDirectory().toString(), startsWith(checkpointDirStr));
+ assertThat(location.getTaskOwnedStateDirectory().toString(), startsWith(checkpointDirStr));
+ assertThat(location.getMetadataFilePath().toString(), not(containsString(ENTROPY_MARKER)));
+
+ // check entropy in task-owned state
+ try (CheckpointStateOutputStream stream = storage.createTaskOwnedStateStream()) {
+ stream.flush();
+ FileStateHandle handle = (FileStateHandle) stream.closeAndGetHandle();
+
+ assertNotNull(handle);
+ assertThat(handle.getFilePath().toString(), not(containsString(ENTROPY_MARKER)));
+ assertThat(handle.getFilePath().toString(), containsString(RESOLVED_MARKER));
+ }
+
+ // check entropy in the exclusive state (only the EXCLUSIVE scope is exercised here)
+ try (CheckpointStateOutputStream stream =
+ location.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE)) {
+
+ stream.flush();
+ FileStateHandle handle = (FileStateHandle) stream.closeAndGetHandle();
+
+ assertNotNull(handle);
+ assertThat(handle.getFilePath().toString(), not(containsString(ENTROPY_MARKER)));
+ assertThat(handle.getFilePath().toString(), containsString(RESOLVED_MARKER));
+ }
+
+ // check that there is no entropy in the metadata: its path must contain neither the
+ // entropy marker nor the resolved entropy, and the external pointer is its parent directory
+ try (CheckpointMetadataOutputStream stream = location.createMetadataOutputStream()) {
+ stream.flush();
+ FsCompletedCheckpointStorageLocation handle =
+ (FsCompletedCheckpointStorageLocation) stream.closeAndFinalizeCheckpoint();
+
+ assertNotNull(handle);
+
+ // metadata files have no entropy
+ assertThat(handle.getMetadataHandle().getFilePath().toString(), not(containsString(ENTROPY_MARKER)));
+ assertThat(handle.getMetadataHandle().getFilePath().toString(), not(containsString(RESOLVED_MARKER)));
+
+ // external location is the same as metadata, without the file name
+ assertEquals(handle.getMetadataHandle().getFilePath().getParent().toString(), handle.getExternalPointer());
+ }
+ }
+
+ private static class TestEntropyAwareFs extends LocalFileSystem implements EntropyInjectingFileSystem {
+
+ @Override
+ public String getEntropyInjectionKey() {
+ return ENTROPY_MARKER;
+ }
+
+ @Override
+ public String generateEntropy() {
+ return RESOLVED_MARKER;
+ }
+ }
+}