You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/01/17 16:38:05 UTC

[GitHub] tillrohrmann commented on a change in pull request #7514: [FLINK-11349][tests] Port CoordinatorShutdownTest to new code base

tillrohrmann commented on a change in pull request #7514: [FLINK-11349][tests] Port CoordinatorShutdownTest to new code base
URL: https://github.com/apache/flink/pull/7514#discussion_r248747514
 
 

 ##########
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
 ##########
 @@ -20,59 +20,118 @@
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.DummyJobInformation;
+import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.TestingSlotProvider;
 import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
 
+import org.hamcrest.Matchers;
 import org.junit.Test;
 
 import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
 
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
 
-public class ExecutionGraphCheckpointCoordinatorTest {
+/**
+ * Tests for the interaction between the {@link ExecutionGraph} and the {@link CheckpointCoordinator}.
+ */
+public class ExecutionGraphCheckpointCoordinatorTest extends TestLogger {
 
 	/**
-	 * Tests that a shut down checkpoint coordinator calls shutdown on
-	 * the store and counter.
+	 * Tests that the checkpoint coordinator is shut down if the execution graph
+	 * is failed.
 	 */
 	@Test
-	public void testShutdownCheckpointCoordinator() throws Exception {
-		CheckpointIDCounter counter = mock(CheckpointIDCounter.class);
-		CompletedCheckpointStore store = mock(CompletedCheckpointStore.class);
+	public void testShutdownCheckpointCoordinatorOnFailure() throws Exception {
+		final CompletableFuture<JobStatus> counterShutdownFuture = new CompletableFuture<>();
+		CheckpointIDCounter counter = new TestingCheckpointIDCounter(counterShutdownFuture);
+
+		final CompletableFuture<JobStatus> storeShutdownFuture = new CompletableFuture<>();
+		CompletedCheckpointStore store = new TestingCompletedCheckpointStore(storeShutdownFuture);
 
 		ExecutionGraph graph = createExecutionGraphAndEnableCheckpointing(counter, store);
+		final CheckpointCoordinator checkpointCoordinator = graph.getCheckpointCoordinator();
+
+		assertThat(checkpointCoordinator, Matchers.notNullValue());
+		assertThat(checkpointCoordinator.isShutdown(), is(false));
+
 		graph.failGlobal(new Exception("Test Exception"));
 
-		verify(counter, times(1)).shutdown(JobStatus.FAILED);
-		verify(store, times(1)).shutdown(eq(JobStatus.FAILED));
+		assertThat(checkpointCoordinator.isShutdown(), is(true));
+		assertThat(counterShutdownFuture.get(), is(JobStatus.FAILED));
+		assertThat(storeShutdownFuture.get(), is(JobStatus.FAILED));
 	}
 
 	/**
-	 * Tests that a suspended checkpoint coordinator calls suspend on
-	 * the store and counter.
+	 * Tests that the checkpoint coordinator is shut down if the execution graph
+	 * is suspended.
 	 */
 	@Test
-	public void testSuspendCheckpointCoordinator() throws Exception {
-		CheckpointIDCounter counter = mock(CheckpointIDCounter.class);
-		CompletedCheckpointStore store = mock(CompletedCheckpointStore.class);
+	public void testShutdownCheckpointCoordinatorOnSuspend() throws Exception {
+		final CompletableFuture<JobStatus> counterShutdownFuture = new CompletableFuture<>();
+		CheckpointIDCounter counter = new TestingCheckpointIDCounter(counterShutdownFuture);
+
+		final CompletableFuture<JobStatus> storeShutdownFuture = new CompletableFuture<>();
+		CompletedCheckpointStore store = new TestingCompletedCheckpointStore(storeShutdownFuture);
 
 		ExecutionGraph graph = createExecutionGraphAndEnableCheckpointing(counter, store);
+		final CheckpointCoordinator checkpointCoordinator = graph.getCheckpointCoordinator();
+
+		assertThat(checkpointCoordinator, Matchers.notNullValue());
+		assertThat(checkpointCoordinator.isShutdown(), is(false));
+
 		graph.suspend(new Exception("Test Exception"));
 
-		// No shutdown
-		verify(counter, times(1)).shutdown(eq(JobStatus.SUSPENDED));
-		verify(store, times(1)).shutdown(eq(JobStatus.SUSPENDED));
+		assertThat(checkpointCoordinator.isShutdown(), is(true));
+		assertThat(counterShutdownFuture.get(), is(JobStatus.SUSPENDED));
+		assertThat(storeShutdownFuture.get(), is(JobStatus.SUSPENDED));
+	}
+
+	/**
+	 * Tests that the checkpoint coordinator is shut down if the execution graph
+	 * is finished.
+	 */
+	@Test
+	public void testShutdownCheckpointCoordinatorOnFinished() throws Exception {
+		final CompletableFuture<JobStatus> counterShutdownFuture = new CompletableFuture<>();
+		CheckpointIDCounter counter = new TestingCheckpointIDCounter(counterShutdownFuture);
+
+		final CompletableFuture<JobStatus> storeShutdownFuture = new CompletableFuture<>();
+		CompletedCheckpointStore store = new TestingCompletedCheckpointStore(storeShutdownFuture);
+
+		ExecutionGraph graph = createExecutionGraphAndEnableCheckpointing(counter, store);
+		final CheckpointCoordinator checkpointCoordinator = graph.getCheckpointCoordinator();
+
+		assertThat(checkpointCoordinator, Matchers.notNullValue());
+		assertThat(checkpointCoordinator.isShutdown(), is(false));
+
+		graph.scheduleForExecution();
 
 Review comment:
   https://ci.apache.org/projects/flink/flink-docs-stable/internals/job_scheduling.html but this might actually be partially outdated.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services