You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/05/02 09:29:48 UTC

[10/16] flink git commit: [FLINK-9196] [flip6, yarn] Cleanup application files when deregistering YARN AM

[FLINK-9196] [flip6, yarn] Cleanup application files when deregistering YARN AM

Enable graceful cluster shutdown via HTTP.
Remove the Flink application files from the remote file system when the
YarnResourceManager deregisters the YARN ApplicationMaster.

This closes #5938


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d130f878
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d130f878
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d130f878

Branch: refs/heads/release-1.5
Commit: d130f87822818ba6f5c9dbd77ab3a774fdc41e60
Parents: a8425e5
Author: gyao <ga...@data-artisans.com>
Authored: Thu Apr 19 12:07:54 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Apr 30 23:25:19 2018 +0200

----------------------------------------------------------------------
 .../apache/flink/client/cli/CliFrontend.java    |  4 +-
 .../client/deployment/ClusterDescriptor.java    |  2 +-
 .../LegacyStandaloneClusterDescriptor.java      |  2 +-
 .../deployment/StandaloneClusterDescriptor.java |  2 +-
 .../flink/client/program/ClusterClient.java     |  3 +
 .../client/program/rest/RestClusterClient.java  | 16 +++++
 .../client/cli/util/DummyClusterDescriptor.java |  2 +-
 .../flink/runtime/dispatcher/Dispatcher.java    |  6 ++
 .../rest/handler/cluster/ShutdownHandler.java   | 58 ++++++++++++++++
 .../rest/messages/cluster/ShutdownHeaders.java  | 69 ++++++++++++++++++++
 .../runtime/webmonitor/RestfulGateway.java      |  4 ++
 .../runtime/webmonitor/WebMonitorEndpoint.java  | 11 ++++
 .../flink/yarn/YarnConfigurationITCase.java     |  2 +-
 .../util/NonDeployingYarnClusterDescriptor.java |  2 +-
 .../yarn/AbstractYarnClusterDescriptor.java     | 19 +++++-
 .../main/java/org/apache/flink/yarn/Utils.java  | 24 +++++++
 .../apache/flink/yarn/YarnClusterClient.java    | 15 +++++
 .../apache/flink/yarn/YarnResourceManager.java  |  2 +
 .../flink/yarn/cli/FlinkYarnSessionCli.java     | 10 +--
 .../org/apache/flink/yarn/YarnJobManager.scala  |  4 +-
 .../java/org/apache/flink/yarn/UtilsTest.java   | 52 +++++++++++++++
 .../flink/yarn/YarnResourceManagerTest.java     | 26 +++++++-
 22 files changed, 312 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 7745ca0..95a9949 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -277,8 +277,8 @@ public class CliFrontend {
 					if (clusterId == null && !client.isDetached()) {
 						// terminate the cluster only if we have started it before and if it's not detached
 						try {
-							clusterDescriptor.terminateCluster(client.getClusterId());
-						} catch (FlinkException e) {
+							client.shutDownCluster();
+						} catch (final Exception e) {
 							LOG.info("Could not properly terminate the Flink cluster.", e);
 						}
 					}

http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
index f9f5d4b..e6b3922 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
@@ -72,5 +72,5 @@ public interface ClusterDescriptor<T> extends AutoCloseable {
 	 * @param clusterId identifying the cluster to shut down
 	 * @throws FlinkException if the cluster could not be terminated
 	 */
-	void terminateCluster(T clusterId) throws FlinkException;
+	void killCluster(T clusterId) throws FlinkException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-clients/src/main/java/org/apache/flink/client/deployment/LegacyStandaloneClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/LegacyStandaloneClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/LegacyStandaloneClusterDescriptor.java
index 21e020c..b448970 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/LegacyStandaloneClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/LegacyStandaloneClusterDescriptor.java
@@ -65,7 +65,7 @@ public class LegacyStandaloneClusterDescriptor implements ClusterDescriptor<Stan
 	}
 
 	@Override
-	public void terminateCluster(StandaloneClusterId clusterId) throws FlinkException {
+	public void killCluster(StandaloneClusterId clusterId) throws FlinkException {
 		throw new UnsupportedOperationException("Cannot terminate standalone clusters.");
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
index c4bcde6..bdf8faf 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
@@ -66,7 +66,7 @@ public class StandaloneClusterDescriptor implements ClusterDescriptor<Standalone
 	}
 
 	@Override
-	public void terminateCluster(StandaloneClusterId clusterId) throws FlinkException {
+	public void killCluster(StandaloneClusterId clusterId) throws FlinkException {
 		throw new UnsupportedOperationException("Cannot terminate a standalone cluster.");
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 2ef0b2e..ac779a7 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -1036,4 +1036,7 @@ public abstract class ClusterClient<T> {
 		throw new UnsupportedOperationException("The " + getClass().getSimpleName() + " does not support rescaling.");
 	}
 
+	public void shutDownCluster() {
+		throw new UnsupportedOperationException("The " + getClass().getSimpleName() + " does not support shutDownCluster.");
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 3d50e93..5c04be7 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -67,6 +67,7 @@ import org.apache.flink.runtime.rest.messages.RequestBody;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
 import org.apache.flink.runtime.rest.messages.TriggerId;
+import org.apache.flink.runtime.rest.messages.cluster.ShutdownHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
 import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders;
@@ -570,6 +571,21 @@ public class RestClusterClient<T> extends ClusterClient<T> implements NewCluster
 			});
 	}
 
+	@Override
+	public void shutDownCluster() {
+		try {
+			sendRetryableRequest(
+				ShutdownHeaders.getInstance(),
+				EmptyMessageParameters.getInstance(),
+				EmptyRequestBody.getInstance(),
+				isConnectionProblemException()).get();
+		} catch (InterruptedException e) {
+			Thread.currentThread().interrupt();
+		} catch (ExecutionException e) {
+			log.error("Error while shutting down cluster", e);
+		}
+	}
+
 	/**
 	 * Creates a {@code CompletableFuture} that polls a {@code AsynchronouslyCreatedResource} until
 	 * its {@link AsynchronouslyCreatedResource#queueStatus() QueueStatus} becomes

http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java
index df2f3f7..7620ae2 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java
@@ -60,7 +60,7 @@ public class DummyClusterDescriptor<T> implements ClusterDescriptor<T> {
 	}
 
 	@Override
-	public void terminateCluster(T clusterId) throws FlinkException {
+	public void killCluster(T clusterId) throws FlinkException {
 		throw new UnsupportedOperationException("Cannot terminate a dummy cluster.");
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 41efaab..c05255b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -518,6 +518,12 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 				jobMasterGateway.triggerSavepoint(targetDirectory, cancelJob, timeout));
 	}
 
+	@Override
+	public CompletableFuture<Acknowledge> shutDownCluster() {
+		shutDown();
+		return CompletableFuture.completedFuture(Acknowledge.get());
+	}
+
 	/**
 	 * Cleans up the job related data from the dispatcher. If cleanupHA is true, then
 	 * the data will also be removed from HA.

http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/ShutdownHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/ShutdownHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/ShutdownHandler.java
new file mode 100644
index 0000000..73fca58
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/ShutdownHandler.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.cluster;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * REST handler which allows shutting down the cluster.
+ */
+public class ShutdownHandler extends
+		AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+
+	public ShutdownHandler(
+			final CompletableFuture<String> localRestAddress,
+			final GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+			final Time timeout,
+			final Map<String, String> responseHeaders,
+			final MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> messageHeaders) {
+		super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
+	}
+
+	@Override
+	protected CompletableFuture<EmptyResponseBody> handleRequest(
+			@Nonnull final HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request,
+			@Nonnull final RestfulGateway gateway) throws RestHandlerException {
+		return gateway.shutDownCluster().thenApply(ignored -> EmptyResponseBody.getInstance());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/cluster/ShutdownHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/cluster/ShutdownHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/cluster/ShutdownHeaders.java
new file mode 100644
index 0000000..75a1e99
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/cluster/ShutdownHeaders.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.cluster;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for {@link org.apache.flink.runtime.rest.handler.cluster.ShutdownHandler}.
+ */
+public class ShutdownHeaders implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
+
+	private static final ShutdownHeaders INSTANCE = new ShutdownHeaders();
+
+	@Override
+	public Class<EmptyResponseBody> getResponseClass() {
+		return EmptyResponseBody.class;
+	}
+
+	@Override
+	public HttpResponseStatus getResponseStatusCode() {
+		return HttpResponseStatus.OK;
+	}
+
+	@Override
+	public Class<EmptyRequestBody> getRequestClass() {
+		return EmptyRequestBody.class;
+	}
+
+	@Override
+	public EmptyMessageParameters getUnresolvedMessageParameters() {
+		return EmptyMessageParameters.getInstance();
+	}
+
+	@Override
+	public HttpMethodWrapper getHttpMethod() {
+		return HttpMethodWrapper.DELETE;
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return "/cluster";
+	}
+
+	public static ShutdownHeaders getInstance() {
+		return INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
index 6bb088c..6a6c34b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
@@ -200,4 +200,8 @@ public interface RestfulGateway extends RpcGateway {
 			@RpcTimeout Time timeout) {
 		throw new UnsupportedOperationException();
 	}
+
+	default CompletableFuture<Acknowledge> shutDownCluster() {
+		throw new UnsupportedOperationException();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 1a67d92..ef6721b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.cluster.ClusterConfigHandler;
 import org.apache.flink.runtime.rest.handler.cluster.ClusterOverviewHandler;
 import org.apache.flink.runtime.rest.handler.cluster.DashboardConfigHandler;
+import org.apache.flink.runtime.rest.handler.cluster.ShutdownHandler;
 import org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler;
 import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
 import org.apache.flink.runtime.rest.handler.job.JobDetailsHandler;
@@ -104,6 +105,7 @@ import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeader
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
 import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatisticsHeaders;
+import org.apache.flink.runtime.rest.messages.cluster.ShutdownHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders;
 import org.apache.flink.runtime.rest.messages.job.SubtaskCurrentAttemptDetailsHeaders;
@@ -558,6 +560,13 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 			timeout,
 			responseHeaders);
 
+		final ShutdownHandler shutdownHandler = new ShutdownHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			ShutdownHeaders.getInstance());
+
 		final File webUiDir = restConfiguration.getWebUiDir();
 
 		Optional<StaticFileServerHandler<T>> optWebContent;
@@ -619,6 +628,8 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 		handlers.add(Tuple2.of(YarnCancelJobTerminationHeaders.getInstance(), jobCancelTerminationHandler));
 		handlers.add(Tuple2.of(YarnStopJobTerminationHeaders.getInstance(), jobStopTerminationHandler));
 
+		handlers.add(Tuple2.of(ShutdownHeaders.getInstance(), shutdownHandler));
+
 		optWebContent.ifPresent(
 			webContent -> {
 				handlers.add(Tuple2.of(WebContentHandlerSpecification.getInstance(), webContent));

http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
index 3a2f957..a5f7c10 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
@@ -184,7 +184,7 @@ public class YarnConfigurationITCase extends YarnTestBase {
 				clusterClient.shutdown();
 			}
 
-			clusterDescriptor.terminateCluster(clusterId);
+			clusterDescriptor.killCluster(clusterId);
 
 		} finally {
 			clusterDescriptor.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/NonDeployingYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/NonDeployingYarnClusterDescriptor.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/NonDeployingYarnClusterDescriptor.java
index 4916b73..de0e1d7 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/NonDeployingYarnClusterDescriptor.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/NonDeployingYarnClusterDescriptor.java
@@ -83,7 +83,7 @@ public class NonDeployingYarnClusterDescriptor extends AbstractYarnClusterDescri
 	}
 
 	@Override
-	public void terminateCluster(ApplicationId clusterId) {
+	public void killCluster(ApplicationId clusterId) {
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index aec5fdb..1948d0f 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -412,9 +412,12 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	}
 
 	@Override
-	public void terminateCluster(ApplicationId applicationId) throws FlinkException {
+	public void killCluster(ApplicationId applicationId) throws FlinkException {
 		try {
 			yarnClient.killApplication(applicationId);
+			Utils.deleteApplicationFiles(Collections.singletonMap(
+				YarnConfigKeys.FLINK_YARN_FILES,
+				getYarnFilesDir(applicationId).toUri().toString()));
 		} catch (YarnException | IOException e) {
 			throw new FlinkException("Could not kill the Yarn Flink cluster with id " + applicationId + '.', e);
 		}
@@ -897,8 +900,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			}
 		}
 
-		Path yarnFilesDir = new Path(homeDir, ".flink/" + appId + '/');
-
+		final Path yarnFilesDir = getYarnFilesDir(appId);
 		FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
 		fs.setPermission(yarnFilesDir, permission); // set permission for path.
 
@@ -1086,6 +1088,17 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	}
 
 	/**
+	 * Returns the Path to which the YARN application files should be uploaded.
+	 *
+	 * @param appId YARN application id
+	 */
+	private Path getYarnFilesDir(final ApplicationId appId) throws IOException {
+		final FileSystem fileSystem = FileSystem.get(yarnConfiguration);
+		final Path homeDir = fileSystem.getHomeDirectory();
+		return new Path(homeDir, ".flink/" + appId + '/');
+	}
+
+	/**
 	 * Uploads and registers a single resource and adds it to <tt>localResources</tt>.
 	 *
 	 * @param key

http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index fb9a478..20e02e1 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.runtime.util.HadoopUtils;
 import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.StringUtils;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -173,6 +174,29 @@ public final class Utils {
 	}
 
 	/**
+	 * Deletes the YARN application files, e.g., Flink binaries, libraries, etc., from the remote
+	 * filesystem.
+	 *
+	 * @param env The environment variables.
+	 */
+	public static void deleteApplicationFiles(final Map<String, String> env) {
+		final String applicationFilesDir = env.get(YarnConfigKeys.FLINK_YARN_FILES);
+		if (!StringUtils.isNullOrWhitespaceOnly(applicationFilesDir)) {
+			final org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(applicationFilesDir);
+			try {
+				final org.apache.flink.core.fs.FileSystem fileSystem = path.getFileSystem();
+				if (!fileSystem.delete(path, true)) {
+					LOG.error("Deleting yarn application files under {} was unsuccessful.", applicationFilesDir);
+				}
+			} catch (final IOException e) {
+				LOG.error("Could not properly delete yarn application files directory {}.", applicationFilesDir, e);
+			}
+		} else {
+			LOG.debug("No yarn application files directory set. Therefore, cannot clean up the data.");
+		}
+	}
+
+	/**
 	 * Creates a YARN resource for the remote object at the given location.
 	 *
 	 * @param remoteRsrcPath	remote location of the resource

http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index 3edd6e4..0d7546e 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -24,6 +24,7 @@ import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatus;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
 import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
@@ -263,6 +264,20 @@ public class YarnClusterClient extends ClusterClient<ApplicationId> {
 		}
 	}
 
+	@Override
+	public void shutDownCluster() {
+		LOG.info("Sending shutdown request to the Application Master");
+		try {
+			final Future<Object> response = Patterns.ask(applicationClient.get(),
+				new YarnMessages.LocalStopYarnSession(ApplicationStatus.SUCCEEDED,
+					"Flink YARN Client requested shutdown"),
+				new Timeout(akkaDuration));
+			Await.ready(response, akkaDuration);
+		} catch (final Exception e) {
+			LOG.warn("Error while stopping YARN cluster.", e);
+		}
+	}
+
 	public ApplicationId getApplicationId() {
 		return appId;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 220f4f2..22d0164 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -279,6 +279,8 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 		} catch (Throwable t) {
 			log.error("Could not unregister the application master.", t);
 		}
+
+		Utils.deleteApplicationFiles(env);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 7596d68..698d48d 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -611,7 +611,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 						}
 
 						try {
-							yarnClusterDescriptor.terminateCluster(yarnApplicationId);
+							yarnClusterDescriptor.killCluster(yarnApplicationId);
 						} catch (FlinkException fe) {
 							LOG.info("Could not properly terminate the Flink cluster.", fe);
 						}
@@ -644,18 +644,14 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 							LOG.info("Could not properly close the Yarn application status monitor.", e);
 						}
 
+						clusterClient.shutDownCluster();
+
 						try {
 							clusterClient.shutdown();
 						} catch (Exception e) {
 							LOG.info("Could not properly shutdown cluster client.", e);
 						}
 
-						try {
-							yarnClusterDescriptor.terminateCluster(yarnApplicationId);
-						} catch (FlinkException e) {
-							LOG.info("Could not properly terminate the Flink cluster.", e);
-						}
-
 						// shut down the scheduled executor service
 						ExecutorUtils.gracefulShutdown(
 							1000L,

http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index 6b439bd..7d17325 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -109,8 +109,6 @@ class YarnJobManager(
 
   private def handleYarnShutdown: Receive = {
     case msg: StopCluster =>
-      super.handleMessage(msg)
-
       // do global cleanup if the yarn files path has been set
       yarnFilesPath match {
         case Some(filePath) =>
@@ -135,5 +133,7 @@ class YarnJobManager(
           log.debug("No yarn application files directory set. Therefore, cannot clean up " +
             "the data.")
       }
+
+      super.handleMessage(msg)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
new file mode 100644
index 0000000..5fc3567
--- /dev/null
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.stream.Stream;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link Utils}.
+ */
+public class UtilsTest {
+
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	/**
+	 * Tests that {@link Utils#deleteApplicationFiles} removes the directory referenced by
+	 * {@link YarnConfigKeys#FLINK_YARN_FILES} together with the file it contains.
+	 */
+	@Test
+	public void testDeleteApplicationFiles() throws Exception {
+		final Path applicationFilesDir = temporaryFolder.newFolder(".flink").toPath();
+		final Path rootDir = temporaryFolder.getRoot().toPath();
+		Files.createFile(applicationFilesDir.resolve("flink.jar"));
+		assertThat(countEntries(rootDir), equalTo(1L));
+		assertThat(countEntries(applicationFilesDir), equalTo(1L));
+
+		Utils.deleteApplicationFiles(Collections.singletonMap(
+			YarnConfigKeys.FLINK_YARN_FILES,
+			applicationFilesDir.toString()));
+		assertThat(countEntries(rootDir), equalTo(0L));
+	}
+
+	/**
+	 * Counts the direct entries of the given directory.
+	 *
+	 * <p>The stream returned by {@link Files#list(Path)} keeps the directory open until it is
+	 * closed, so it is closed here before the directory under test gets deleted.
+	 */
+	private static long countEntries(Path directory) throws IOException {
+		try (Stream<Path> entries = Files.list(directory)) {
+			return entries.count();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d130f878/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index 0d37b8e..03f3ef2 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -76,6 +77,7 @@ import org.junit.rules.TemporaryFolder;
 import javax.annotation.Nullable;
 
 import java.io.File;
+import java.nio.file.Files;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -90,7 +92,9 @@ import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_SHIP_FILES;
 import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
 import static org.apache.flink.yarn.YarnConfigKeys.ENV_HADOOP_USER_NAME;
 import static org.apache.flink.yarn.YarnConfigKeys.FLINK_JAR_PATH;
+import static org.apache.flink.yarn.YarnConfigKeys.FLINK_YARN_FILES;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
@@ -105,9 +109,9 @@ public class YarnResourceManagerTest extends TestLogger {
 
 	private static final Time TIMEOUT = Time.seconds(10L);
 
-	private static Configuration flinkConfig = new Configuration();
+	private Configuration flinkConfig = new Configuration();
 
-	private static Map<String, String> env = new HashMap<>();
+	private Map<String, String> env = new HashMap<>();
 
 	@Rule
 	public TemporaryFolder folder = new TemporaryFolder();
@@ -200,7 +204,7 @@ public class YarnResourceManagerTest extends TestLogger {
 		}
 	}
 
-	static class Context {
+	class Context {
 
 		// services
 		final TestingRpcService rpcService;
@@ -388,4 +392,20 @@ public class YarnResourceManagerTest extends TestLogger {
 			assertTrue(resourceManager.getNumberOfRegisteredTaskManagers().get() == 0);
 		}};
 	}
+
+	/**
+	 * Tests that application files are deleted when the YARN application master is de-registered.
+	 */
+	@Test
+	public void testDeleteApplicationFiles() throws Exception {
+		new Context() {{
+			final File applicationDir = folder.newFolder(".flink");
+			env.put(FLINK_YARN_FILES, applicationDir.getCanonicalPath());
+
+			startResourceManager();
+
+			resourceManager.deregisterApplication(ApplicationStatus.SUCCEEDED, null);
+			assertFalse("YARN application directory was not removed", Files.exists(applicationDir.toPath()));
+		}};
+	}
 }