Posted to issues@flink.apache.org by sihuazhou <gi...@git.apache.org> on 2018/06/21 09:47:12 UTC
[GitHub] flink pull request #6194: [FLINK-9633][checkpoint] Use savepoint path's file...
GitHub user sihuazhou opened a pull request:
https://github.com/apache/flink/pull/6194
[FLINK-9633][checkpoint] Use savepoint path's file system to create checkpoint output stream
## What is the purpose of the change
*This PR fixes a bug where Flink does not use the savepoint path's file system to create the checkpoint output stream on the TaskManager (TM) side.*
## Brief change log
- *Use Savepoint path's file system to create checkpoint output stream.*
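Here is a minimal sketch of the intent in a deliberately simplified model. It assumes the savepoint target is given as a URI and that the file system is picked purely by URI scheme. `SavepointFsChoice`, `schemeBeforeFix` and `schemeAfterFix` are hypothetical names, not Flink classes. The example paths `hdfs://test1/` and `file:///test2/` come from the test in this PR.

```java
import java.net.URI;

/** Hypothetical, simplified model of choosing the file system for a checkpoint/savepoint stream. */
final class SavepointFsChoice {

    private SavepointFsChoice() {}

    /**
     * Simplified "before" behavior: the file system always comes from the configured
     * checkpoint directory, so an explicit savepoint target is ignored.
     */
    static String schemeBeforeFix(URI checkpointDir, URI savepointTarget) {
        return checkpointDir.getScheme();
    }

    /**
     * Simplified "after" behavior: if an explicit savepoint target is given, its own
     * scheme (and hence its own file system) is used; otherwise fall back to the checkpoint dir.
     */
    static String schemeAfterFix(URI checkpointDir, URI savepointTarget) {
        return savepointTarget != null ? savepointTarget.getScheme() : checkpointDir.getScheme();
    }
}
```

With a checkpoint directory on `hdfs://` and a savepoint on `file://`, only the "after" variant writes the savepoint through the `file` file system.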
## Verifying this change
- *Added `StreamTaskTest#testTriggerSavepointWhenTheFileSystemIsDifferentWithCheckpoint()` to verify the changes*
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
- The S3 file system connector: (no)
## Documentation
- No
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/sihuazhou/flink FLINK-9633
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/6194.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #6194
----
commit f006416f652485bd59124a57774fdeaafa81824b
Author: sihuazhou <su...@...>
Date: 2018-06-21T09:42:54Z
Use Savepoint path's file system to create checkpoint output stream.
----
---
[GitHub] flink issue #6194: [FLINK-9633][checkpoint] Use savepoint path's file system...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/6194
Hi @tillrohrmann, I added a unit test `FsCheckpointStorageTest#testResolveCheckpointStorageLocation()` to guard the changes. However, I still use a `mock` to create two different file systems (one for checkpoints, one for savepoints), because without mocking I can only create a `LocalFileSystem` in the test. Could you please have another look?
---
[GitHub] flink pull request #6194: [FLINK-9633][checkpoint] Use savepoint path's file...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6194#discussion_r199441371
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---
@@ -842,6 +852,100 @@ public void testSetsUserCodeClassLoaderForTimerThreadFactory() throws Throwable
}
}
+ @Test
+ public void testTriggerSavepointWhenTheFileSystemIsDifferentWithCheckpoint() throws Exception {
+
+ final long checkpointId = 42L;
+ final long timestamp = 1L;
+
+ Environment mockEnvironment = spy(new MockEnvironmentBuilder().build());
+ StreamTask<?, ?> streamTask = new EmptyStreamTask(mockEnvironment);
+
+ // mock the operators
+ StreamOperator<?> statelessOperator =
+ mock(StreamOperator.class);
+
+ final OperatorID operatorID = new OperatorID();
+ when(statelessOperator.getOperatorID()).thenReturn(operatorID);
+
+ // mock the returned empty snapshot result (all state handles are null)
+ OperatorSnapshotFutures statelessOperatorSnapshotResult = new OperatorSnapshotFutures();
+ when(statelessOperator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class), any(CheckpointStreamFactory.class)))
+ .thenReturn(statelessOperatorSnapshotResult);
+
+ // set up the task
+ StreamOperator<?>[] streamOperators = {statelessOperator};
+ OperatorChain<Void, AbstractStreamOperator<Void>> operatorChain = mock(OperatorChain.class);
+ when(operatorChain.getAllOperators()).thenReturn(streamOperators);
+
+ FileSystem checkpointFileSystem = mock(FileSystem.class);
+ FileSystem savepointFileSystem = mock(FileSystem.class);
+
+ FileSystem.setFsFactories(new HashMap<String, FileSystemFactory>() {{
+ this.put("file", new FileSystemFactory() {
+
+ @Override
+ public String getScheme() {
+ return "file";
+ }
+
+ @Override
+ public void configure(Configuration config) {
+
+ }
+
+ @Override
+ public FileSystem create(URI fsUri) throws IOException {
+ return savepointFileSystem;
+ }
+ });
+ this.put("hdfs", new FileSystemFactory() {
+ @Override
+ public String getScheme() {
+ return "hdfs";
+ }
+
+ @Override
+ public void configure(Configuration config) {
+
+ }
+
+ @Override
+ public FileSystem create(URI fsUri) throws IOException {
+ return checkpointFileSystem;
+ }
+ });
+ }});
+
+ CheckpointStorage checkpointStorage = spy(new FsCheckpointStorage(new Path("hdfs://test1/"), new Path("file:///test2/"), new JobID(), 1024));
+
+ CheckpointStorageLocationReference locationReference = AbstractFsCheckpointStorage.encodePathAsReference(new Path("file:///test2/"));
+
+ when(checkpointStorage.resolveCheckpointStorageLocation(checkpointId, locationReference)).then(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ // valid
+ FsCheckpointStorageLocation checkpointStorageLocation = (FsCheckpointStorageLocation) invocationOnMock.callRealMethod();
+ assertEquals(savepointFileSystem, checkpointStorageLocation.getFileSystem());
+ return checkpointStorageLocation;
+ }
+ });
+
+ Whitebox.setInternalState(streamTask, "isRunning", true);
+ Whitebox.setInternalState(streamTask, "lock", new Object());
+ Whitebox.setInternalState(streamTask, "operatorChain", operatorChain);
+ Whitebox.setInternalState(streamTask, "cancelables", new CloseableRegistry());
+ Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
+ Whitebox.setInternalState(streamTask, "checkpointStorage", checkpointStorage);
+ Whitebox.setInternalState(streamTask, "asyncOperationsThreadPool", Executors.newCachedThreadPool());
+
+ CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
+
+ streamTask.triggerCheckpoint(
+ checkpointMetaData,
+ new CheckpointOptions(CheckpointType.SAVEPOINT, locationReference));
+ }
--- End diff --
This test includes a lot of mocking, spying and whitebox testing, which is usually very hard to maintain. I would therefore suggest creating a unit test for the `FsCheckpointStorage#resolveCheckpointStorageLocation` method instead.
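One way to follow this suggestion without Mockito or Whitebox is to make the file-system lookup injectable and use hand-written fakes. The sketch below is a hypothetical, simplified model. `FakeFileSystem`, `SimpleFsStorage` and the constructor-injected factory map are illustrative assumptions, not Flink's `FsCheckpointStorage` API. It only shows the shape such a focused unit test could take.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical stand-in for a file system; Flink's real FileSystem class is much richer. */
final class FakeFileSystem {
    final String label;

    FakeFileSystem(String label) {
        this.label = label;
    }
}

/** Hypothetical storage whose file-system lookup is injected, so tests need no mocks. */
final class SimpleFsStorage {
    private final URI checkpointDir;
    private final Map<String, Function<URI, FakeFileSystem>> factoriesByScheme;

    SimpleFsStorage(URI checkpointDir, Map<String, Function<URI, FakeFileSystem>> factoriesByScheme) {
        this.checkpointDir = checkpointDir;
        this.factoriesByScheme = factoriesByScheme;
    }

    /** Uses the savepoint target's scheme if given, otherwise the checkpoint directory's scheme. */
    FakeFileSystem resolveFileSystem(URI savepointTarget) {
        URI target = savepointTarget != null ? savepointTarget : checkpointDir;
        Function<URI, FakeFileSystem> factory = factoriesByScheme.get(target.getScheme());
        if (factory == null) {
            throw new IllegalArgumentException("No file system registered for scheme: " + target.getScheme());
        }
        return factory.apply(target);
    }
}

/** A focused test: no running task, no spies, no reflection into private fields. */
final class SimpleFsStorageTest {
    static void testSavepointUsesItsOwnFileSystem() {
        FakeFileSystem checkpointFs = new FakeFileSystem("checkpoint");
        FakeFileSystem savepointFs = new FakeFileSystem("savepoint");

        Map<String, Function<URI, FakeFileSystem>> factories = new HashMap<>();
        factories.put("hdfs", uri -> checkpointFs);
        factories.put("file", uri -> savepointFs);

        SimpleFsStorage storage = new SimpleFsStorage(URI.create("hdfs://test1/"), factories);

        // The savepoint path's scheme decides the file system, not the checkpoint directory's.
        if (storage.resolveFileSystem(URI.create("file:///test2/")) != savepointFs) {
            throw new AssertionError("savepoint should resolve to the savepoint file system");
        }
        // Without an explicit target, the checkpoint directory's file system is used.
        if (storage.resolveFileSystem(null) != checkpointFs) {
            throw new AssertionError("default should resolve to the checkpoint file system");
        }
    }
}
```

Because the factories are passed in per instance in this sketch, the test does not need a global static hook to swap file-system factories.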
---
[GitHub] flink pull request #6194: [FLINK-9633][checkpoint] Use savepoint path's file...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6194#discussion_r199425645
--- Diff: flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java ---
@@ -416,6 +417,12 @@ public static FileSystem getUnguardedFileSystem(final URI fsUri) throws IOExcept
}
}
+ @VisibleForTesting
+ public static void setFsFactories(HashMap<String, FileSystemFactory> fsFactories) {
--- End diff --
Let's pass in a `Map` instead of a `HashMap`.
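Declaring the parameter as the `Map` interface lets callers pass any implementation, such as a `TreeMap`, `Collections.singletonMap` or an unmodifiable view. Here is a small sketch. It assumes a hypothetical `SchemeRegistry` that mirrors the shape of the proposed `setFsFactories` hook, and it simplifies the factory type to `Function<URI, String>` for illustration.

```java
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

/** Hypothetical registry mapping a URI scheme to a (simplified) factory. */
final class SchemeRegistry {
    private static Map<String, Function<URI, String>> factories = Collections.emptyMap();

    private SchemeRegistry() {}

    // Accepting Map (not HashMap) lets callers pass TreeMap, singletonMap, unmodifiable views, etc.
    static void setFactories(Map<String, Function<URI, String>> newFactories) {
        // Defensive copy so later changes by the caller do not leak into the registry.
        factories = Collections.unmodifiableMap(new HashMap<>(newFactories));
    }

    /** Looks up the factory by the URI's scheme and applies it. */
    static String create(URI uri) {
        Function<URI, String> factory = factories.get(uri.getScheme());
        if (factory == null) {
            throw new IllegalArgumentException("Unknown scheme: " + uri.getScheme());
        }
        return factory.apply(uri);
    }
}
```

With a `HashMap` parameter, both a `singletonMap` and a `TreeMap` argument would be rejected by the compiler.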
---
[GitHub] flink issue #6194: [FLINK-9633][checkpoint] Use savepoint path's file system...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/6194
@tillrohrmann Thanks for your review and suggestions; I will change it tonight~
---
[GitHub] flink pull request #6194: [FLINK-9633][checkpoint] Use savepoint path's file...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6194#discussion_r199441573
--- Diff: flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java ---
@@ -416,6 +417,12 @@ public static FileSystem getUnguardedFileSystem(final URI fsUri) throws IOExcept
}
}
+ @VisibleForTesting
+ public static void setFsFactories(HashMap<String, FileSystemFactory> fsFactories) {
--- End diff --
If we change the test into a unit test for the `FsCheckpointStorage`, then we don't need this method here.
---
[GitHub] flink issue #6194: [FLINK-9633][checkpoint] Use savepoint path's file system...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/6194
@yanghua Thanks for the review, I rebased the PR.
---
[GitHub] flink issue #6194: [FLINK-9633][checkpoint] Use savepoint path's file system...
Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:
https://github.com/apache/flink/pull/6194
+1, there is a conflicting file~ cc @sihuazhou
---
[GitHub] flink issue #6194: [FLINK-9633][checkpoint] Use savepoint path's file system...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:
https://github.com/apache/flink/pull/6194
CC @zentol
---
[GitHub] flink pull request #6194: [FLINK-9633][checkpoint] Use savepoint path's file...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/6194
---