You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/04 05:52:03 UTC

[07/10] flink git commit: [FLINK-9633] Use Savepoint path's file system to create checkpoint output stream

[FLINK-9633] Use Savepoint path's file system to create checkpoint output stream

This commit changes Flink such that it uses the savepoint path's file system to
generate the output stream instead of the checkpoint path's file system.

This closes #6194.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/64bc4b34
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/64bc4b34
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/64bc4b34

Branch: refs/heads/master
Commit: 64bc4b348ddd1f0c63e806442ffd3aad5c367a28
Parents: b230bf0
Author: sihuazhou <su...@163.com>
Authored: Thu Jun 21 17:42:54 2018 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jul 4 07:51:28 2018 +0200

----------------------------------------------------------------------
 .../state/filesystem/FsCheckpointStorage.java   |  2 +-
 .../filesystem/FsCheckpointStorageLocation.java |  6 +++
 .../filesystem/FsCheckpointStorageTest.java     | 45 ++++++++++++++++++++
 3 files changed, 52 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/64bc4b34/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
index e148dfb..af80af7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
@@ -126,7 +126,7 @@ public class FsCheckpointStorage extends AbstractFsCheckpointStorage {
 			final Path path = decodePathFromReference(reference);
 
 			return new FsCheckpointStorageLocation(
-					fileSystem,
+					path.getFileSystem(),
 					path,
 					path,
 					path,

http://git-wip-us.apache.org/repos/asf/flink/blob/64bc4b34/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java
index 469507a..5637b40 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state.filesystem;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
@@ -127,4 +128,9 @@ public class FsCheckpointStorageLocation extends FsCheckpointStreamFactory imple
 				", fileStateSizeThreshold=" + fileStateSizeThreshold +
 				'}';
 	}
+
+	@VisibleForTesting
+	FileSystem getFileSystem() {
+		return fileSystem;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/64bc4b34/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageTest.java
index e391a5d..1a9cf6c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageTest.java
@@ -31,7 +31,10 @@ import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsChe
 
 import org.junit.Test;
 
+import javax.annotation.Nonnull;
+
 import java.io.File;
+import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.util.Arrays;
@@ -41,6 +44,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 
 /**
  * Tests for the {@link FsCheckpointStorage}, which implements the checkpoint storage
@@ -204,6 +208,28 @@ public class FsCheckpointStorageTest extends AbstractFileCheckpointStorageTestBa
 		assertFalse(checkpointDir.exists());
 	}
 
+	@Test
+	public void testResolveCheckpointStorageLocation() throws Exception {
+		final FileSystem checkpointFileSystem = mock(FileSystem.class);
+		final FsCheckpointStorage storage = new FsCheckpointStorage(
+			new TestingPath("hdfs:///checkpoint/", checkpointFileSystem),
+			null,
+			new JobID(),
+			FILE_SIZE_THRESHOLD);
+
+		final FsCheckpointStorageLocation checkpointStreamFactory =
+			(FsCheckpointStorageLocation) storage.resolveCheckpointStorageLocation(1L, CheckpointStorageLocationReference.getDefault());
+		assertEquals(checkpointFileSystem, checkpointStreamFactory.getFileSystem());
+
+		final CheckpointStorageLocationReference savepointLocationReference =
+			AbstractFsCheckpointStorage.encodePathAsReference(new Path("file:///savepoint/"));
+
+		final FsCheckpointStorageLocation savepointStreamFactory =
+			(FsCheckpointStorageLocation) storage.resolveCheckpointStorageLocation(2L, savepointLocationReference);
+		final FileSystem fileSystem = savepointStreamFactory.getFileSystem();
+		assertTrue(fileSystem instanceof LocalFileSystem);
+	}
+
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
@@ -212,4 +238,23 @@ public class FsCheckpointStorageTest extends AbstractFileCheckpointStorageTestBa
 		Path path = new Path(parent, child.getName());
 		assertEquals(path, child);
 	}
+
+	private static final class TestingPath extends Path {
+
+		private static final long serialVersionUID = 2560119808844230488L;
+
+		@SuppressWarnings("TransientFieldNotInitialized")
+		@Nonnull
+		private final transient FileSystem fileSystem;
+
+		TestingPath(String pathString, @Nonnull FileSystem fileSystem) {
+			super(pathString);
+			this.fileSystem = fileSystem;
+		}
+
+		@Override
+		public FileSystem getFileSystem() throws IOException {
+			return fileSystem;
+		}
+	}
 }