You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/07/18 09:59:25 UTC

[flink] branch master updated: [FLINK-13281][tests] Improve test for aborting pending checkpoints on failover

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 59a4d2c  [FLINK-13281][tests] Improve test for aborting pending checkpoints on failover
59a4d2c is described below

commit 59a4d2c1b52f8cceb5e4e287ee3a94e129d505e7
Author: Yun Tang <my...@live.com>
AuthorDate: Thu Jul 18 17:59:02 2019 +0800

    [FLINK-13281][tests] Improve test for aborting pending checkpoints on failover
---
 ...egionStrategyNGAbortPendingCheckpointsTest.java | 47 +++++++++++++++-------
 1 file changed, 32 insertions(+), 15 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest.java
index 899d490..acfb396 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest.java
@@ -30,13 +30,14 @@ import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAda
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
-import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.util.TestLogger;
@@ -51,8 +52,8 @@ import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
 /**
@@ -76,27 +77,41 @@ public class AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest
 		final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
 
 		final Iterator<ExecutionVertex> vertexIterator = executionGraph.getAllExecutionVertices().iterator();
-		final ExecutionVertex onlyExecutionVertex = vertexIterator.next();
+		final ExecutionVertex firstExecutionVertex = vertexIterator.next();
 
-		setTaskRunning(executionGraph, onlyExecutionVertex);
+		setTasksRunning(executionGraph, firstExecutionVertex, vertexIterator.next());
 
 		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
 		checkState(checkpointCoordinator != null);
 
 		checkpointCoordinator.triggerCheckpoint(System.currentTimeMillis(),  false);
-		final int pendingCheckpointsBeforeFailure = checkpointCoordinator.getNumberOfPendingCheckpoints();
+		assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+		long checkpointId = checkpointCoordinator.getPendingCheckpoints().keySet().iterator().next();
 
-		failVertex(onlyExecutionVertex);
+		AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
+			jobGraph.getJobID(),
+			firstExecutionVertex.getCurrentExecutionAttempt().getAttemptId(),
+			checkpointId);
+
+		// let the first vertex acknowledge the checkpoint, and fail it afterwards
+		// the failover strategy should then cancel all pending checkpoints on restart
+		checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, "Unknown location");
+		assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+
+		failVertex(firstExecutionVertex);
+		assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+		manualMainThreadExecutor.triggerScheduledTasks();
 
-		assertThat(pendingCheckpointsBeforeFailure, is(equalTo(1)));
 		assertNoPendingCheckpoints(checkpointCoordinator);
 	}
 
-	private void setTaskRunning(final ExecutionGraph executionGraph, final ExecutionVertex executionVertex) {
-		executionGraph.updateState(
-			new TaskExecutionState(executionGraph.getJobID(),
-				executionVertex.getCurrentExecutionAttempt().getAttemptId(),
-				ExecutionState.RUNNING));
+	private void setTasksRunning(final ExecutionGraph executionGraph, final ExecutionVertex... executionVertices) {
+		for (ExecutionVertex executionVertex : executionVertices) {
+			executionGraph.updateState(
+				new TaskExecutionState(executionGraph.getJobID(),
+					executionVertex.getCurrentExecutionAttempt().getAttemptId(),
+					ExecutionState.RUNNING));
+		}
 	}
 
 	private void failVertex(final ExecutionVertex onlyExecutionVertex) {
@@ -106,9 +121,11 @@ public class AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest
 
 	private static JobGraph createStreamingJobGraph() {
 		final JobVertex v1 = new JobVertex("vertex1");
+		final JobVertex v2 = new JobVertex("vertex2");
 		v1.setInvokableClass(AbstractInvokable.class);
+		v2.setInvokableClass(AbstractInvokable.class);
 
-		final JobGraph jobGraph = new JobGraph(v1);
+		final JobGraph jobGraph = new JobGraph(v1, v2);
 		jobGraph.setScheduleMode(ScheduleMode.EAGER);
 
 		return jobGraph;
@@ -116,9 +133,9 @@ public class AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest
 
 	private ExecutionGraph createExecutionGraph(final JobGraph jobGraph) throws Exception {
 		final ExecutionGraph executionGraph = new ExecutionGraphTestUtils.TestingExecutionGraphBuilder(jobGraph)
-			.setRestartStrategy(new InfiniteDelayRestartStrategy(10))
+			.setRestartStrategy(new FixedDelayRestartStrategy(10, 0))
 			.setFailoverStrategyFactory(AdaptedRestartPipelinedRegionStrategyNG::new)
-			.setSlotProvider(new SimpleSlotProvider(jobGraph.getJobID(), 1))
+			.setSlotProvider(new SimpleSlotProvider(jobGraph.getJobID(), 2))
 			.build();
 
 		enableCheckpointing(executionGraph);