You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/16 07:39:39 UTC

[GitHub] [flink] AHeise commented on a change in pull request #12101: [FLINK-17350][task] StreamTask should always fail immediately on failures in synchronous part of a checkpoint

AHeise commented on a change in pull request #12101:
URL: https://github.com/apache/flink/pull/12101#discussion_r426129028



##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
##########
@@ -141,13 +141,18 @@ public void testBlockingNonInterruptibleCheckpoint() throws Exception {
 		assertNull(task.getFailureCause());
 	}
 
-	private void runTestDeclineOnCheckpointError(AbstractStateBackend backend) throws Exception{
-
+	private void runTestDeclineOnCheckpointError(AbstractStateBackend backend, boolean expectFailure) throws Exception {

Review comment:
       It feels as if the test logic is hidden in this method from the actual test case perspective.
   
   Can we instead of expectFailure pass this::runTaskExpectFailure and this::runTaskExpectCheckpointDeclined? (probably BiConsumer?)
   
   Or maybe inline this method altogether. I see the value rather limited. 
   
   Last option is to use Roman's pattern in `ChannelStateWriterImplTest` 
   ```
       @Test
   	public void testDeclineOnCheckpointErrorInAsyncPart() throws Exception {
          runWithResponder((task, checkpointResponder) -> runTaskExpectCheckpointDeclined(task, checkpointResponder));
   	}
   ```

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
##########
@@ -141,13 +141,18 @@ public void testBlockingNonInterruptibleCheckpoint() throws Exception {
 		assertNull(task.getFailureCause());
 	}
 
-	private void runTestDeclineOnCheckpointError(AbstractStateBackend backend) throws Exception{
-
+	private void runTestDeclineOnCheckpointError(AbstractStateBackend backend, boolean expectFailure) throws Exception {
 		TestDeclinedCheckpointResponder checkpointResponder = new TestDeclinedCheckpointResponder();
+		Task task = createTask(new FilterOperator(), backend, checkpointResponder);
+		if (expectFailure) {
+			runTaskExpectFailure(task);
+		}
+		else {

Review comment:
       nit: same line?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org