Posted to issues@flink.apache.org by GJL <gi...@git.apache.org> on 2018/03/02 14:15:58 UTC

[GitHub] flink pull request #5622: [FLINK-8459][flip6] Implement RestClusterClient.ca...

GitHub user GJL opened a pull request:

    https://github.com/apache/flink/pull/5622

    [FLINK-8459][flip6] Implement RestClusterClient.cancelWithSavepoint

    ## What is the purpose of the change
    
    *Introduce cancelJob flag to existing triggerSavepoint methods in Dispatcher and
    JobMaster. Stop checkpoint scheduler before taking savepoint to make sure that
    the savepoint created by this command is the last one.*
    
    cc: @tillrohrmann 
    
    ## Brief change log
    
      - *Implement RestClusterClient.cancelWithSavepoint*
    
    ## Verifying this change
    
    This change added tests and can be verified as follows:
    
      - *Added `JobMasterTriggerSavepointIT`.*
      - *Manually tested.*
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
      - The S3 file system connector: (yes / **no** / don't know)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes / **no**)
      - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/GJL/flink FLINK-8459-2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5622.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5622
    
----
commit 7e913b0d1eab8453279ffacc11f4633b9263190d
Author: gyao <ga...@...>
Date:   2018-03-02T14:11:36Z

    [FLINK-8459][flip6] Implement RestClusterClient.cancelWithSavepoint
    
    Introduce cancelJob flag to existing triggerSavepoint methods in Dispatcher and
    JobMaster. Stop checkpoint scheduler before taking savepoint to make sure that
    the savepoint created by this command is the last one.

----


---

[GitHub] flink pull request #5622: [FLINK-8459][flip6] Implement RestClusterClient.ca...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5622#discussion_r171886987
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerRequestBody.java ---
    @@ -36,15 +36,26 @@
     	@Nullable
     	private final String targetDirectory;
     
    +	private final boolean cancelJob;
    --- End diff --
    
    This field is not annotated with `@JsonProperty`.


---

[GitHub] flink pull request #5622: [FLINK-8459][flip6] Implement RestClusterClient.ca...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5622#discussion_r171858662
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java ---
    @@ -0,0 +1,219 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.jobmaster;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.client.program.MiniClusterClient;
    +import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
    +import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
    +import org.apache.flink.runtime.checkpoint.CheckpointOptions;
    +import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
    +import org.apache.flink.runtime.checkpoint.CheckpointTriggerException;
    +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
    +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
    +import org.apache.flink.runtime.execution.Environment;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.jobgraph.JobVertex;
    +import org.apache.flink.runtime.jobgraph.OperatorID;
    +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
    +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
    +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
    +import org.apache.flink.test.util.AbstractTestBase;
    +import org.apache.flink.testutils.category.Flip6;
    +import org.apache.flink.util.ExceptionUtils;
    +
    +import org.junit.Assume;
    +import org.junit.Before;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.experimental.categories.Category;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.IOException;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.Paths;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.TimeUnit;
    +import java.util.stream.Collectors;
    +
    +import static org.hamcrest.MatcherAssert.assertThat;
    +import static org.hamcrest.Matchers.equalTo;
    +import static org.hamcrest.Matchers.hasItem;
    +import static org.hamcrest.Matchers.isOneOf;
    +
    +/**
    + * Tests for {@link org.apache.flink.runtime.jobmaster.JobMaster#triggerSavepoint(String, boolean, Time)}.
    + *
    + * @see org.apache.flink.runtime.jobmaster.JobMaster
    + */
    +@Category(Flip6.class)
    +public class JobMasterTriggerSavepointIT extends AbstractTestBase {
    +
    +	private static CountDownLatch invokeLatch;
    +
    +	private static volatile CountDownLatch triggerCheckpointLatch;
    +
    +	@Rule
    +	public TemporaryFolder temporaryFolder = new TemporaryFolder();
    +
    +	private Path savepointDirectory;
    +	private MiniClusterClient clusterClient;
    +	private JobGraph jobGraph;
    +
    +	@Before
    +	public void setUp() throws Exception {
    +		invokeLatch = new CountDownLatch(1);
    +		triggerCheckpointLatch = new CountDownLatch(1);
    +		savepointDirectory = temporaryFolder.newFolder().toPath();
    +
    +		Assume.assumeTrue(
    --- End diff --
    
    This shouldn't happen if the category is `flip6`.


---

[GitHub] flink pull request #5622: [FLINK-8459][flip6] Implement RestClusterClient.ca...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5622#discussion_r171858158
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerRequestBodyTest.java ---
    @@ -34,8 +34,8 @@
     	}
     
     	@Override
    -	protected SavepointTriggerRequestBody getTestRequestInstance() throws Exception {
    -		return new SavepointTriggerRequestBody("/tmp");
    +	protected SavepointTriggerRequestBody getTestRequestInstance() {
    +		return new SavepointTriggerRequestBody("/tmp", true);
    --- End diff --
    
    Strictly speaking, the `false` case should be tested as well.


---

[GitHub] flink pull request #5622: [FLINK-8459][flip6] Implement RestClusterClient.ca...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5622#discussion_r171888774
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerRequestBody.java ---
    @@ -36,15 +36,26 @@
     	@Nullable
     	private final String targetDirectory;
     
    +	private final boolean cancelJob;
    --- End diff --
    
    Fixed.


---

[GitHub] flink pull request #5622: [FLINK-8459][flip6] Implement RestClusterClient.ca...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5622#discussion_r171857387
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -971,14 +971,44 @@ public void heartbeatFromResourceManager(final ResourceID resourceID) {
     
     	@Override
     	public CompletableFuture<String> triggerSavepoint(
    -		@Nullable final String targetDirectory,
    -		final Time timeout) {
    -		try {
    -			return executionGraph.getCheckpointCoordinator()
    -				.triggerSavepoint(System.currentTimeMillis(), targetDirectory)
    -				.thenApply(CompletedCheckpoint::getExternalPointer);
    -		} catch (Exception e) {
    -			return FutureUtils.completedExceptionally(e);
    +			@Nullable final String targetDirectory,
    +			final boolean cancelJob,
    +			final Time timeout) {
    +
    +		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
    +		if (checkpointCoordinator == null) {
    +			return FutureUtils.completedExceptionally(new IllegalStateException(
    +				String.format("Job %s is not a streaming job.", jobGraph.getJobID())));
    --- End diff --
    
    If the job is in a terminal state, the coordinator will be `null`ed as well.
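
One way to keep the error message accurate in both cases is to check the job status before blaming the job type. The following is only a sketch under stated assumptions: `SavepointPreconditionSketch`, its `Status` enum, and the message texts are hypothetical and are not part of Flink or of this pull request.

```java
import java.util.Optional;

// Hypothetical sketch, not Flink code: names and messages are illustrative only.
class SavepointPreconditionSketch {

	// Simplified stand-in for a job status; only RUNNING is treated as non-terminal here.
	enum Status {
		RUNNING, CANCELED, FAILED, FINISHED;

		boolean isTerminal() {
			return this != RUNNING;
		}
	}

	// Returns a rejection message when no coordinator is available, empty otherwise.
	static Optional<String> rejectionReason(boolean coordinatorPresent, Status status) {
		if (coordinatorPresent) {
			return Optional.empty();
		}
		if (status.isTerminal()) {
			// The coordinator was dropped because the job has already ended.
			return Optional.of("Job is already in terminal state " + status + ".");
		}
		// The job is still running but never had a coordinator.
		return Optional.of("Job has no checkpoint coordinator (checkpointing not configured).");
	}
}
```

With this shape, a caller that finds no coordinator can report "already terminated" and "checkpointing not configured" separately instead of a single "not a streaming job" message.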


---

[GitHub] flink pull request #5622: [FLINK-8459][flip6] Implement RestClusterClient.ca...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5622#discussion_r171857517
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -971,14 +971,44 @@ public void heartbeatFromResourceManager(final ResourceID resourceID) {
     
     	@Override
     	public CompletableFuture<String> triggerSavepoint(
    -		@Nullable final String targetDirectory,
    -		final Time timeout) {
    -		try {
    -			return executionGraph.getCheckpointCoordinator()
    -				.triggerSavepoint(System.currentTimeMillis(), targetDirectory)
    -				.thenApply(CompletedCheckpoint::getExternalPointer);
    -		} catch (Exception e) {
    -			return FutureUtils.completedExceptionally(e);
    +			@Nullable final String targetDirectory,
    +			final boolean cancelJob,
    +			final Time timeout) {
    +
    +		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
    +		if (checkpointCoordinator == null) {
    +			return FutureUtils.completedExceptionally(new IllegalStateException(
    +				String.format("Job %s is not a streaming job.", jobGraph.getJobID())));
    +		}
    +
    +		if (cancelJob) {
    +			checkpointCoordinator.stopCheckpointScheduler();
    +		}
    +		return checkpointCoordinator
    +			.triggerSavepoint(System.currentTimeMillis(), targetDirectory)
    +			.thenApply(CompletedCheckpoint::getExternalPointer)
    +			.thenApplyAsync(path -> {
    +				if (cancelJob) {
    +					log.info("Savepoint stored in {}. Now cancelling {}.", path, jobGraph.getJobID());
    +					cancel(timeout);
    +				}
    +				return path;
    +			}, getMainThreadExecutor())
    +			.exceptionally(throwable -> {
    +				if (cancelJob) {
    +					startCheckpointScheduler(checkpointCoordinator);
    --- End diff --
    
    If the cancellation failed, we restart the scheduler as well. I think this differs from the previous implementation.
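
The effect described in this comment can be reproduced with plain `CompletableFuture`: an `exceptionally` stage at the end of a chain also sees failures thrown by earlier dependent stages, not only a failure of the first future. The sketch below is a simplified model and not the Flink code. `CancelWithSavepointSketch`, the `schedulerRunning` flag, and the two boolean parameters are hypothetical, and it assumes the cancel step fails by throwing synchronously inside its stage.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in, not Flink code: models only the shape of the future chain.
class CancelWithSavepointSketch {

	// Tracks whether the (imaginary) periodic checkpoint scheduler is running.
	final AtomicBoolean schedulerRunning = new AtomicBoolean(true);

	CompletableFuture<String> triggerSavepoint(boolean savepointFails, boolean cancelFails) {
		// Stop the scheduler before taking the savepoint.
		schedulerRunning.set(false);

		final CompletableFuture<String> savepoint = new CompletableFuture<>();
		if (savepointFails) {
			savepoint.completeExceptionally(new IllegalStateException("savepoint failed"));
		} else {
			savepoint.complete("/tmp/savepoint-1");
		}

		return savepoint
			.thenApply(path -> {
				// Stand-in for the cancel step that runs after a successful savepoint.
				if (cancelFails) {
					throw new IllegalStateException("cancel failed");
				}
				return path;
			})
			.exceptionally(throwable -> {
				// Runs for a failed savepoint AND for a failed cancel stage.
				schedulerRunning.set(true);
				throw new CompletionException(throwable);
			});
	}
}
```

Calling `triggerSavepoint(false, true)` on a fresh instance leaves `schedulerRunning` set to `true` even though the savepoint itself succeeded. If only savepoint failures should restart the scheduler, one option is to place the `exceptionally` stage before the cancel stage.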


---

[GitHub] flink pull request #5622: [FLINK-8459][flip6] Implement RestClusterClient.ca...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5622


---

[GitHub] flink pull request #5622: [FLINK-8459][flip6] Implement RestClusterClient.ca...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5622#discussion_r171858873
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java ---
    @@ -0,0 +1,219 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.jobmaster;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.client.program.MiniClusterClient;
    +import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
    +import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
    +import org.apache.flink.runtime.checkpoint.CheckpointOptions;
    +import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
    +import org.apache.flink.runtime.checkpoint.CheckpointTriggerException;
    +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
    +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
    +import org.apache.flink.runtime.execution.Environment;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.jobgraph.JobVertex;
    +import org.apache.flink.runtime.jobgraph.OperatorID;
    +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
    +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
    +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
    +import org.apache.flink.test.util.AbstractTestBase;
    +import org.apache.flink.testutils.category.Flip6;
    +import org.apache.flink.util.ExceptionUtils;
    +
    +import org.junit.Assume;
    +import org.junit.Before;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.experimental.categories.Category;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.IOException;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.Paths;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.TimeUnit;
    +import java.util.stream.Collectors;
    +
    +import static org.hamcrest.MatcherAssert.assertThat;
    +import static org.hamcrest.Matchers.equalTo;
    +import static org.hamcrest.Matchers.hasItem;
    +import static org.hamcrest.Matchers.isOneOf;
    +
    +/**
    + * Tests for {@link org.apache.flink.runtime.jobmaster.JobMaster#triggerSavepoint(String, boolean, Time)}.
    + *
    + * @see org.apache.flink.runtime.jobmaster.JobMaster
    + */
    +@Category(Flip6.class)
    +public class JobMasterTriggerSavepointIT extends AbstractTestBase {
    +
    +	private static CountDownLatch invokeLatch;
    +
    +	private static volatile CountDownLatch triggerCheckpointLatch;
    +
    +	@Rule
    +	public TemporaryFolder temporaryFolder = new TemporaryFolder();
    +
    +	private Path savepointDirectory;
    +	private MiniClusterClient clusterClient;
    +	private JobGraph jobGraph;
    +
    +	@Before
    +	public void setUp() throws Exception {
    +		invokeLatch = new CountDownLatch(1);
    +		triggerCheckpointLatch = new CountDownLatch(1);
    +		savepointDirectory = temporaryFolder.newFolder().toPath();
    +
    +		Assume.assumeTrue(
    +			"ClusterClient is not an instance of MiniClusterClient",
    +			miniClusterResource.getClusterClient() instanceof MiniClusterClient);
    +
    +		clusterClient = (MiniClusterClient) miniClusterResource.getClusterClient();
    +		clusterClient.setDetached(true);
    +
    +		jobGraph = new JobGraph();
    +
    +		final JobVertex vertex = new JobVertex("testVertex");
    +		vertex.setInvokableClass(NoOpBlockingInvokable.class);
    +		jobGraph.addVertex(vertex);
    +
    +		jobGraph.setSnapshotSettings(new JobCheckpointingSettings(
    +			Collections.singletonList(vertex.getID()),
    +			Collections.singletonList(vertex.getID()),
    +			Collections.singletonList(vertex.getID()),
    +			new CheckpointCoordinatorConfiguration(
    +				10,
    +				60_000,
    +				10,
    +				1,
    +				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
    +				true),
    +			null
    +		));
    +
    +		clusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader());
    +		invokeLatch.await(60, TimeUnit.SECONDS);
    +		waitForJob();
    +	}
    +
    +	@Test
    +	public void testStopJobAfterSavepoint() throws Exception {
    +		final String savepointLocation = cancelWithSavepoint();
    +		final JobStatus jobStatus = clusterClient.getJobStatus(jobGraph.getJobID()).get(60, TimeUnit.SECONDS);
    +
    +		assertThat(jobStatus, isOneOf(JobStatus.CANCELED, JobStatus.CANCELLING));
    +
    +		final List<Path> savepoints = Files.list(savepointDirectory).map(Path::getFileName).collect(Collectors.toList());
    +		assertThat(savepoints, hasItem(Paths.get(savepointLocation).getFileName()));
    +	}
    +
    +	@Test
    +	public void testDoNotCancelJobIfSavepointFails() throws Exception {
    +		try {
    +			Files.setPosixFilePermissions(savepointDirectory, Collections.emptySet());
    +		} catch (IOException e) {
    +			Assume.assumeNoException(e);
    +		}
    +
    +		try {
    +			cancelWithSavepoint();
    +		} catch (Exception e) {
    +			assertThat(ExceptionUtils.findThrowable(e, CheckpointTriggerException.class).isPresent(), equalTo(true));
    +		}
    +
    +		final JobStatus jobStatus = clusterClient.getJobStatus(jobGraph.getJobID()).get(60, TimeUnit.SECONDS);
    +		assertThat(jobStatus, equalTo(JobStatus.RUNNING));
    +
    +		// assert that checkpoints are continued to be triggered
    +		triggerCheckpointLatch = new CountDownLatch(1);
    +		assertThat(triggerCheckpointLatch.await(60, TimeUnit.SECONDS), equalTo(true));
    +	}
    +
    +	private void waitForJob() throws Exception {
    --- End diff --
    
    Unfortunately, this is needed because the `TaskmanagerRunner` is registered in the Dispatcher after the task is started.


---

[GitHub] flink pull request #5622: [FLINK-8459][flip6] Implement RestClusterClient.ca...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5622#discussion_r171857970
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -971,14 +971,44 @@ public void heartbeatFromResourceManager(final ResourceID resourceID) {
     
     	@Override
     	public CompletableFuture<String> triggerSavepoint(
    -		@Nullable final String targetDirectory,
    -		final Time timeout) {
    -		try {
    -			return executionGraph.getCheckpointCoordinator()
    -				.triggerSavepoint(System.currentTimeMillis(), targetDirectory)
    -				.thenApply(CompletedCheckpoint::getExternalPointer);
    -		} catch (Exception e) {
    -			return FutureUtils.completedExceptionally(e);
    +			@Nullable final String targetDirectory,
    +			final boolean cancelJob,
    +			final Time timeout) {
    +
    +		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
    +		if (checkpointCoordinator == null) {
    +			return FutureUtils.completedExceptionally(new IllegalStateException(
    +				String.format("Job %s is not a streaming job.", jobGraph.getJobID())));
    +		}
    +
    +		if (cancelJob) {
    +			checkpointCoordinator.stopCheckpointScheduler();
    +		}
    +		return checkpointCoordinator
    +			.triggerSavepoint(System.currentTimeMillis(), targetDirectory)
    +			.thenApply(CompletedCheckpoint::getExternalPointer)
    +			.thenApplyAsync(path -> {
    +				if (cancelJob) {
    +					log.info("Savepoint stored in {}. Now cancelling {}.", path, jobGraph.getJobID());
    +					cancel(timeout);
    +				}
    +				return path;
    +			}, getMainThreadExecutor())
    +			.exceptionally(throwable -> {
    +				if (cancelJob) {
    +					startCheckpointScheduler(checkpointCoordinator);
    +				}
    +				throw new CompletionException(throwable);
    +			});
    +	}
    +
    +	private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoordinator) {
    --- End diff --
    
    Method can be reused in the job rescaling logic.


---

[GitHub] flink pull request #5622: [FLINK-8459][flip6] Implement RestClusterClient.ca...

Posted by GJL <gi...@git.apache.org>.
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5622#discussion_r171858593
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java ---
    @@ -0,0 +1,219 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.jobmaster;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.client.program.MiniClusterClient;
    +import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
    +import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
    +import org.apache.flink.runtime.checkpoint.CheckpointOptions;
    +import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
    +import org.apache.flink.runtime.checkpoint.CheckpointTriggerException;
    +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
    +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
    +import org.apache.flink.runtime.execution.Environment;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.jobgraph.JobVertex;
    +import org.apache.flink.runtime.jobgraph.OperatorID;
    +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
    +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
    +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
    +import org.apache.flink.test.util.AbstractTestBase;
    +import org.apache.flink.testutils.category.Flip6;
    +import org.apache.flink.util.ExceptionUtils;
    +
    +import org.junit.Assume;
    +import org.junit.Before;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.experimental.categories.Category;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.IOException;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.Paths;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.TimeUnit;
    +import java.util.stream.Collectors;
    +
    +import static org.hamcrest.MatcherAssert.assertThat;
    +import static org.hamcrest.Matchers.equalTo;
    +import static org.hamcrest.Matchers.hasItem;
    +import static org.hamcrest.Matchers.isOneOf;
    +
    +/**
    + * Tests for {@link org.apache.flink.runtime.jobmaster.JobMaster#triggerSavepoint(String, boolean, Time)}.
    + *
    + * @see org.apache.flink.runtime.jobmaster.JobMaster
    + */
    +@Category(Flip6.class)
    +public class JobMasterTriggerSavepointIT extends AbstractTestBase {
    +
    +	private static CountDownLatch invokeLatch;
    +
    +	private static volatile CountDownLatch triggerCheckpointLatch;
    +
    +	@Rule
    +	public TemporaryFolder temporaryFolder = new TemporaryFolder();
    +
    +	private Path savepointDirectory;
    +	private MiniClusterClient clusterClient;
    +	private JobGraph jobGraph;
    +
    +	@Before
    +	public void setUp() throws Exception {
    +		invokeLatch = new CountDownLatch(1);
    +		triggerCheckpointLatch = new CountDownLatch(1);
    +		savepointDirectory = temporaryFolder.newFolder().toPath();
    +
    +		Assume.assumeTrue(
    +			"ClusterClient is not an instance of MiniClusterClient",
    +			miniClusterResource.getClusterClient() instanceof MiniClusterClient);
    +
    +		clusterClient = (MiniClusterClient) miniClusterResource.getClusterClient();
    +		clusterClient.setDetached(true);
    +
    +		jobGraph = new JobGraph();
    +
    +		final JobVertex vertex = new JobVertex("testVertex");
    +		vertex.setInvokableClass(NoOpBlockingInvokable.class);
    +		jobGraph.addVertex(vertex);
    +
    +		jobGraph.setSnapshotSettings(new JobCheckpointingSettings(
    +			Collections.singletonList(vertex.getID()),
    +			Collections.singletonList(vertex.getID()),
    +			Collections.singletonList(vertex.getID()),
    +			new CheckpointCoordinatorConfiguration(
    +				10,
    +				60_000,
    +				10,
    +				1,
    +				CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
    +				true),
    +			null
    +		));
    +
    +		clusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader());
    +		invokeLatch.await(60, TimeUnit.SECONDS);
    +		waitForJob();
    +	}
    +
    +	@Test
    +	public void testStopJobAfterSavepoint() throws Exception {
    +		final String savepointLocation = cancelWithSavepoint();
    +		final JobStatus jobStatus = clusterClient.getJobStatus(jobGraph.getJobID()).get(60, TimeUnit.SECONDS);
    +
    +		assertThat(jobStatus, isOneOf(JobStatus.CANCELED, JobStatus.CANCELLING));
    +
    +		final List<Path> savepoints = Files.list(savepointDirectory).map(Path::getFileName).collect(Collectors.toList());
    +		assertThat(savepoints, hasItem(Paths.get(savepointLocation).getFileName()));
    +	}
    +
    +	@Test
    +	public void testDoNotCancelJobIfSavepointFails() throws Exception {
    +		try {
    +			Files.setPosixFilePermissions(savepointDirectory, Collections.emptySet());
    +		} catch (IOException e) {
    +			Assume.assumeNoException(e);
    +		}
    +
    +		try {
    +			cancelWithSavepoint();
    +		} catch (Exception e) {
    +			assertThat(ExceptionUtils.findThrowable(e, CheckpointTriggerException.class).isPresent(), equalTo(true));
    +		}
    +
    +		final JobStatus jobStatus = clusterClient.getJobStatus(jobGraph.getJobID()).get(60, TimeUnit.SECONDS);
    +		assertThat(jobStatus, equalTo(JobStatus.RUNNING));
    +
    +		// assert that checkpoints are continued to be triggered
    +		triggerCheckpointLatch = new CountDownLatch(1);
    +		assertThat(triggerCheckpointLatch.await(60, TimeUnit.SECONDS), equalTo(true));
    +	}
    +
    +	private void waitForJob() throws Exception {
    +		for (int i = 0; i < 60; i++) {
    --- End diff --
    
    There is no need to wait a full minute here.
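
A common alternative to a fixed iteration count is to poll against a deadline with a short interval, so the wait ends as soon as the condition holds and the upper bound is explicit. This is a minimal sketch using only the JDK. `PollingSketch` and `waitUntil` are hypothetical names and are not helpers from Flink or from this pull request.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical helper, not part of Flink: polls a condition until a deadline passes.
class PollingSketch {

	// Returns true as soon as the condition holds, false on timeout or interruption.
	static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration interval) {
		final long deadline = System.nanoTime() + timeout.toNanos();
		while (!condition.getAsBoolean()) {
			// Overflow-safe comparison of nanoTime values.
			if (System.nanoTime() - deadline >= 0) {
				return false;
			}
			try {
				Thread.sleep(interval.toMillis());
			} catch (InterruptedException e) {
				// Preserve the interrupt flag for the caller and give up.
				Thread.currentThread().interrupt();
				return false;
			}
		}
		return true;
	}
}
```

A test could then call something like `waitUntil(() -> jobIsRunning(), Duration.ofSeconds(10), Duration.ofMillis(50))`, where `jobIsRunning()` stands for whatever status check the test already performs.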


---