You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/07/18 10:00:23 UTC
[flink] branch release-1.9 updated: [FLINK-13281][tests] Improve test for aborting pending checkpoints on failover
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new ca837bd [FLINK-13281][tests] Improve test for aborting pending checkpoints on failover
ca837bd is described below
commit ca837bd8d467975df9101f6ea5c19cb48cef5ac5
Author: Yun Tang <my...@live.com>
AuthorDate: Thu Jul 18 17:59:02 2019 +0800
[FLINK-13281][tests] Improve test for aborting pending checkpoints on failover
---
...egionStrategyNGAbortPendingCheckpointsTest.java | 47 +++++++++++++++-------
1 file changed, 32 insertions(+), 15 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest.java
index 899d490..acfb396 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest.java
@@ -30,13 +30,14 @@ import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAda
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
-import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.util.TestLogger;
@@ -51,8 +52,8 @@ import java.util.List;
import static org.apache.flink.util.Preconditions.checkState;
import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
/**
@@ -76,27 +77,41 @@ public class AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest
final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
final Iterator<ExecutionVertex> vertexIterator = executionGraph.getAllExecutionVertices().iterator();
- final ExecutionVertex onlyExecutionVertex = vertexIterator.next();
+ final ExecutionVertex firstExecutionVertex = vertexIterator.next();
- setTaskRunning(executionGraph, onlyExecutionVertex);
+ setTasksRunning(executionGraph, firstExecutionVertex, vertexIterator.next());
final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
checkState(checkpointCoordinator != null);
checkpointCoordinator.triggerCheckpoint(System.currentTimeMillis(), false);
- final int pendingCheckpointsBeforeFailure = checkpointCoordinator.getNumberOfPendingCheckpoints();
+ assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ long checkpointId = checkpointCoordinator.getPendingCheckpoints().keySet().iterator().next();
- failVertex(onlyExecutionVertex);
+ AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
+ jobGraph.getJobID(),
+ firstExecutionVertex.getCurrentExecutionAttempt().getAttemptId(),
+ checkpointId);
+
+ // let the first vertex acknowledge the checkpoint, and fail it afterwards
+ // the failover strategy should then cancel all pending checkpoints on restart
+ checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, "Unknown location");
+ assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+
+ failVertex(firstExecutionVertex);
+ assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+ manualMainThreadExecutor.triggerScheduledTasks();
- assertThat(pendingCheckpointsBeforeFailure, is(equalTo(1)));
assertNoPendingCheckpoints(checkpointCoordinator);
}
- private void setTaskRunning(final ExecutionGraph executionGraph, final ExecutionVertex executionVertex) {
- executionGraph.updateState(
- new TaskExecutionState(executionGraph.getJobID(),
- executionVertex.getCurrentExecutionAttempt().getAttemptId(),
- ExecutionState.RUNNING));
+ private void setTasksRunning(final ExecutionGraph executionGraph, final ExecutionVertex... executionVertices) {
+ for (ExecutionVertex executionVertex : executionVertices) {
+ executionGraph.updateState(
+ new TaskExecutionState(executionGraph.getJobID(),
+ executionVertex.getCurrentExecutionAttempt().getAttemptId(),
+ ExecutionState.RUNNING));
+ }
}
private void failVertex(final ExecutionVertex onlyExecutionVertex) {
@@ -106,9 +121,11 @@ public class AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest
private static JobGraph createStreamingJobGraph() {
final JobVertex v1 = new JobVertex("vertex1");
+ final JobVertex v2 = new JobVertex("vertex2");
v1.setInvokableClass(AbstractInvokable.class);
+ v2.setInvokableClass(AbstractInvokable.class);
- final JobGraph jobGraph = new JobGraph(v1);
+ final JobGraph jobGraph = new JobGraph(v1, v2);
jobGraph.setScheduleMode(ScheduleMode.EAGER);
return jobGraph;
@@ -116,9 +133,9 @@ public class AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest
private ExecutionGraph createExecutionGraph(final JobGraph jobGraph) throws Exception {
final ExecutionGraph executionGraph = new ExecutionGraphTestUtils.TestingExecutionGraphBuilder(jobGraph)
- .setRestartStrategy(new InfiniteDelayRestartStrategy(10))
+ .setRestartStrategy(new FixedDelayRestartStrategy(10, 0))
.setFailoverStrategyFactory(AdaptedRestartPipelinedRegionStrategyNG::new)
- .setSlotProvider(new SimpleSlotProvider(jobGraph.getJobID(), 1))
+ .setSlotProvider(new SimpleSlotProvider(jobGraph.getJobID(), 2))
.build();
enableCheckpointing(executionGraph);