You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2017/02/07 10:48:16 UTC

flink git commit: [FLINK-5699] [savepoints] Check target dir when cancelling with savepoint

Repository: flink
Updated Branches:
  refs/heads/master 11ebf4842 -> b452c8bbb


[FLINK-5699] [savepoints] Check target dir when cancelling with savepoint

Problem: when cancelling a job with a savepoint while no savepoint directory
is configured, triggering the savepoint fails with a NullPointerException
(NPE). This NPE is then returned to the user as the root cause.

Solution: Instead of simply forwarding the target directory (which may be
null), we check it for null and respond with a CancellationFailure that
wraps an IllegalStateException with a meaningful message.

This closes #3263.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b452c8bb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b452c8bb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b452c8bb

Branch: refs/heads/master
Commit: b452c8bbbaa9efb5de6cc66d2817b398ac9da041
Parents: 11ebf48
Author: Ufuk Celebi <uc...@apache.org>
Authored: Fri Feb 3 17:28:27 2017 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Tue Feb 7 11:47:56 2017 +0100

----------------------------------------------------------------------
 .../flink/client/CliFrontendListCancelTest.java |   2 +-
 .../flink/runtime/jobmanager/JobManager.scala   |  76 +++++++------
 .../runtime/jobmanager/JobManagerTest.java      | 107 ++++++++++++++++++-
 3 files changed, 148 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b452c8bb/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
