You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2017/02/07 10:48:16 UTC
flink git commit: [FLINK-5699] [savepoints] Check target dir when cancelling with savepoint
Repository: flink
Updated Branches:
refs/heads/master 11ebf4842 -> b452c8bbb
[FLINK-5699] [savepoints] Check target dir when cancelling with savepoint
Problem: when cancelling a job with a savepoint and no savepoint directory
is configured, triggering the savepoint fails with an NPE. This NPE is then
returned to the user as the root cause of the failed cancellation.
Solution: Instead of simply forwarding the target directory argument (which
may be null), we check it for null and return an IllegalStateException with
a meaningful message.
This closes #3263.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b452c8bb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b452c8bb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b452c8bb
Branch: refs/heads/master
Commit: b452c8bbbaa9efb5de6cc66d2817b398ac9da041
Parents: 11ebf48
Author: Ufuk Celebi <uc...@apache.org>
Authored: Fri Feb 3 17:28:27 2017 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Tue Feb 7 11:47:56 2017 +0100
----------------------------------------------------------------------
.../flink/client/CliFrontendListCancelTest.java | 2 +-
.../flink/runtime/jobmanager/JobManager.scala | 76 +++++++------
.../runtime/jobmanager/JobManagerTest.java | 107 ++++++++++++++++++-
3 files changed, 148 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b452c8bb/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
index 53311ef..4d3405f 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
@@ -181,7 +181,7 @@ public class CliFrontendListCancelTest {
}
{
- // Cancel with savepoint (no target directory)and no job ID
+ // Cancel with savepoint (no target directory) and no job ID
JobID jid = new JobID();
UUID leaderSessionID = UUID.randomUUID();
http://git-wip-us.apache.org/repos/asf/flink/blob/b452c8bb/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index d175c46..81e9fc4 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -586,44 +586,54 @@ class JobManager(
defaultSavepointDir
}
- log.info(s"Trying to cancel job $jobId with savepoint to $targetDirectory")
+ if (targetDirectory == null) {
+ log.info(s"Trying to cancel job $jobId with savepoint, but no " +
+ "savepoint directory configured.")
+
+ sender ! decorateMessage(CancellationFailure(jobId, new IllegalStateException(
+ "No savepoint directory configured. You can either specify a directory " +
+ "while cancelling via -s :targetDirectory or configure a cluster-wide " +
+ "default via key '" + ConfigConstants.SAVEPOINT_DIRECTORY_KEY + "'.")))
+ } else {
+ log.info(s"Trying to cancel job $jobId with savepoint to $targetDirectory")
- currentJobs.get(jobId) match {
- case Some((executionGraph, _)) =>
- // We don't want any checkpoint between the savepoint and cancellation
- val coord = executionGraph.getCheckpointCoordinator
- coord.stopCheckpointScheduler()
+ currentJobs.get(jobId) match {
+ case Some((executionGraph, _)) =>
+ // We don't want any checkpoint between the savepoint and cancellation
+ val coord = executionGraph.getCheckpointCoordinator
+ coord.stopCheckpointScheduler()
- // Trigger the savepoint
- val future = coord.triggerSavepoint(System.currentTimeMillis(), targetDirectory)
+ // Trigger the savepoint
+ val future = coord.triggerSavepoint(System.currentTimeMillis(), targetDirectory)
- val senderRef = sender()
- future.handleAsync[Void](
- new BiFunction[CompletedCheckpoint, Throwable, Void] {
- override def apply(success: CompletedCheckpoint, cause: Throwable): Void = {
- if (success != null) {
- val path = success.getExternalPath()
- log.info(s"Savepoint stored in $path. Now cancelling $jobId.")
- executionGraph.cancel()
- senderRef ! decorateMessage(CancellationSuccess(jobId, path))
- } else {
- val msg = CancellationFailure(
- jobId,
- new Exception("Failed to trigger savepoint.", cause))
- senderRef ! decorateMessage(msg)
+ val senderRef = sender()
+ future.handleAsync[Void](
+ new BiFunction[CompletedCheckpoint, Throwable, Void] {
+ override def apply(success: CompletedCheckpoint, cause: Throwable): Void = {
+ if (success != null) {
+ val path = success.getExternalPath()
+ log.info(s"Savepoint stored in $path. Now cancelling $jobId.")
+ executionGraph.cancel()
+ senderRef ! decorateMessage(CancellationSuccess(jobId, path))
+ } else {
+ val msg = CancellationFailure(
+ jobId,
+ new Exception("Failed to trigger savepoint.", cause))
+ senderRef ! decorateMessage(msg)
+ }
+ null
}
- null
- }
- },
- context.dispatcher)
+ },
+ context.dispatcher)
- case None =>
- log.info(s"No job found with ID $jobId.")
- sender ! decorateMessage(
- CancellationFailure(
- jobId,
- new IllegalArgumentException(s"No job found with ID $jobId."))
- )
+ case None =>
+ log.info(s"No job found with ID $jobId.")
+ sender ! decorateMessage(
+ CancellationFailure(
+ jobId,
+ new IllegalArgumentException(s"No job found with ID $jobId."))
+ )
+ }
}
} catch {
case t: Throwable =>
http://git-wip-us.apache.org/repos/asf/flink/blob/b452c8bb/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index b522745..b627273 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -894,9 +894,110 @@ public class JobManagerTest {
}
/**
- * Tests that we can trigger a
- *
- * @throws Exception
+ * Tests that a meaningful exception is returned if no savepoint directory is
+ * configured.
+ */
+ @Test
+ public void testCancelWithSavepointNoDirectoriesConfigured() throws Exception {
+ FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
+ Configuration config = new Configuration();
+
+ ActorSystem actorSystem = null;
+ ActorGateway jobManager = null;
+ ActorGateway archiver = null;
+ ActorGateway taskManager = null;
+ try {
+ actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
+
+ Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
+ config,
+ actorSystem,
+ actorSystem.dispatcher(),
+ actorSystem.dispatcher(),
+ Option.apply("jm"),
+ Option.apply("arch"),
+ TestingJobManager.class,
+ TestingMemoryArchivist.class);
+
+ jobManager = new AkkaActorGateway(master._1(), null);
+ archiver = new AkkaActorGateway(master._2(), null);
+
+ ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
+ config,
+ ResourceID.generate(),
+ actorSystem,
+ "localhost",
+ Option.apply("tm"),
+ Option.<LeaderRetrievalService>apply(new StandaloneLeaderRetrievalService(jobManager.path())),
+ true,
+ TestingTaskManager.class);
+
+ taskManager = new AkkaActorGateway(taskManagerRef, null);
+
+ // Wait until connected
+ Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
+ Await.ready(taskManager.ask(msg, timeout), timeout);
+
+ // Create job graph
+ JobVertex sourceVertex = new JobVertex("Source");
+ sourceVertex.setInvokableClass(BlockingStatefulInvokable.class);
+ sourceVertex.setParallelism(1);
+
+ JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex);
+
+ JobSnapshottingSettings snapshottingSettings = new JobSnapshottingSettings(
+ Collections.singletonList(sourceVertex.getID()),
+ Collections.singletonList(sourceVertex.getID()),
+ Collections.singletonList(sourceVertex.getID()),
+ 3600000,
+ 3600000,
+ 0,
+ Integer.MAX_VALUE,
+ ExternalizedCheckpointSettings.none(),
+ true);
+
+ jobGraph.setSnapshotSettings(snapshottingSettings);
+
+ // Submit job graph
+ msg = new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED);
+ Await.result(jobManager.ask(msg, timeout), timeout);
+
+ // Wait for all tasks to be running
+ msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID());
+ Await.result(jobManager.ask(msg, timeout), timeout);
+
+ // Cancel with savepoint
+ msg = new JobManagerMessages.CancelJobWithSavepoint(jobGraph.getJobID(), null);
+ CancellationResponse cancelResp = (CancellationResponse) Await.result(jobManager.ask(msg, timeout), timeout);
+
+ if (cancelResp instanceof CancellationFailure) {
+ CancellationFailure failure = (CancellationFailure) cancelResp;
+ assertTrue(failure.cause() instanceof IllegalStateException);
+ assertTrue(failure.cause().getMessage().contains("savepoint directory"));
+ } else {
+ fail("Unexpected cancellation response from JobManager: " + cancelResp);
+ }
+ } finally {
+ if (actorSystem != null) {
+ actorSystem.shutdown();
+ }
+
+ if (archiver != null) {
+ archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }
+
+ if (jobManager != null) {
+ jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }
+
+ if (taskManager != null) {
+ taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }
+ }
+ }
+
+ /**
+ * Tests that we can trigger a savepoint when periodic checkpoints are disabled.
*/
@Test
public void testSavepointWithDeactivatedPeriodicCheckpointing() throws Exception {