Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/07/16 11:58:28 UTC

[GitHub] [flink] zentol commented on a change in pull request #9123: [FLINK-13281] Fix the verify logic of AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest

zentol commented on a change in pull request #9123: [FLINK-13281] Fix the verify logic of AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest
URL: https://github.com/apache/flink/pull/9123#discussion_r303864146
 
 

 ##########
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest.java
 ##########
 @@ -76,49 +80,64 @@ public void abortPendingCheckpointsWhenRestartingTasks() throws Exception {
 		final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
 
 		final Iterator<ExecutionVertex> vertexIterator = executionGraph.getAllExecutionVertices().iterator();
-		final ExecutionVertex onlyExecutionVertex = vertexIterator.next();
+		final ExecutionVertex firstExecutionVertex = vertexIterator.next();
 
-		setTaskRunning(executionGraph, onlyExecutionVertex);
+		setTaskRunning(executionGraph, firstExecutionVertex, vertexIterator.next());
 
 		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
 		checkState(checkpointCoordinator != null);
 
 		checkpointCoordinator.triggerCheckpoint(System.currentTimeMillis(),  false);
 		final int pendingCheckpointsBeforeFailure = checkpointCoordinator.getNumberOfPendingCheckpoints();
+		long checkpointId = checkpointCoordinator.getPendingCheckpoints().keySet().iterator().next();
 
-		failVertex(onlyExecutionVertex);
+		AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
+			firstExecutionVertex.getJobId(),
+			firstExecutionVertex.getCurrentExecutionAttempt().getAttemptId(),
+			checkpointId,
+			mock(CheckpointMetrics.class),
+			mock(TaskStateSnapshot.class));
+
+		// not let the first checkpoint to be discarded when firstExecutionVertex has been acknowledged.
+		checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, "Unknown location");
 
 Review comment:
   Add an assertion or state check here to verify that a pending checkpoint still exists after this acknowledgement.
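
   A minimal, self-contained sketch of the kind of check requested above. It does not use Flink classes: StubCoordinator and the vertex names are hypothetical stand-ins, and only the method name getNumberOfPendingCheckpoints() is taken from the diff. The assumption is that a checkpoint stays pending until every running task has acknowledged it, so an acknowledgement from only the first of two tasks should leave exactly one pending checkpoint.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class PendingCheckpointSketch {

    // Hypothetical stand-in for a checkpoint coordinator (NOT Flink's class).
    // It tracks a single checkpoint and the tasks that still have to acknowledge it.
    static final class StubCoordinator {
        private final Set<String> notYetAcknowledged;
        private boolean pending = true;

        StubCoordinator(Set<String> tasks) {
            this.notYetAcknowledged = new HashSet<>(tasks);
        }

        // Record an acknowledgement; the checkpoint stops being pending
        // only once every task has acknowledged it.
        void receiveAcknowledge(String task) {
            notYetAcknowledged.remove(task);
            if (notYetAcknowledged.isEmpty()) {
                pending = false;
            }
        }

        int getNumberOfPendingCheckpoints() {
            return pending ? 1 : 0;
        }
    }

    // Acknowledge from the first of two tasks and report how many
    // checkpoints are still pending afterwards.
    static int pendingAfterFirstAck() {
        StubCoordinator coordinator =
            new StubCoordinator(new HashSet<>(Arrays.asList("vertex-1", "vertex-2")));
        coordinator.receiveAcknowledge("vertex-1");
        return coordinator.getNumberOfPendingCheckpoints();
    }

    public static void main(String[] args) {
        int pending = pendingAfterFirstAck();
        // The state check the review asks for: the checkpoint must still be pending.
        if (pending != 1) {
            throw new AssertionError("expected 1 pending checkpoint, got " + pending);
        }
        System.out.println("pending checkpoints after first ack: " + pending);
    }
}
```

   In the test under review, the equivalent check would presumably compare checkpointCoordinator.getNumberOfPendingCheckpoints() against pendingCheckpointsBeforeFailure right after the receiveAcknowledgeMessage call; the exact assertion helper to use is left to the PR author.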

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services