Posted to issues@flink.apache.org by sihuazhou <gi...@git.apache.org> on 2018/06/21 09:47:12 UTC

[GitHub] flink pull request #6194: [FLINK-9633][checkpoint] Use savepoint path's file...

GitHub user sihuazhou opened a pull request:

    https://github.com/apache/flink/pull/6194

    [FLINK-9633][checkpoint] Use savepoint path's file system to create checkpoint output stream

    ## What is the purpose of the change
    
    *This PR fixes a bug where Flink does not use the savepoint path's file system to create the checkpoint output stream on the TM side.*
    
    ## Brief change log
    
      - *Use the savepoint path's file system to create the checkpoint output stream.*
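
The bug can be illustrated with a minimal, stdlib-only Java sketch. It assumes a hypothetical per-scheme registry (`FsRegistry` and `SimpleFs` are invented here and are not Flink's `FileSystem` API). Resolving through the checkpoint directory's file system picks HDFS for a `file://` savepoint target, while deriving the file system from the target path itself picks the right one.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class SavepointFsResolution {

    /** Hypothetical stand-in for a file system; it only remembers its scheme. */
    static final class SimpleFs {
        final String scheme;

        SimpleFs(String scheme) {
            this.scheme = scheme;
        }
    }

    /** Hypothetical registry that hands out one file system instance per URI scheme. */
    static final class FsRegistry {
        private final Map<String, SimpleFs> byScheme = new HashMap<>();

        SimpleFs get(URI uri) {
            // Treat scheme-less paths as local files in this sketch.
            String scheme = uri.getScheme() == null ? "file" : uri.getScheme();
            return byScheme.computeIfAbsent(scheme, SimpleFs::new);
        }
    }

    /** Buggy variant: always uses the checkpoint directory's file system. */
    static SimpleFs fsForTargetBuggy(FsRegistry registry, URI checkpointDir, URI target) {
        return registry.get(checkpointDir);
    }

    /** Fixed variant: derives the file system from the target (savepoint) path. */
    static SimpleFs fsForTargetFixed(FsRegistry registry, URI checkpointDir, URI target) {
        return registry.get(target);
    }

    public static void main(String[] args) {
        FsRegistry registry = new FsRegistry();
        URI checkpointDir = URI.create("hdfs://namenode/checkpoints");
        URI savepointDir = URI.create("file:///tmp/savepoints");

        System.out.println(fsForTargetBuggy(registry, checkpointDir, savepointDir).scheme); // hdfs
        System.out.println(fsForTargetFixed(registry, checkpointDir, savepointDir).scheme); // file
    }
}
```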
    
    
    ## Verifying this change
    
      - *Added `StreamTaskTest#testTriggerSavepointWhenTheFileSystemIsDifferentWithCheckpoint()` to verify the changes*
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - No


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sihuazhou/flink FLINK-9633

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6194.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6194
    
----
commit f006416f652485bd59124a57774fdeaafa81824b
Author: sihuazhou <su...@...>
Date:   2018-06-21T09:42:54Z

    Use Savepoint path's file system to create checkpoint output stream.

----


---

[GitHub] flink issue #6194: [FLINK-9633][checkpoint] Use savepoint path's file system...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/6194
  
    Hi @tillrohrmann, I added a unit test `FsCheckpointStorageTest#testResolveCheckpointStorageLocation()` to guard the changes. However, I still use one `mock` to create two different file systems (one for checkpoints, another for savepoints), because without mocking I can only create a `LocalFileSystem` in the test. Could you please take another look?


---

[GitHub] flink pull request #6194: [FLINK-9633][checkpoint] Use savepoint path's file...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6194#discussion_r199441371
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---
    @@ -842,6 +852,100 @@ public void testSetsUserCodeClassLoaderForTimerThreadFactory() throws Throwable
     		}
     	}
     
    +	@Test
    +	public void testTriggerSavepointWhenTheFileSystemIsDifferentWithCheckpoint() throws Exception {
    +
    +		final long checkpointId = 42L;
    +		final long timestamp = 1L;
    +
    +		Environment mockEnvironment = spy(new MockEnvironmentBuilder().build());
    +		StreamTask<?, ?> streamTask = new EmptyStreamTask(mockEnvironment);
    +
    +		// mock the operators
    +		StreamOperator<?> statelessOperator =
    +			mock(StreamOperator.class);
    +
    +		final OperatorID operatorID = new OperatorID();
    +		when(statelessOperator.getOperatorID()).thenReturn(operatorID);
    +
    +		// mock the returned empty snapshot result (all state handles are null)
    +		OperatorSnapshotFutures statelessOperatorSnapshotResult = new OperatorSnapshotFutures();
    +		when(statelessOperator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class), any(CheckpointStreamFactory.class)))
    +			.thenReturn(statelessOperatorSnapshotResult);
    +
    +		// set up the task
    +		StreamOperator<?>[] streamOperators = {statelessOperator};
    +		OperatorChain<Void, AbstractStreamOperator<Void>> operatorChain = mock(OperatorChain.class);
    +		when(operatorChain.getAllOperators()).thenReturn(streamOperators);
    +
    +		FileSystem checkpointFileSystem = mock(FileSystem.class);
    +		FileSystem savepointFileSystem = mock(FileSystem.class);
    +
    +		FileSystem.setFsFactories(new HashMap<String, FileSystemFactory>() {{
    +			this.put("file", new FileSystemFactory() {
    +
    +				@Override
    +				public String getScheme() {
    +					return "file";
    +				}
    +
    +				@Override
    +				public void configure(Configuration config) {
    +
    +				}
    +
    +				@Override
    +				public FileSystem create(URI fsUri) throws IOException {
    +					return savepointFileSystem;
    +				}
    +			});
    +			this.put("hdfs", new FileSystemFactory() {
    +				@Override
    +				public String getScheme() {
    +					return "hdfs";
    +				}
    +
    +				@Override
    +				public void configure(Configuration config) {
    +
    +				}
    +
    +				@Override
    +				public FileSystem create(URI fsUri) throws IOException {
    +					return checkpointFileSystem;
    +				}
    +			});
    +		}});
    +
    +		CheckpointStorage checkpointStorage = spy(new FsCheckpointStorage(new Path("hdfs://test1/"), new Path("file:///test2/"), new JobID(), 1024));
    +
    +		CheckpointStorageLocationReference locationReference = AbstractFsCheckpointStorage.encodePathAsReference(new Path("file:///test2/"));
    +
    +		when(checkpointStorage.resolveCheckpointStorageLocation(checkpointId, locationReference)).then(new Answer<Object>() {
    +			@Override
    +			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
    +				// valid
    +				FsCheckpointStorageLocation checkpointStorageLocation = (FsCheckpointStorageLocation) invocationOnMock.callRealMethod();
    +				assertEquals(savepointFileSystem, checkpointStorageLocation.getFileSystem());
    +				return checkpointStorageLocation;
    +			}
    +		});
    +
    +		Whitebox.setInternalState(streamTask, "isRunning", true);
    +		Whitebox.setInternalState(streamTask, "lock", new Object());
    +		Whitebox.setInternalState(streamTask, "operatorChain", operatorChain);
    +		Whitebox.setInternalState(streamTask, "cancelables", new CloseableRegistry());
    +		Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
    +		Whitebox.setInternalState(streamTask, "checkpointStorage", checkpointStorage);
    +		Whitebox.setInternalState(streamTask, "asyncOperationsThreadPool", Executors.newCachedThreadPool());
    +
    +		CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
    +
    +		streamTask.triggerCheckpoint(
    +			checkpointMetaData,
    +			new CheckpointOptions(CheckpointType.SAVEPOINT, locationReference));
    +	}
    --- End diff --
    
    This test includes a lot of mocking, spying and Whitebox testing, which is usually hard to maintain. I would therefore suggest creating a unit test for the `FsCheckpointStorage#resolveCheckpointStorageLocation` method instead.
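
The suggestion can be sketched without Mockito, spies, or Whitebox: inject the file-system lookup into the resolver and call the resolution method directly. This is a minimal stdlib-only Java sketch; `FakeFs`, `Location`, and `LocationResolver` are hypothetical stand-ins, not Flink's `FsCheckpointStorage` or `FsCheckpointStorageLocation`.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class ResolveLocationTest {

    /** Hypothetical marker type standing in for a real file system. */
    static final class FakeFs {
        final String name;

        FakeFs(String name) {
            this.name = name;
        }
    }

    /** Hypothetical resolved location: the target path plus the file system used to write it. */
    static final class Location {
        final URI path;
        final FakeFs fs;

        Location(URI path, FakeFs fs) {
            this.path = path;
            this.fs = fs;
        }
    }

    /** Hypothetical resolver; the file-system lookup is injected, so no global state is touched. */
    static final class LocationResolver {
        private final Function<URI, FakeFs> fsLookup;

        LocationResolver(Function<URI, FakeFs> fsLookup) {
            this.fsLookup = fsLookup;
        }

        Location resolve(URI target) {
            // Use the file system of the target path, not a fixed default one.
            return new Location(target, fsLookup.apply(target));
        }
    }

    public static void main(String[] args) {
        FakeFs checkpointFs = new FakeFs("checkpoint");
        FakeFs savepointFs = new FakeFs("savepoint");

        // A plain scheme-to-instance map replaces the mocking framework.
        Map<String, FakeFs> byScheme = new HashMap<>();
        byScheme.put("hdfs", checkpointFs);
        byScheme.put("file", savepointFs);
        LocationResolver resolver = new LocationResolver(uri -> byScheme.get(uri.getScheme()));

        // Call the method under test directly and check which instance was chosen.
        Location location = resolver.resolve(URI.create("file:///test2/"));
        if (location.fs != savepointFs) {
            throw new AssertionError("expected the savepoint path's file system");
        }
        System.out.println("resolved with " + location.fs.name); // resolved with savepoint
    }
}
```

Because the lookup is a constructor argument in this sketch, the test needs no static hook that replaces global file-system factories.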


---

[GitHub] flink pull request #6194: [FLINK-9633][checkpoint] Use savepoint path's file...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6194#discussion_r199425645
  
    --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java ---
    @@ -416,6 +417,12 @@ public static FileSystem getUnguardedFileSystem(final URI fsUri) throws IOExcept
     		}
     	}
     
    +	@VisibleForTesting
    +	public static void setFsFactories(HashMap<String, FileSystemFactory> fsFactories) {
    --- End diff --
    
    Let's pass in a `Map` instead of a `HashMap`.
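
Accepting the `Map` interface rather than `HashMap` lets callers pass any map implementation, and it removes the need for an anonymous `HashMap` subclass (double-brace initialization) at the call site. A minimal sketch with a hypothetical `SchemeRegistry` whose values are simplified to `String` (this is not Flink's `FileSystem` API):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class MapParameterExample {

    /** Hypothetical registry keyed by URI scheme; factory values are simplified to String. */
    static final class SchemeRegistry {
        private final Map<String, String> factories = new HashMap<>();

        // Accepting the Map interface lets callers pass any implementation.
        // The entries are copied, so later changes to the caller's map do not leak in.
        void setFactories(Map<String, String> newFactories) {
            factories.clear();
            factories.putAll(newFactories);
        }

        String lookup(String scheme) {
            return factories.get(scheme);
        }
    }

    public static void main(String[] args) {
        SchemeRegistry registry = new SchemeRegistry();

        // Immutable single-entry map: would not compile against a HashMap parameter.
        registry.setFactories(Collections.singletonMap("file", "local-factory"));
        System.out.println(registry.lookup("file")); // local-factory

        // Sorted map: also accepted, no anonymous HashMap subclass needed.
        Map<String, String> sorted = new TreeMap<>();
        sorted.put("hdfs", "hdfs-factory");
        sorted.put("file", "local-factory");
        registry.setFactories(sorted);
        System.out.println(registry.lookup("hdfs")); // hdfs-factory
    }
}
```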


---

[GitHub] flink issue #6194: [FLINK-9633][checkpoint] Use savepoint path's file system...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/6194
  
    @tillrohrmann Thanks for your review and suggestions, I will change it tonight~


---

[GitHub] flink pull request #6194: [FLINK-9633][checkpoint] Use savepoint path's file...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6194#discussion_r199441573
  
    --- Diff: flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java ---
    @@ -416,6 +417,12 @@ public static FileSystem getUnguardedFileSystem(final URI fsUri) throws IOExcept
     		}
     	}
     
    +	@VisibleForTesting
    +	public static void setFsFactories(HashMap<String, FileSystemFactory> fsFactories) {
    --- End diff --
    
    If we change the test into a unit test for the `FsCheckpointStorage`, then we don't need this method here.


---

[GitHub] flink issue #6194: [FLINK-9633][checkpoint] Use savepoint path's file system...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/6194
  
    @yanghua Thanks for the review, I rebased the PR.


---

[GitHub] flink issue #6194: [FLINK-9633][checkpoint] Use savepoint path's file system...

Posted by yanghua <gi...@git.apache.org>.
Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/6194
  
    +1, but there is a conflicting file~ cc @sihuazhou


---

[GitHub] flink issue #6194: [FLINK-9633][checkpoint] Use savepoint path's file system...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on the issue:

    https://github.com/apache/flink/pull/6194
  
    CC @zentol 


---

[GitHub] flink pull request #6194: [FLINK-9633][checkpoint] Use savepoint path's file...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6194


---