Posted to issues@flink.apache.org by uce <gi...@git.apache.org> on 2017/07/04 15:16:43 UTC

[GitHub] flink pull request #4254: [FLINK-7067] [jobmanager] Fix side effects after f...

GitHub user uce opened a pull request:

    https://github.com/apache/flink/pull/4254

    [FLINK-7067] [jobmanager] Fix side effects after failed cancel-job-with-savepoint

    If a cancel-job-with-savepoint request fails, this has an unintended side effect on the respective job if it has periodic checkpoints enabled. The periodic checkpoint scheduler is stopped before triggering the savepoint, but not restarted if a savepoint fails and the job is not cancelled.
    
    This fix makes sure that the periodic checkpoint scheduler is restarted iff periodic checkpoints were enabled before.
    
    I have the test in a separate commit, because it uses Reflection to update a private field with a spied upon instance of the CheckpointCoordinator in order to test the expected behaviour. This is super fragile and ugly, but the alternatives require a large refactoring (use factories that can be set during tests) or don't test this corner case behaviour. The separate commit makes it easier to remove/revert it at a future point in time.
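
The control flow described above can be illustrated with a minimal sketch. It is not the code from this pull request (the actual change is in `JobManager.scala`); `SketchScheduler` and `CancelWithSavepointSketch` are hypothetical names invented for this illustration, and the savepoint is modelled as a plain `BooleanSupplier`.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical stand-in for the periodic checkpoint scheduler; not Flink's CheckpointCoordinator. */
final class SketchScheduler {
    private boolean running;

    SketchScheduler(boolean running) {
        this.running = running;
    }

    void start() { running = true; }

    void stop() { running = false; }

    boolean isRunning() { return running; }
}

/** Hypothetical illustration of the stop / savepoint / conditional-restart sequence. */
final class CancelWithSavepointSketch {

    /** Returns true if the savepoint succeeded, i.e. the job may now be cancelled. */
    static boolean cancelWithSavepoint(SketchScheduler scheduler, BooleanSupplier triggerSavepoint) {
        // Remember whether periodic checkpoints were active before we touch the scheduler.
        boolean wasPeriodic = scheduler.isRunning();

        // No periodic checkpoints should happen between the savepoint and the cancellation.
        scheduler.stop();

        boolean success;
        try {
            success = triggerSavepoint.getAsBoolean();
        } catch (RuntimeException e) {
            // A thrown exception is treated like a declined savepoint.
            success = false;
        }

        // Restart only on failure, and only if the scheduler was running before.
        if (!success && wasPeriodic) {
            scheduler.start();
        }
        return success;
    }
}
```

With a scheduler that was running and a savepoint that fails, the scheduler is running again afterwards; with a scheduler that was never running, it stays off.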


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/flink 7067-restart_checkpoint_scheduler

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4254.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4254
    
----
commit 7294de0ef77a346b7b38d4b3fcdc421f7fd6855b
Author: Ufuk Celebi <uc...@apache.org>
Date:   2017-07-04T14:39:02Z

    [tests] Reduce visibility of helper class methods
    
    There is no need to make the helper methods public. No other class
    should even use this inner test helper invokable.

commit ce924bc146d3cf97e0c5ddcc1ba16610b2fc8d49
Author: Ufuk Celebi <uc...@apache.org>
Date:   2017-07-04T14:53:54Z

    [FLINK-7067] [jobmanager] Add test for cancel-job-with-savepoint side effects
    
    I have this test in a separate commit, because it uses Reflection
    to update a private field with a spied upon instance of the
    CheckpointCoordinator in order to test the expected behaviour. This
    makes it easier to remove/revert at a future point in time.
    
    This is super fragile and ugly, but the alternatives require a
    large refactoring (use factories that can be set during tests)
    or don't test this corner case behaviour.

commit 94aa444cbd7099d7830e06efe3525a717becb740
Author: Ufuk Celebi <uc...@apache.org>
Date:   2017-07-04T15:01:32Z

    [FLINK-7067] [jobmanager] Fix side effects after failed cancel-job-with-savepoint
    
    Problem: If a cancel-job-with-savepoint request fails, this has an
    unintended side effect on the respective job if it has periodic
    checkpoints enabled. The periodic checkpoint scheduler is stopped
    before triggering the savepoint, but not restarted if a savepoint
    fails and the job is not cancelled.
    
    This commit makes sure that the periodic checkpoint scheduler is
    restarted iff periodic checkpoints were enabled before.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4254: [FLINK-7067] [jobmanager] Fix side effects after f...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4254#discussion_r125570975
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java ---
    @@ -940,6 +955,177 @@ public void testCancelWithSavepoint() throws Exception {
     	}
     
     	/**
    +	 * Tests that a failed cancel-job-with-savepoint request does not accidentally disable
    +	 * periodic checkpoints.
    +	 */
    +	@Test
    +	public void testCancelJobWithSavepointFailurePeriodicCheckpoints() throws Exception {
    +		testCancelJobWithSavepointFailure(true);
    +	}
    +
    +	/**
    +	 * Tests that a failed cancel-job-with-savepoint request does not accidentally enable
    +	 * periodic checkpoints.
    +	 */
    +	@Test
    +	public void testCancelJobWithSavepointFailureNoPeriodicCheckpoints() throws Exception {
    +		testCancelJobWithSavepointFailure(false);
    +	}
    +
    +	/**
    +	 * Tests that a failed savepoint does not cancel the job and that there are no
    +	 * unintended side effects.
    +	 *
    +	 * @param enablePeriodicCheckpoints Flag to indicate whether to enable periodic checkpoints. We
    +	 * need to test both here in order to verify that we don't accidentally disable or enable
    +	 * checkpoints after a failed cancel-job-with-savepoint request.
    +	 */
    +	private void testCancelJobWithSavepointFailure(
    +		boolean enablePeriodicCheckpoints) throws Exception {
    +
    +		long checkpointInterval = enablePeriodicCheckpoints ? 3600000 : Long.MAX_VALUE;
    +
    +		// Savepoint target
    +		File savepointTarget = tmpFolder.newFolder();
    +		savepointTarget.deleteOnExit();
    +
    +		// Timeout for Akka messages
    +		FiniteDuration askTimeout = new FiniteDuration(30, TimeUnit.SECONDS);
    +
    +		// A source that declines savepoints, simulating the behaviour
    +		// of a failed savepoint.
    +		JobVertex sourceVertex = new JobVertex("Source");
    +		sourceVertex.setInvokableClass(FailOnSavepointStatefulTask.class);
    +		sourceVertex.setParallelism(1);
    +		JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex);
    +
    +		final ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
    +
    +		try {
    +			Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
    +				new Configuration(),
    +				actorSystem,
    +				TestingUtils.defaultExecutor(),
    +				TestingUtils.defaultExecutor(),
    +				highAvailabilityServices,
    +				Option.apply("jm"),
    +				Option.apply("arch"),
    +				TestingJobManager.class,
    +				TestingMemoryArchivist.class);
    +
    +			UUID leaderId = LeaderRetrievalUtils.retrieveLeaderSessionId(
    +				highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
    +				TestingUtils.TESTING_TIMEOUT());
    +
    +			ActorGateway jobManager = new AkkaActorGateway(master._1(), leaderId);
    +
    +			ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
    +				new Configuration(),
    +				ResourceID.generate(),
    +				actorSystem,
    +				highAvailabilityServices,
    +				"localhost",
    +				Option.apply("tm"),
    +				true,
    +				TestingTaskManager.class);
    +
    +			ActorGateway taskManager = new AkkaActorGateway(taskManagerRef, leaderId);
    +
    +			// Wait until connected
    +			Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
    +			Await.ready(taskManager.ask(msg, askTimeout), askTimeout);
    +
    +			JobCheckpointingSettings snapshottingSettings = new JobCheckpointingSettings(
    +				Collections.singletonList(sourceVertex.getID()),
    +				Collections.singletonList(sourceVertex.getID()),
    +				Collections.singletonList(sourceVertex.getID()),
    +				checkpointInterval,
    +				3600000,
    +				0,
    +				Integer.MAX_VALUE,
    +				ExternalizedCheckpointSettings.none(),
    +				null,
    +				true);
    +
    +			jobGraph.setSnapshotSettings(snapshottingSettings);
    +
    +			// Submit job graph
    +			msg = new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED);
    +			Await.ready(jobManager.ask(msg, askTimeout), askTimeout);
    +
    +			// Wait for all tasks to be running
    +			msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID());
    +			Await.ready(jobManager.ask(msg, askTimeout), askTimeout);
    +
    +			// ----------------------------------------------------------------
    +			// Super ugly... sorry! But this is one of the few bad options
    +			// to test this here. Ideally, we would have a factory that we
    +			// can set in tests as desired. But we don't. So here we go...
    +			msg = new RequestExecutionGraph(jobGraph.getJobID());
    +			Object result = Await.result(jobManager.ask(msg, askTimeout), askTimeout);
    +
    +			ExecutionGraph eg;
    +			if (result instanceof ExecutionGraphFound) {
    +				// Sorry...
    +				eg = (ExecutionGraph) ((ExecutionGraphFound) result).executionGraph();
    +			} else {
    +				throw new RuntimeException("Could not access ExecutionGraph for job with "
    +					+ "ID " + jobGraph.getJobID() + ". Response: " + result.toString());
    +
    +			}
    +
    +			Field field = eg.getClass().getDeclaredField("checkpointCoordinator");
    +			field.setAccessible(true);
    +			CheckpointCoordinator coord = (CheckpointCoordinator) field.get(eg);
    +			CheckpointCoordinator spiedCoord = Mockito.spy(coord);
    +			field.set(eg, spiedCoord);
    +			// ----------------------------------------------------------------
    +
    +			// Cancel with savepoint
    +			msg = new JobManagerMessages.CancelJobWithSavepoint(jobGraph.getJobID(),
    +				savepointTarget.getAbsolutePath());
    +			CancellationResponse cancelResp = (CancellationResponse) Await.result(
    +				jobManager.ask(msg, askTimeout), askTimeout);
    +
    +			if (cancelResp instanceof CancellationFailure) {
    +				if (enablePeriodicCheckpoints) {
    +					// Verify checkpoint scheduler deactivated and reactivated.
    +					// A call to start checkpoint scheduler calls stop scheduler
    +					// again. Therefore, we verify two calls for stop. Since we
    +					// spy (I know...) on the coordinator after the job has
    +					// started, we don't count calls before spying.
    +					verify(spiedCoord, times(1)).startCheckpointScheduler();
    --- End diff --
    
    Could we not re-attempt a cancel-with-savepoint? If the coordinator is shut down, it will fail; if it was restarted, it should succeed (provided we adjust the failing source to only fail the first time). Then we wouldn't need the spying but would actually just test observable behavior.
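
A source that "only fails the first time" could be sketched roughly as follows. `FailFirstSavepointTask` is a hypothetical class made up for this illustration; it is not `FailOnSavepointStatefulTask` from the diff and does not implement any Flink interface. Since the test passes the invokable by class (`setInvokableClass`), a real version would probably need to keep the flag somewhere the test can reach, which is an assumption here.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical task stand-in that declines only the first savepoint; not a Flink invokable. */
final class FailFirstSavepointTask {

    // Flipped exactly once, by the first savepoint attempt.
    private final AtomicBoolean failedOnce = new AtomicBoolean(false);

    /** Returns false (savepoint declined) on the first call and true on every later call. */
    boolean onSavepoint() {
        // compareAndSet succeeds only for the very first caller, which is the one that fails.
        return !failedOnce.compareAndSet(false, true);
    }
}
```

A test built on this idea would issue the cancel-with-savepoint request twice and expect a failure followed by a success.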


---

[GitHub] flink pull request #4254: [FLINK-7067] [jobmanager] Fix side effects after f...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4254#discussion_r145923787
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java ---
    @@ -940,6 +955,177 @@ public void testCancelWithSavepoint() throws Exception {
     	}
     
     	/**
    +	 * Tests that a failed cancel-job-with-savepoint request does not accidentally disable
    +	 * periodic checkpoints.
    +	 */
    +	@Test
    +	public void testCancelJobWithSavepointFailurePeriodicCheckpoints() throws Exception {
    +		testCancelJobWithSavepointFailure(true);
    +	}
    +
    +	/**
    +	 * Tests that a failed cancel-job-with-savepoint request does not accidentally enable
    +	 * periodic checkpoints.
    +	 */
    +	@Test
    +	public void testCancelJobWithSavepointFailureNoPeriodicCheckpoints() throws Exception {
    +		testCancelJobWithSavepointFailure(false);
    +	}
    +
    +	/**
    +	 * Tests that a failed savepoint does not cancel the job and that there are no
    +	 * unintended side effects.
    +	 *
    +	 * @param enablePeriodicCheckpoints Flag to indicate whether to enable periodic checkpoints. We
    +	 * need to test both here in order to verify that we don't accidentally disable or enable
    +	 * checkpoints after a failed cancel-job-with-savepoint request.
    +	 */
    +	private void testCancelJobWithSavepointFailure(
    +		boolean enablePeriodicCheckpoints) throws Exception {
    +
    +		long checkpointInterval = enablePeriodicCheckpoints ? 3600000 : Long.MAX_VALUE;
    +
    +		// Savepoint target
    +		File savepointTarget = tmpFolder.newFolder();
    +		savepointTarget.deleteOnExit();
    +
    +		// Timeout for Akka messages
    +		FiniteDuration askTimeout = new FiniteDuration(30, TimeUnit.SECONDS);
    +
    +		// A source that declines savepoints, simulating the behaviour
    +		// of a failed savepoint.
    +		JobVertex sourceVertex = new JobVertex("Source");
    +		sourceVertex.setInvokableClass(FailOnSavepointStatefulTask.class);
    +		sourceVertex.setParallelism(1);
    +		JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex);
    +
    +		final ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
    +
    +		try {
    +			Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
    +				new Configuration(),
    +				actorSystem,
    +				TestingUtils.defaultExecutor(),
    +				TestingUtils.defaultExecutor(),
    +				highAvailabilityServices,
    +				Option.apply("jm"),
    +				Option.apply("arch"),
    +				TestingJobManager.class,
    +				TestingMemoryArchivist.class);
    +
    +			UUID leaderId = LeaderRetrievalUtils.retrieveLeaderSessionId(
    +				highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
    +				TestingUtils.TESTING_TIMEOUT());
    +
    +			ActorGateway jobManager = new AkkaActorGateway(master._1(), leaderId);
    +
    +			ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
    +				new Configuration(),
    +				ResourceID.generate(),
    +				actorSystem,
    +				highAvailabilityServices,
    +				"localhost",
    +				Option.apply("tm"),
    +				true,
    +				TestingTaskManager.class);
    +
    +			ActorGateway taskManager = new AkkaActorGateway(taskManagerRef, leaderId);
    --- End diff --
    
    Definitely +1


---

[GitHub] flink pull request #4254: [FLINK-7067] [jobmanager] Fix side effects after f...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4254#discussion_r125606557
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java ---
    @@ -940,6 +955,177 @@ public void testCancelWithSavepoint() throws Exception {
     	}
     
     	/**
    +	 * Tests that a failed cancel-job-with-savepoint request does not accidentally disable
    +	 * periodic checkpoints.
    +	 */
    +	@Test
    +	public void testCancelJobWithSavepointFailurePeriodicCheckpoints() throws Exception {
    +		testCancelJobWithSavepointFailure(true);
    +	}
    +
    +	/**
    +	 * Tests that a failed cancel-job-with-savepoint request does not accidentally enable
    +	 * periodic checkpoints.
    +	 */
    +	@Test
    +	public void testCancelJobWithSavepointFailureNoPeriodicCheckpoints() throws Exception {
    +		testCancelJobWithSavepointFailure(false);
    +	}
    +
    +	/**
    +	 * Tests that a failed savepoint does not cancel the job and that there are no
    +	 * unintended side effects.
    +	 *
    +	 * @param enablePeriodicCheckpoints Flag to indicate whether to enable periodic checkpoints. We
    +	 * need to test both here in order to verify that we don't accidentally disable or enable
    +	 * checkpoints after a failed cancel-job-with-savepoint request.
    +	 */
    +	private void testCancelJobWithSavepointFailure(
    +		boolean enablePeriodicCheckpoints) throws Exception {
    +
    +		long checkpointInterval = enablePeriodicCheckpoints ? 3600000 : Long.MAX_VALUE;
    +
    +		// Savepoint target
    +		File savepointTarget = tmpFolder.newFolder();
    +		savepointTarget.deleteOnExit();
    +
    +		// Timeout for Akka messages
    +		FiniteDuration askTimeout = new FiniteDuration(30, TimeUnit.SECONDS);
    +
    +		// A source that declines savepoints, simulating the behaviour
    +		// of a failed savepoint.
    +		JobVertex sourceVertex = new JobVertex("Source");
    +		sourceVertex.setInvokableClass(FailOnSavepointStatefulTask.class);
    +		sourceVertex.setParallelism(1);
    +		JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex);
    +
    +		final ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
    +
    +		try {
    +			Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
    +				new Configuration(),
    +				actorSystem,
    +				TestingUtils.defaultExecutor(),
    +				TestingUtils.defaultExecutor(),
    +				highAvailabilityServices,
    +				Option.apply("jm"),
    +				Option.apply("arch"),
    +				TestingJobManager.class,
    +				TestingMemoryArchivist.class);
    +
    +			UUID leaderId = LeaderRetrievalUtils.retrieveLeaderSessionId(
    +				highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
    +				TestingUtils.TESTING_TIMEOUT());
    +
    +			ActorGateway jobManager = new AkkaActorGateway(master._1(), leaderId);
    +
    +			ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
    +				new Configuration(),
    +				ResourceID.generate(),
    +				actorSystem,
    +				highAvailabilityServices,
    +				"localhost",
    +				Option.apply("tm"),
    +				true,
    +				TestingTaskManager.class);
    +
    +			ActorGateway taskManager = new AkkaActorGateway(taskManagerRef, leaderId);
    +
    +			// Wait until connected
    +			Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
    +			Await.ready(taskManager.ask(msg, askTimeout), askTimeout);
    +
    +			JobCheckpointingSettings snapshottingSettings = new JobCheckpointingSettings(
    +				Collections.singletonList(sourceVertex.getID()),
    +				Collections.singletonList(sourceVertex.getID()),
    +				Collections.singletonList(sourceVertex.getID()),
    +				checkpointInterval,
    +				3600000,
    +				0,
    +				Integer.MAX_VALUE,
    +				ExternalizedCheckpointSettings.none(),
    +				null,
    +				true);
    +
    +			jobGraph.setSnapshotSettings(snapshottingSettings);
    +
    +			// Submit job graph
    +			msg = new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED);
    +			Await.ready(jobManager.ask(msg, askTimeout), askTimeout);
    +
    +			// Wait for all tasks to be running
    +			msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID());
    +			Await.ready(jobManager.ask(msg, askTimeout), askTimeout);
    +
    +			// ----------------------------------------------------------------
    +			// Super ugly... sorry! But this is one of the few bad options
    +			// to test this here. Ideally, we would have a factory that we
    +			// can set in tests as desired. But we don't. So here we go...
    +			msg = new RequestExecutionGraph(jobGraph.getJobID());
    +			Object result = Await.result(jobManager.ask(msg, askTimeout), askTimeout);
    +
    +			ExecutionGraph eg;
    +			if (result instanceof ExecutionGraphFound) {
    +				// Sorry...
    +				eg = (ExecutionGraph) ((ExecutionGraphFound) result).executionGraph();
    +			} else {
    +				throw new RuntimeException("Could not access ExecutionGraph for job with "
    +					+ "ID " + jobGraph.getJobID() + ". Response: " + result.toString());
    +
    +			}
    +
    +			Field field = eg.getClass().getDeclaredField("checkpointCoordinator");
    +			field.setAccessible(true);
    +			CheckpointCoordinator coord = (CheckpointCoordinator) field.get(eg);
    +			CheckpointCoordinator spiedCoord = Mockito.spy(coord);
    +			field.set(eg, spiedCoord);
    +			// ----------------------------------------------------------------
    +
    +			// Cancel with savepoint
    +			msg = new JobManagerMessages.CancelJobWithSavepoint(jobGraph.getJobID(),
    +				savepointTarget.getAbsolutePath());
    +			CancellationResponse cancelResp = (CancellationResponse) Await.result(
    +				jobManager.ask(msg, askTimeout), askTimeout);
    +
    +			if (cancelResp instanceof CancellationFailure) {
    +				if (enablePeriodicCheckpoints) {
    +					// Verify checkpoint scheduler deactivated and reactivated.
    +					// A call to start checkpoint scheduler calls stop scheduler
    +					// again. Therefore, we verify two calls for stop. Since we
    +					// spy (I know...) on the coordinator after the job has
    +					// started, we don't count calls before spying.
    +					verify(spiedCoord, times(1)).startCheckpointScheduler();
    --- End diff --
    
    The thing is that stopping the scheduler is part of the expected behaviour of cancel-job-with-savepoint, because we don't want any checkpoints between the savepoint and the job cancellation (https://issues.apache.org/jira/browse/FLINK-4717). I think for that we do need the spying :-( It was simply not fully tested before... Does this make sense or am I missing your point?



---

[GitHub] flink pull request #4254: [FLINK-7067] [jobmanager] Fix side effects after f...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4254


---

[GitHub] flink issue #4254: [FLINK-7067] [jobmanager] Fix side effects after failed c...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4254
  
    I think it's alright that way.


---

[GitHub] flink issue #4254: [FLINK-7067] [jobmanager] Fix side effects after failed c...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/4254
  
    I think this is a meaningful fix.
    
    I would suggest doing the tests differently, though. The tests of the CheckpointCoordinator overdo the Mockito stuff so heavily that it becomes an extremely hard job to change anything in the CheckpointCoordinator. Mocks are super maintenance heavy, compared to actual test implementations of interfaces or classes.
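
To make the contrast with mocks concrete, here is a minimal sketch of a hand-written test implementation. The interface `SchedulerControl` and the class `RecordingSchedulerControl` are hypothetical and exist only for this example; Flink's `CheckpointCoordinator` is a concrete class and does not implement such an interface in this pull request.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical narrow interface, introduced only for this illustration. */
interface SchedulerControl {
    void startCheckpointScheduler();

    void stopCheckpointScheduler();
}

/** Hand-written test implementation that records the calls it receives, in order. */
final class RecordingSchedulerControl implements SchedulerControl {

    final List<String> calls = new ArrayList<>();

    @Override
    public void startCheckpointScheduler() {
        calls.add("start");
    }

    @Override
    public void stopCheckpointScheduler() {
        calls.add("stop");
    }
}
```

A test can then assert on `calls` directly, and a change to the production class leaves such a test implementation compiling as long as the interface is unchanged.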


---

[GitHub] flink issue #4254: [FLINK-7067] [jobmanager] Fix side effects after failed c...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the issue:

    https://github.com/apache/flink/pull/4254
  
    @tillrohrmann Thanks for looking over this. The `TestingCluster` is definitely preferable. I don't recall how I ended up with the custom setup instead of the `TestingCluster`.
    
    I changed the test to wait for another checkpoint after the failed savepoint. I also considered this for the initial PR, but went with mocking in order to test the case that periodic checkpoints were not activated before the cancellation [1]. I think the current variant is a good compromise between completeness and simplicity though.
    
    [1] As seen in the diff of `JobManager.scala`, we activate the periodic scheduler after a failed cancellation iff it was activated before the cancellation. The case where it was not activated can't be tested robustly with the current approach. We could wait for some time and, if no checkpoint arrives in that time, consider checkpoints as not accidentally activated, but that's not robust. I would therefore ignore this case if you don't have another idea.
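
Waiting for another checkpoint after the failed savepoint can be sketched with a latch. This is only an illustration of the idea, not the test that was merged; `CheckpointObserver` is a hypothetical helper, and how a test task would call `onCheckpoint()` is left open.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper that a test task would notify whenever it sees a checkpoint. */
final class CheckpointObserver {

    private final CountDownLatch latch;

    CheckpointObserver(int expectedCheckpoints) {
        this.latch = new CountDownLatch(expectedCheckpoints);
    }

    /** Called by the (hypothetical) test task for every checkpoint it observes. */
    void onCheckpoint() {
        latch.countDown();
    }

    /** Returns true if the expected number of checkpoints arrived within the timeout. */
    boolean awaitCheckpoints(long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller and report that we did not see the checkpoints.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The positive case returns as soon as the checkpoint arrives. The negative case (no checkpoint should ever arrive) can only be expressed as a timeout that expires, which is why it is not robust: a short timeout may simply be shorter than the checkpoint interval.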



---

[GitHub] flink pull request #4254: [FLINK-7067] [jobmanager] Fix side effects after f...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4254#discussion_r145907824
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java ---
    @@ -940,6 +955,177 @@ public void testCancelWithSavepoint() throws Exception {
     	}
     
     	/**
    +	 * Tests that a failed cancel-job-with-savepoint request does not accidentally disable
    +	 * periodic checkpoints.
    +	 */
    +	@Test
    +	public void testCancelJobWithSavepointFailurePeriodicCheckpoints() throws Exception {
    +		testCancelJobWithSavepointFailure(true);
    +	}
    +
    +	/**
    +	 * Tests that a failed cancel-job-with-savepoint request does not accidentally enable
    +	 * periodic checkpoints.
    +	 */
    +	@Test
    +	public void testCancelJobWithSavepointFailureNoPeriodicCheckpoints() throws Exception {
    +		testCancelJobWithSavepointFailure(false);
    +	}
    +
    +	/**
    +	 * Tests that a failed savepoint does not cancel the job and that there are no
    +	 * unintended side effects.
    +	 *
    +	 * @param enablePeriodicCheckpoints Flag to indicate whether to enable periodic checkpoints. We
    +	 * need to test both here in order to verify that we don't accidentally disable or enable
    +	 * checkpoints after a failed cancel-job-with-savepoint request.
    +	 */
    +	private void testCancelJobWithSavepointFailure(
    +		boolean enablePeriodicCheckpoints) throws Exception {
    +
    +		long checkpointInterval = enablePeriodicCheckpoints ? 3600000 : Long.MAX_VALUE;
    +
    +		// Savepoint target
    +		File savepointTarget = tmpFolder.newFolder();
    +		savepointTarget.deleteOnExit();
    +
    +		// Timeout for Akka messages
    +		FiniteDuration askTimeout = new FiniteDuration(30, TimeUnit.SECONDS);
    +
    +		// A source that declines savepoints, simulating the behaviour
    +		// of a failed savepoint.
    +		JobVertex sourceVertex = new JobVertex("Source");
    +		sourceVertex.setInvokableClass(FailOnSavepointStatefulTask.class);
    +		sourceVertex.setParallelism(1);
    +		JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex);
    +
    +		final ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
    +
    +		try {
    +			Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
    +				new Configuration(),
    +				actorSystem,
    +				TestingUtils.defaultExecutor(),
    +				TestingUtils.defaultExecutor(),
    +				highAvailabilityServices,
    +				Option.apply("jm"),
    +				Option.apply("arch"),
    +				TestingJobManager.class,
    +				TestingMemoryArchivist.class);
    +
    +			UUID leaderId = LeaderRetrievalUtils.retrieveLeaderSessionId(
    +				highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
    +				TestingUtils.TESTING_TIMEOUT());
    +
    +			ActorGateway jobManager = new AkkaActorGateway(master._1(), leaderId);
    +
    +			ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
    +				new Configuration(),
    +				ResourceID.generate(),
    +				actorSystem,
    +				highAvailabilityServices,
    +				"localhost",
    +				Option.apply("tm"),
    +				true,
    +				TestingTaskManager.class);
    +
    +			ActorGateway taskManager = new AkkaActorGateway(taskManagerRef, leaderId);
    +
    +			// Wait until connected
    +			Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
    +			Await.ready(taskManager.ask(msg, askTimeout), askTimeout);
    +
    +			JobCheckpointingSettings snapshottingSettings = new JobCheckpointingSettings(
    +				Collections.singletonList(sourceVertex.getID()),
    +				Collections.singletonList(sourceVertex.getID()),
    +				Collections.singletonList(sourceVertex.getID()),
    +				checkpointInterval,
    +				3600000,
    +				0,
    +				Integer.MAX_VALUE,
    +				ExternalizedCheckpointSettings.none(),
    +				null,
    +				true);
    +
    +			jobGraph.setSnapshotSettings(snapshottingSettings);
    +
    +			// Submit job graph
    +			msg = new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED);
    +			Await.ready(jobManager.ask(msg, askTimeout), askTimeout);
    +
    +			// Wait for all tasks to be running
    +			msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID());
    +			Await.ready(jobManager.ask(msg, askTimeout), askTimeout);
    +
    +			// ----------------------------------------------------------------
    +			// Super ugly... sorry! But this is one of the few bad options
    +			// to test this here. Ideally, we would have a factory that we
    +			// can set in tests as desired. But we don't. So here we go...
    +			msg = new RequestExecutionGraph(jobGraph.getJobID());
    +			Object result = Await.result(jobManager.ask(msg, askTimeout), askTimeout);
    +
    +			ExecutionGraph eg;
    +			if (result instanceof ExecutionGraphFound) {
    +				// Sorry...
    +				eg = (ExecutionGraph) ((ExecutionGraphFound) result).executionGraph();
    +			} else {
    +				throw new RuntimeException("Could not access ExecutionGraph for job with "
    +					+ "ID " + jobGraph.getJobID() + ". Response: " + result.toString());
    +
    +			}
    +
    +			Field field = eg.getClass().getDeclaredField("checkpointCoordinator");
    +			field.setAccessible(true);
    +			CheckpointCoordinator coord = (CheckpointCoordinator) field.get(eg);
    +			CheckpointCoordinator spiedCoord = Mockito.spy(coord);
    +			field.set(eg, spiedCoord);
    +			// ----------------------------------------------------------------
    +
    +			// Cancel with savepoint
    +			msg = new JobManagerMessages.CancelJobWithSavepoint(jobGraph.getJobID(),
    +				savepointTarget.getAbsolutePath());
    +			CancellationResponse cancelResp = (CancellationResponse) Await.result(
    +				jobManager.ask(msg, askTimeout), askTimeout);
    +
    +			if (cancelResp instanceof CancellationFailure) {
    +				if (enablePeriodicCheckpoints) {
    +					// Verify checkpoint scheduler deactivated and reactivated.
    +					// A call to start checkpoint scheduler calls stop scheduler
    +					// again. Therefore, we verify two calls for stop. Since we
    +					// spy (I know...) on the coordinator after the job has
    +					// started, we don't count calls before spying.
    +					verify(spiedCoord, times(1)).startCheckpointScheduler();
    --- End diff --
    
    Can't we check that the submitted task sees another checkpoint barrier after a savepoint has been triggered? That way we would get around spying on the `CheckpointCoordinator`.


---

[GitHub] flink pull request #4254: [FLINK-7067] [jobmanager] Fix side effects after f...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4254#discussion_r145907545
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java ---
    @@ -940,6 +955,177 @@ public void testCancelWithSavepoint() throws Exception {
     	}
     
     	/**
    +	 * Tests that a failed cancel-job-with-savepoint request does not accidentally disable
    +	 * periodic checkpoints.
    +	 */
    +	@Test
    +	public void testCancelJobWithSavepointFailurePeriodicCheckpoints() throws Exception {
    +		testCancelJobWithSavepointFailure(true);
    +	}
    +
    +	/**
    +	 * Tests that a failed cancel-job-with-savepoint request does not accidentally enable
    +	 * periodic checkpoints.
    +	 */
    +	@Test
    +	public void testCancelJobWithSavepointFailureNoPeriodicCheckpoints() throws Exception {
    +		testCancelJobWithSavepointFailure(false);
    +	}
    +
    +	/**
    +	 * Tests that a failed savepoint does not cancel the job and that there are no
    +	 * unintended side effects.
    +	 *
    +	 * @param enablePeriodicCheckpoints Flag to indicate whether to enable periodic checkpoints. We
    +	 * need to test both here in order to verify that we don't accidentally disable or enable
    +	 * checkpoints after a failed cancel-job-with-savepoint request.
    +	 */
    +	private void testCancelJobWithSavepointFailure(
    +		boolean enablePeriodicCheckpoints) throws Exception {
    +
    +		long checkpointInterval = enablePeriodicCheckpoints ? 3600000 : Long.MAX_VALUE;
    +
    +		// Savepoint target
    +		File savepointTarget = tmpFolder.newFolder();
    +		savepointTarget.deleteOnExit();
    +
    +		// Timeout for Akka messages
    +		FiniteDuration askTimeout = new FiniteDuration(30, TimeUnit.SECONDS);
    +
    +		// A source that declines savepoints, simulating the behaviour
    +		// of a failed savepoint.
    +		JobVertex sourceVertex = new JobVertex("Source");
    +		sourceVertex.setInvokableClass(FailOnSavepointStatefulTask.class);
    +		sourceVertex.setParallelism(1);
    +		JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex);
    +
    +		final ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
    +
    +		try {
    +			Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
    +				new Configuration(),
    +				actorSystem,
    +				TestingUtils.defaultExecutor(),
    +				TestingUtils.defaultExecutor(),
    +				highAvailabilityServices,
    +				Option.apply("jm"),
    +				Option.apply("arch"),
    +				TestingJobManager.class,
    +				TestingMemoryArchivist.class);
    +
    +			UUID leaderId = LeaderRetrievalUtils.retrieveLeaderSessionId(
    +				highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
    +				TestingUtils.TESTING_TIMEOUT());
    +
    +			ActorGateway jobManager = new AkkaActorGateway(master._1(), leaderId);
    +
    +			ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
    +				new Configuration(),
    +				ResourceID.generate(),
    +				actorSystem,
    +				highAvailabilityServices,
    +				"localhost",
    +				Option.apply("tm"),
    +				true,
    +				TestingTaskManager.class);
    +
    +			ActorGateway taskManager = new AkkaActorGateway(taskManagerRef, leaderId);
    --- End diff --
    
    Can't we simply use a `TestingCluster` here for all the setup work?


---

[GitHub] flink issue #4254: [FLINK-7067] [jobmanager] Fix side effects after failed c...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the issue:

    https://github.com/apache/flink/pull/4254
  
    Travis gave the green light, merging this now.


---

[GitHub] flink issue #4254: [FLINK-7067] [jobmanager] Fix side effects after failed c...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the issue:

    https://github.com/apache/flink/pull/4254
  
    Cool! I'll rebase this and merge after Travis gives the green light.


---