Posted to commits@flink.apache.org by uc...@apache.org on 2017/10/23 12:34:26 UTC

[2/2] flink git commit: [FLINK-7067] [jobmanager] Resume periodic checkpoints after failed cancel-job-with-savepoint

[FLINK-7067] [jobmanager] Resume periodic checkpoints after failed cancel-job-with-savepoint

Problem: a failed cancel-job-with-savepoint request has an unintended
side effect on jobs that have periodic checkpoints enabled. The
periodic checkpoint scheduler is stopped before the savepoint is
triggered, but it is not restarted if the savepoint fails and the job
is not cancelled, leaving the job without further periodic checkpoints.

This commit makes sure that the periodic checkpoint scheduler is
restarted if and only if periodic checkpoints were enabled before.
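
For illustration, a minimal sketch of the intended control flow after the
fix (Java; the coordinator methods are the real ones from the diffs below,
while the handler name and the surrounding JobManager plumbing are
hypothetical and abbreviated):

    // Reaction to the savepoint outcome during cancel-job-with-savepoint.
    // The periodic scheduler was stopped before the savepoint was triggered.
    void onSavepointCompleted(String savepointPath, Throwable cause) {
        if (cause == null) {
            // Savepoint succeeded: proceed with the requested cancellation.
            executionGraph.cancel();
        } else if (coord.isPeriodicCheckpointingConfigured()) {
            // Savepoint failed and the job keeps running: resume the
            // periodic scheduler so the job is not left without checkpoints.
            try {
                coord.startCheckpointScheduler();
            } catch (IllegalStateException ignored) {
                // The coordinator was shut down concurrently.
            }
        }
    }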

This closes #4254.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e49bc428
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e49bc428
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e49bc428

Branch: refs/heads/master
Commit: e49bc428c396ffec501f13ba5fb9337fb4191818
Parents: 3b9d518
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Jul 4 17:01:32 2017 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon Oct 23 14:33:50 2017 +0200

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |   9 +
 .../flink/runtime/jobmanager/JobManager.scala   |  11 +
 .../runtime/jobmanager/JobManagerTest.java      | 199 ++++++++++++++++---
 3 files changed, 186 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e49bc428/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index c98d3aa..f932354 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1188,6 +1188,15 @@ public class CheckpointCoordinator {
 		return checkpointTimeout;
 	}
 
+	/**
+	 * Returns whether periodic checkpointing has been configured.
+	 *
+	 * @return <code>true</code> if periodic checkpoints have been configured.
+	 */
+	public boolean isPeriodicCheckpointingConfigured() {
+		return baseInterval != Long.MAX_VALUE;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Periodic scheduling of checkpoints
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/e49bc428/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index b7d5dda..8c3cf8e 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -623,6 +623,17 @@ class JobManager(
                       executionGraph.cancel()
                       senderRef ! decorateMessage(CancellationSuccess(jobId, path))
                     } else {
+                      // Restart checkpoint scheduler iff periodic checkpoints are configured.
+                      // Otherwise we have unintended side effects on the job.
+                      if (coord.isPeriodicCheckpointingConfigured) {
+                        try {
+                          coord.startCheckpointScheduler()
+                        } catch {
+                          case ignored: IllegalStateException =>
+                            // Concurrent shut down of the coordinator
+                        }
+                      }
+
                       val msg = CancellationFailure(
                         jobId,
                         new Exception("Failed to trigger savepoint.", cause))

http://git-wip-us.apache.org/repos/asf/flink/blob/e49bc428/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index a6b0581..68da362 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -18,13 +18,6 @@
 
 package org.apache.flink.runtime.jobmanager;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
-import akka.actor.Status;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestProbe;
-import com.typesafe.config.Config;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
@@ -35,6 +28,11 @@ import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.checkpoint.CheckpointDeclineReason;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
 import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
 import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
@@ -58,11 +56,12 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.jobmanager.JobManagerHARecoveryTest.BlockingStatefulInvokable;
-import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancellationResponse;
@@ -102,6 +101,14 @@ import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.StoppableInvokable;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Status;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestProbe;
+import com.typesafe.config.Config;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -109,6 +116,15 @@ import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
 import scala.Option;
 import scala.Some;
 import scala.Tuple2;
@@ -118,16 +134,12 @@ import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 import scala.reflect.ClassTag$;
 
-import java.io.File;
-import java.net.InetAddress;
-import java.util.Collections;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
 import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED;
+import static org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint;
+import static org.apache.flink.runtime.messages.JobManagerMessages.JobResultFailure;
 import static org.apache.flink.runtime.messages.JobManagerMessages.JobResultSuccess;
 import static org.apache.flink.runtime.messages.JobManagerMessages.JobSubmitSuccess;
+import static org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
 import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.AllVerticesRunning;
 import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.JobStatusIs;
 import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered;
@@ -437,7 +449,7 @@ public class JobManagerTest extends TestLogger {
 								jobGraph,
 								ListeningBehaviour.EXECUTION_RESULT),
 							testActorGateway);
-						expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
+						expectMsgClass(JobSubmitSuccess.class);
 
 						jobManagerGateway.tell(
 							new WaitForAllVerticesToBeRunningOrFinished(jid),
@@ -467,7 +479,7 @@ public class JobManagerTest extends TestLogger {
 						vertex.resetForNewExecution(System.currentTimeMillis(), 1L);
 
 						// Producer finished, request state
-						Object request = new JobManagerMessages.RequestPartitionProducerState(jid, rid, partitionId);
+						Object request = new RequestPartitionProducerState(jid, rid, partitionId);
 
 						Future<?> producerStateFuture = jobManagerGateway.ask(request, getRemainingTime());
 
@@ -881,7 +893,7 @@ public class JobManagerTest extends TestLogger {
 			jobGraph.setSnapshotSettings(snapshottingSettings);
 
 			// Submit job graph
-			msg = new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED);
+			msg = new SubmitJob(jobGraph, ListeningBehaviour.DETACHED);
 			Await.result(jobManager.ask(msg, timeout), timeout);
 
 			// Wait for all tasks to be running
@@ -896,7 +908,7 @@ public class JobManagerTest extends TestLogger {
 			String savepointPath = null;
 
 			for (int i = 0; i < 10; i++) {
-				msg = new JobManagerMessages.CancelJobWithSavepoint(jobGraph.getJobID(), null);
+				msg = new CancelJobWithSavepoint(jobGraph.getJobID(), null);
 				CancellationResponse cancelResp = (CancellationResponse) Await.result(jobManager.ask(msg, timeout), timeout);
 
 				if (cancelResp instanceof CancellationFailure) {
@@ -945,6 +957,79 @@ public class JobManagerTest extends TestLogger {
 	}
 
 	/**
+	 * Tests that a failed savepoint does not cancel the job and new checkpoints are triggered
+	 * after the failed cancel-with-savepoint.
+	 */
+	@Test
+	public void testCancelJobWithSavepointFailurePeriodicCheckpoints() throws Exception {
+		File savepointTarget = tmpFolder.newFolder();
+
+		// A source that declines savepoints, simulating the behaviour of a
+		// failed savepoint.
+		JobVertex sourceVertex = new JobVertex("Source");
+		sourceVertex.setInvokableClass(FailOnSavepointSourceTask.class);
+		sourceVertex.setParallelism(1);
+		JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex);
+
+		CheckpointCoordinatorConfiguration coordConfig = new CheckpointCoordinatorConfiguration(
+			50,
+			3600000,
+			0,
+			Integer.MAX_VALUE,
+			ExternalizedCheckpointSettings.none(),
+			true);
+
+		JobCheckpointingSettings snapshottingSettings = new JobCheckpointingSettings(
+			Collections.singletonList(sourceVertex.getID()),
+			Collections.singletonList(sourceVertex.getID()),
+			Collections.singletonList(sourceVertex.getID()),
+			coordConfig,
+			null);
+
+		jobGraph.setSnapshotSettings(snapshottingSettings);
+
+		final TestingCluster testingCluster = new TestingCluster(
+			new Configuration(),
+			highAvailabilityServices,
+			true,
+			false);
+
+		try {
+			testingCluster.start(true);
+
+			FiniteDuration askTimeout = new FiniteDuration(30, TimeUnit.SECONDS);
+			ActorGateway jobManager = testingCluster.getLeaderGateway(askTimeout);
+
+			testingCluster.submitJobDetached(jobGraph);
+
+			// Wait for the source to be running otherwise the savepoint
+			// barrier will not reach the task.
+			Future<Object> allTasksAlive = jobManager.ask(
+				new WaitForAllVerticesToBeRunning(jobGraph.getJobID()),
+				askTimeout);
+			Await.ready(allTasksAlive, askTimeout);
+
+			// Cancel with savepoint. The expected outcome is that cancellation
+			// fails due to a failed savepoint. After this, periodic checkpoints
+			// should resume.
+			Future<Object> cancellationFuture = jobManager.ask(
+				new CancelJobWithSavepoint(jobGraph.getJobID(), savepointTarget.getAbsolutePath()),
+				askTimeout);
+			Object cancellationResponse = Await.result(cancellationFuture, askTimeout);
+
+			if (cancellationResponse instanceof CancellationFailure) {
+				if (!FailOnSavepointSourceTask.CHECKPOINT_AFTER_SAVEPOINT_LATCH.await(30, TimeUnit.SECONDS)) {
+					fail("No checkpoint was triggered after failed savepoint within expected duration");
+				}
+			} else {
+				fail("Unexpected cancellation response from JobManager: " + cancellationResponse);
+			}
+		} finally {
+			testingCluster.shutdown();
+		}
+	}
+
+	/**
 	 * Tests that a meaningful exception is returned if no savepoint directory is
 	 * configured.
 	 */
@@ -1018,7 +1103,7 @@ public class JobManagerTest extends TestLogger {
 			jobGraph.setSnapshotSettings(snapshottingSettings);
 
 			// Submit job graph
-			msg = new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED);
+			msg = new SubmitJob(jobGraph, ListeningBehaviour.DETACHED);
 			Await.result(jobManager.ask(msg, timeout), timeout);
 
 			// Wait for all tasks to be running
@@ -1026,7 +1111,7 @@ public class JobManagerTest extends TestLogger {
 			Await.result(jobManager.ask(msg, timeout), timeout);
 
 			// Cancel with savepoint
-			msg = new JobManagerMessages.CancelJobWithSavepoint(jobGraph.getJobID(), null);
+			msg = new CancelJobWithSavepoint(jobGraph.getJobID(), null);
 			CancellationResponse cancelResp = (CancellationResponse) Await.result(jobManager.ask(msg, timeout), timeout);
 
 			if (cancelResp instanceof CancellationFailure) {
@@ -1131,7 +1216,7 @@ public class JobManagerTest extends TestLogger {
 			jobGraph.setSnapshotSettings(snapshottingSettings);
 
 			// Submit job graph
-			msg = new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED);
+			msg = new SubmitJob(jobGraph, ListeningBehaviour.DETACHED);
 			Await.result(jobManager.ask(msg, timeout), timeout);
 
 			// Wait for all tasks to be running
@@ -1245,7 +1330,7 @@ public class JobManagerTest extends TestLogger {
 			jobGraph.setSnapshotSettings(snapshottingSettings);
 
 			// Submit job graph
-			msg = new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED);
+			msg = new SubmitJob(jobGraph, ListeningBehaviour.DETACHED);
 			Await.result(jobManager.ask(msg, timeout), timeout);
 
 			// Wait for all tasks to be running
@@ -1295,12 +1380,12 @@ public class JobManagerTest extends TestLogger {
 			SavepointRestoreSettings restoreSettings = SavepointRestoreSettings.forPath(savepointPath, false);
 			newJobGraph.setSavepointRestoreSettings(restoreSettings);
 
-			msg = new JobManagerMessages.SubmitJob(newJobGraph, ListeningBehaviour.DETACHED);
+			msg = new SubmitJob(newJobGraph, ListeningBehaviour.DETACHED);
 			response = Await.result(jobManager.ask(msg, timeout), timeout);
 
-			assertTrue("Unexpected response: " + response, response instanceof JobManagerMessages.JobResultFailure);
+			assertTrue("Unexpected response: " + response, response instanceof JobResultFailure);
 
-			JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure) response;
+			JobResultFailure failure = (JobResultFailure) response;
 			Throwable cause = failure.cause().deserializeError(ClassLoader.getSystemClassLoader());
 
 			assertTrue(cause instanceof IllegalStateException);
@@ -1314,10 +1399,10 @@ public class JobManagerTest extends TestLogger {
 			restoreSettings = SavepointRestoreSettings.forPath(savepointPath, true);
 			newJobGraph.setSavepointRestoreSettings(restoreSettings);
 
-			msg = new JobManagerMessages.SubmitJob(newJobGraph, ListeningBehaviour.DETACHED);
+			msg = new SubmitJob(newJobGraph, ListeningBehaviour.DETACHED);
 			response = Await.result(jobManager.ask(msg, timeout), timeout);
 
-			assertTrue("Unexpected response: " + response, response instanceof JobManagerMessages.JobSubmitSuccess);
+			assertTrue("Unexpected response: " + response, response instanceof JobSubmitSuccess);
 		} finally {
 			if (actorSystem != null) {
 				actorSystem.shutdown();
@@ -1354,7 +1439,6 @@ public class JobManagerTest extends TestLogger {
 		final Configuration configuration = new Configuration();
 		configuration.setLong(JobManagerOptions.RESOURCE_MANAGER_RECONNECT_INTERVAL, reconnectionInterval);
 
-
 		final ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(configuration);
 
 		try {
@@ -1374,7 +1458,7 @@ public class JobManagerTest extends TestLogger {
 
 			jmGateway.tell(new RegisterResourceManager(probe.ref()), rmGateway);
 
-			JobManagerMessages.LeaderSessionMessage leaderSessionMessage = probe.expectMsgClass(JobManagerMessages.LeaderSessionMessage.class);
+			LeaderSessionMessage leaderSessionMessage = probe.expectMsgClass(LeaderSessionMessage.class);
 
 			assertEquals(jmGateway.leaderSessionID(), leaderSessionMessage.leaderSessionID());
 			assertTrue(leaderSessionMessage.message() instanceof RegisterResourceManagerSuccessful);
@@ -1385,7 +1469,7 @@ public class JobManagerTest extends TestLogger {
 					mock(TaskManagerLocation.class),
 					new HardwareDescription(1, 1L, 1L, 1L),
 					1));
-			leaderSessionMessage = probe.expectMsgClass(JobManagerMessages.LeaderSessionMessage.class);
+			leaderSessionMessage = probe.expectMsgClass(LeaderSessionMessage.class);
 
 			assertTrue(leaderSessionMessage.message() instanceof NotifyResourceStarted);
 
@@ -1397,7 +1481,7 @@ public class JobManagerTest extends TestLogger {
 
 			while (reconnectionDeadline.hasTimeLeft()) {
 				try {
-					leaderSessionMessage = probe.expectMsgClass(reconnectionDeadline.timeLeft(), JobManagerMessages.LeaderSessionMessage.class);
+					leaderSessionMessage = probe.expectMsgClass(reconnectionDeadline.timeLeft(), LeaderSessionMessage.class);
 				} catch (AssertionError ignored) {
 					// expected timeout after the reconnectionDeadline has been exceeded
 					continue;
@@ -1425,4 +1509,53 @@ public class JobManagerTest extends TestLogger {
 			actorSystem.awaitTermination();
 		}
 	}
+
+	/**
+	 * A blocking stateful source task that declines savepoints.
+	 */
+	public static class FailOnSavepointSourceTask extends AbstractInvokable implements StatefulTask {
+
+		private static final CountDownLatch CHECKPOINT_AFTER_SAVEPOINT_LATCH = new CountDownLatch(1);
+
+		private boolean receivedSavepoint;
+
+		@Override
+		public void invoke() throws Exception {
+			new CountDownLatch(1).await();
+		}
+
+		@Override
+		public void setInitialState(TaskStateSnapshot taskStateHandles) throws Exception {
+		}
+
+		@Override
+		public boolean triggerCheckpoint(
+			CheckpointMetaData checkpointMetaData,
+			CheckpointOptions checkpointOptions) throws Exception {
+			if (checkpointOptions.getCheckpointType() == CheckpointType.SAVEPOINT) {
+				receivedSavepoint = true;
+				return false;
+			} else if (receivedSavepoint) {
+				CHECKPOINT_AFTER_SAVEPOINT_LATCH.countDown();
+				return true;
+			}
+			return true;
+		}
+
+		@Override
+		public void triggerCheckpointOnBarrier(
+			CheckpointMetaData checkpointMetaData,
+			CheckpointOptions checkpointOptions,
+			CheckpointMetrics checkpointMetrics) throws Exception {
+			throw new UnsupportedOperationException("This is meant to be used as a source");
+		}
+
+		@Override
+		public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) throws Exception {
+		}
+
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) throws Exception {
+		}
+	}
 }