index 53311ef..4d3405f 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
@@ -181,7 +181,7 @@ public class CliFrontendListCancelTest {
 		}
 
 		{
-			// Cancel with savepoint (no target directory)and no job ID
+			// Cancel with savepoint (no target directory) and no job ID
 			JobID jid = new JobID();
 			UUID leaderSessionID = UUID.randomUUID();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b452c8bb/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index d175c46..81e9fc4 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -586,44 +586,54 @@ class JobManager(
           defaultSavepointDir
         }
 
-        log.info(s"Trying to cancel job $jobId with savepoint to $targetDirectory")
+        if (targetDirectory == null) {
+          log.info(s"Trying to cancel job $jobId with savepoint, but no " +
+            "savepoint directory configured.")
+
+          sender ! decorateMessage(CancellationFailure(jobId, new IllegalStateException(
+            "No savepoint directory configured. You can either specify a directory " +
+              "while cancelling via -s :targetDirectory or configure a cluster-wide " +
+              "default via key '" + ConfigConstants.SAVEPOINT_DIRECTORY_KEY + "'.")))
+        } else {
+          log.info(s"Trying to cancel job $jobId with savepoint to $targetDirectory")
 
-        currentJobs.get(jobId) match {
-          case Some((executionGraph, _)) =>
-            // We don't want any checkpoint between the savepoint and cancellation
-            val coord = executionGraph.getCheckpointCoordinator
-            coord.stopCheckpointScheduler()
+          currentJobs.get(jobId) match {
+            case Some((executionGraph, _)) =>
+              // We don't want any checkpoint between the savepoint and cancellation
+              val coord = executionGraph.getCheckpointCoordinator
+              coord.stopCheckpointScheduler()
 
-            // Trigger the savepoint
-            val future = coord.triggerSavepoint(System.currentTimeMillis(), targetDirectory)
+              // Trigger the savepoint
+              val future = coord.triggerSavepoint(System.currentTimeMillis(), targetDirectory)
 
-            val senderRef = sender()
-            future.handleAsync[Void](
-              new BiFunction[CompletedCheckpoint, Throwable, Void] {
-                override def apply(success: CompletedCheckpoint, cause: Throwable): Void = {
-                  if (success != null) {
-                    val path = success.getExternalPath()
-                    log.info(s"Savepoint stored in $path. Now cancelling $jobId.")
-                    executionGraph.cancel()
-                    senderRef ! decorateMessage(CancellationSuccess(jobId, path))
-                  } else {
-                    val msg = CancellationFailure(
-                      jobId,
-                      new Exception("Failed to trigger savepoint.", cause))
-                    senderRef ! decorateMessage(msg)
+              val senderRef = sender()
+              future.handleAsync[Void](
+                new BiFunction[CompletedCheckpoint, Throwable, Void] {
+                  override def apply(success: CompletedCheckpoint, cause: Throwable): Void = {
+                    if (success != null) {
+                      val path = success.getExternalPath()
+                      log.info(s"Savepoint stored in $path. Now cancelling $jobId.")
+                      executionGraph.cancel()
+                      senderRef ! decorateMessage(CancellationSuccess(jobId, path))
+                    } else {
+                      val msg = CancellationFailure(
+                        jobId,
+                        new Exception("Failed to trigger savepoint.", cause))
+                      senderRef ! decorateMessage(msg)
+                    }
+                    null
                   }
-                  null
-                }
-              },
-              context.dispatcher)
+                },
+                context.dispatcher)
 
-          case None =>
-            log.info(s"No job found with ID $jobId.")
-            sender ! decorateMessage(
-              CancellationFailure(
-                jobId,
-                new IllegalArgumentException(s"No job found with ID $jobId."))
-            )
+            case None =>
+              log.info(s"No job found with ID $jobId.")
+              sender ! decorateMessage(
+                CancellationFailure(
+                  jobId,
+                  new IllegalArgumentException(s"No job found with ID $jobId."))
+              )
+          }
         }
       } catch {
         case t: Throwable =>

http://git-wip-us.apache.org/repos/asf/flink/blob/b452c8bb/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index b522745..b627273 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -894,9 +894,110 @@ public class JobManagerTest {
 	}
 
 	/**
-	 * Tests that we can trigger a
-	 *
-	 * @throws Exception
+	 * Tests that a meaningful exception is returned if no savepoint directory is
+	 * configured.
+	 */
+	@Test
+	public void testCancelWithSavepointNoDirectoriesConfigured() throws Exception {
+		FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
+		Configuration config = new Configuration();
+
+		ActorSystem actorSystem = null;
+		ActorGateway jobManager = null;
+		ActorGateway archiver = null;
+		ActorGateway taskManager = null;
+		try {
+			actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
+
+			Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
+				config,
+				actorSystem,
+				actorSystem.dispatcher(),
+				actorSystem.dispatcher(),
+				Option.apply("jm"),
+				Option.apply("arch"),
+				TestingJobManager.class,
+				TestingMemoryArchivist.class);
+
+			jobManager = new AkkaActorGateway(master._1(), null);
+			archiver = new AkkaActorGateway(master._2(), null);
+
+			ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
+				config,
+				ResourceID.generate(),
+				actorSystem,
+				"localhost",
+				Option.apply("tm"),
+				Option.<LeaderRetrievalService>apply(new StandaloneLeaderRetrievalService(jobManager.path())),
+				true,
+				TestingTaskManager.class);
+
+			taskManager = new AkkaActorGateway(taskManagerRef, null);
+
+			// Wait until connected
+			Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
+			Await.ready(taskManager.ask(msg, timeout), timeout);
+
+			// Create job graph
+			JobVertex sourceVertex = new JobVertex("Source");
+			sourceVertex.setInvokableClass(BlockingStatefulInvokable.class);
+			sourceVertex.setParallelism(1);
+
+			JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex);
+
+			JobSnapshottingSettings snapshottingSettings = new JobSnapshottingSettings(
+				Collections.singletonList(sourceVertex.getID()),
+				Collections.singletonList(sourceVertex.getID()),
+				Collections.singletonList(sourceVertex.getID()),
+				3600000,
+				3600000,
+				0,
+				Integer.MAX_VALUE,
+				ExternalizedCheckpointSettings.none(),
+				true);
+
+			jobGraph.setSnapshotSettings(snapshottingSettings);
+
+			// Submit job graph
+			msg = new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED);
+			Await.result(jobManager.ask(msg, timeout), timeout);
+
+			// Wait for all tasks to be running
+			msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID());
+			Await.result(jobManager.ask(msg, timeout), timeout);
+
+			// Cancel with savepoint
+			msg = new JobManagerMessages.CancelJobWithSavepoint(jobGraph.getJobID(), null);
+			CancellationResponse cancelResp = (CancellationResponse) Await.result(jobManager.ask(msg, timeout), timeout);
+
+			if (cancelResp instanceof CancellationFailure) {
+				CancellationFailure failure = (CancellationFailure) cancelResp;
+				assertTrue(failure.cause() instanceof IllegalStateException);
+				assertTrue(failure.cause().getMessage().contains("savepoint directory"));
+			} else {
+				fail("Unexpected cancellation response from JobManager: " + cancelResp);
+			}
+		} finally {
+			if (actorSystem != null) {
+				actorSystem.shutdown();
+			}
+
+			if (archiver != null) {
+				archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+			}
+
+			if (jobManager != null) {
+				jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+			}
+
+			if (taskManager != null) {
+				taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+			}
+		}
+	}
+
+	/**
+	 * Tests that we can trigger a savepoint when periodic checkpoints are disabled.
 	 */
 	@Test
 	public void testSavepointWithDeactivatedPeriodicCheckpointing() throws Exception {