You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/09/25 06:52:02 UTC

[flink] 04/06: [FLINK-9061] [checkpoints] FsStatebackend optionally injects entropy into state data file paths

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit badb72d6a5b3c1862a70caaa6ce5715cf4c0efee
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Sep 24 20:45:48 2018 +0200

    [FLINK-9061] [checkpoints] FsStatebackend optionally injects entropy into state data file paths
---
 .../FsCheckpointMetadataOutputStream.java          |   3 +-
 .../state/filesystem/FsCheckpointStorage.java      |  17 ++-
 .../filesystem/FsCheckpointStorageLocation.java    |   6 +-
 .../filesystem/FsCheckpointStreamFactory.java      |  13 ++-
 .../filesystem/FsStateBackendEntropyTest.java      | 123 +++++++++++++++++++++
 5 files changed, 153 insertions(+), 9 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointMetadataOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointMetadataOutputStream.java
index f18d578..95f78b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointMetadataOutputStream.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointMetadataOutputStream.java
@@ -133,7 +133,8 @@ public final class FsCheckpointMetadataOutputStream extends CheckpointMetadataOu
 					FileStateHandle metaDataHandle = new FileStateHandle(metadataFilePath, size);
 
 					return new FsCompletedCheckpointStorageLocation(
-							fileSystem, exclusiveCheckpointDir, metaDataHandle, exclusiveCheckpointDir.toString());
+							fileSystem, exclusiveCheckpointDir, metaDataHandle,
+							metaDataHandle.getFilePath().getParent().toString());
 				}
 				catch (Exception e) {
 					try {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
index af80af7..1549e01 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
@@ -32,6 +32,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * An implementation of durable checkpoint storage to file systems.
@@ -54,11 +55,25 @@ public class FsCheckpointStorage extends AbstractFsCheckpointStorage {
 			JobID jobId,
 			int fileSizeThreshold) throws IOException {
 
+		this(checkpointBaseDirectory.getFileSystem(),
+				checkpointBaseDirectory,
+				defaultSavepointDirectory,
+				jobId,
+				fileSizeThreshold);
+	}
+
+	public FsCheckpointStorage(
+			FileSystem fs,
+			Path checkpointBaseDirectory,
+			@Nullable Path defaultSavepointDirectory,
+			JobID jobId,
+			int fileSizeThreshold) throws IOException {
+
 		super(jobId, defaultSavepointDirectory);
 
 		checkArgument(fileSizeThreshold >= 0);
 
-		this.fileSystem = checkpointBaseDirectory.getFileSystem();
+		this.fileSystem = checkNotNull(fs);
 		this.checkpointsDirectory = getCheckpointDirectoryForJob(checkpointBaseDirectory, jobId);
 		this.sharedStateDirectory = new Path(checkpointsDirectory, CHECKPOINT_SHARED_STATE_DIR);
 		this.taskOwnedStateDirectory = new Path(checkpointsDirectory, CHECKPOINT_TASK_OWNED_STATE_DIR);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java
index 5637b40..360ae2b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.state.filesystem;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.EntropyInjector;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
@@ -67,7 +68,10 @@ public class FsCheckpointStorageLocation extends FsCheckpointStreamFactory imple
 		this.taskOwnedStateDirectory = checkNotNull(taskOwnedStateDir);
 		this.reference = checkNotNull(reference);
 
-		this.metadataFilePath = new Path(checkpointDir, AbstractFsCheckpointStorage.METADATA_FILE_NAME);
+		// the metadata file should not have entropy in its path
+		Path metadataDir = EntropyInjector.removeEntropyMarkerIfPresent(fileSystem, checkpointDir);
+
+		this.metadataFilePath = new Path(metadataDir, AbstractFsCheckpointStorage.METADATA_FILE_NAME);
 		this.fileStateSizeThreshold = fileStateSizeThreshold;
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
index 609ef69..665e7b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
@@ -18,8 +18,11 @@
 
 package org.apache.flink.runtime.state.filesystem;
 
+import org.apache.flink.core.fs.EntropyInjector;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.core.fs.OutputStreamAndPath;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
@@ -344,12 +347,10 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 			Exception latestException = null;
 			for (int attempt = 0; attempt < 10; attempt++) {
 				try {
-					Path statePath = createStatePath();
-					FSDataOutputStream outStream = fs.create(statePath, FileSystem.WriteMode.NO_OVERWRITE);
-
-					// success, managed to open the stream
-					this.statePath = statePath;
-					this.outStream = outStream;
+					OutputStreamAndPath streamAndPath = EntropyInjector.createEntropyAware(
+							fs, createStatePath(), WriteMode.NO_OVERWRITE);
+					this.outStream = streamAndPath.stream();
+					this.statePath = streamAndPath.path();
 					return;
 				}
 				catch (Exception e) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
new file mode 100644
index 0000000..12a4c42
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.EntropyInjectingFileSystem;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.local.LocalFileSystem;
+import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
+import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests verifying that the FsStateBackend passes the entropy injection option
+ * to the FileSystem for state payload files, but not for metadata files.
+ */
+public class FsStateBackendEntropyTest {
+	// ENTROPY_MARKER: injection key placed in configured paths; RESOLVED_MARKER: entropy the test FS generates
+	static final String ENTROPY_MARKER = "__ENTROPY__";
+	static final String RESOLVED_MARKER = "+RESOLVED+";
+
+	@Rule
+	public final TemporaryFolder tmp = new TemporaryFolder();
+
+	@Test
+	public void testEntropyInjection() throws Exception {
+		final FileSystem fs = new TestEntropyAwareFs();
+		// the configured checkpoint base directory contains the entropy injection key
+		final Path checkpointDir = new Path(Path.fromLocalFile(tmp.newFolder()), ENTROPY_MARKER + "/checkpoints");
+		final String checkpointDirStr = checkpointDir.toString();
+		// pass the entropy-aware test file system explicitly to the storage
+		FsCheckpointStorage storage = new FsCheckpointStorage(
+				fs, checkpointDir, null, new JobID(), 1024);
+
+		FsCheckpointStorageLocation location = (FsCheckpointStorageLocation)
+				storage.initializeLocationForCheckpoint(96562);
+		// directories keep the raw entropy key; only the metadata file path must be free of it
+		assertThat(location.getCheckpointDirectory().toString(), startsWith(checkpointDirStr));
+		assertThat(location.getSharedStateDirectory().toString(), startsWith(checkpointDirStr));
+		assertThat(location.getTaskOwnedStateDirectory().toString(), startsWith(checkpointDirStr));
+		assertThat(location.getMetadataFilePath().toString(), not(containsString(ENTROPY_MARKER)));
+
+		// task-owned state: the entropy key must be replaced by the generated entropy
+		try (CheckpointStateOutputStream stream = storage.createTaskOwnedStateStream()) {
+			stream.flush();
+			FileStateHandle handle = (FileStateHandle) stream.closeAndGetHandle();
+
+			assertNotNull(handle);
+			assertThat(handle.getFilePath().toString(), not(containsString(ENTROPY_MARKER)));
+			assertThat(handle.getFilePath().toString(), containsString(RESOLVED_MARKER));
+		}
+
+		// exclusive-scope state: the entropy key must be replaced by the generated entropy
+		try (CheckpointStateOutputStream stream =
+				location.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE)) {
+
+			stream.flush();
+			FileStateHandle handle = (FileStateHandle) stream.closeAndGetHandle();
+
+			assertNotNull(handle);
+			assertThat(handle.getFilePath().toString(), not(containsString(ENTROPY_MARKER)));
+			assertThat(handle.getFilePath().toString(), containsString(RESOLVED_MARKER));
+		}
+
+		// metadata: neither the entropy key nor generated entropy may appear, and the
+		// external pointer must be the (entropy-free) parent directory of the metadata file
+		try (CheckpointMetadataOutputStream stream = location.createMetadataOutputStream()) {
+			stream.flush();
+			FsCompletedCheckpointStorageLocation handle =
+					(FsCompletedCheckpointStorageLocation) stream.closeAndFinalizeCheckpoint();
+
+			assertNotNull(handle);
+
+			// metadata file path contains neither the entropy key nor generated entropy
+			assertThat(handle.getMetadataHandle().getFilePath().toString(), not(containsString(ENTROPY_MARKER)));
+			assertThat(handle.getMetadataHandle().getFilePath().toString(), not(containsString(RESOLVED_MARKER)));
+
+			// external location is the same as metadata, without the file name
+			assertEquals(handle.getMetadataHandle().getFilePath().getParent().toString(), handle.getExternalPointer());
+		}
+	}
+	/** Local file system declaring ENTROPY_MARKER as its injection key and always generating RESOLVED_MARKER. */
+	private static class TestEntropyAwareFs extends LocalFileSystem implements EntropyInjectingFileSystem {
+
+		@Override
+		public String getEntropyInjectionKey() {
+			return ENTROPY_MARKER;
+		}
+
+		@Override
+		public String generateEntropy() {
+			return RESOLVED_MARKER;
+		}
+	}
+}