You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/05/26 08:42:01 UTC

[2/8] flink git commit: [FLINK-6646] [yarn] Let YarnJobManager delete Yarn application files

[FLINK-6646] [yarn] Let YarnJobManager delete Yarn application files

Before, the YarnClusterClient decided when to delete the Yarn application files.
This is problematic because the client does not know whether a Yarn application
is being restarted or terminated. Due to this, the files were always deleted. This
prevents Yarn from restarting a failed ApplicationMaster, effectively thwarting
Flink's HA capabilities.

The PR changes the behaviour such that the YarnJobManager deletes the Yarn files
if it receives a StopCluster message. That way, we can be sure that the yarn files
are deleted only if the cluster is intended to be shut down.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/99e15dd8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/99e15dd8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/99e15dd8

Branch: refs/heads/release-1.3
Commit: 99e15dd85aa5907dbe6dad6f27179c568179f1a6
Parents: e045423
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed May 24 17:59:51 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri May 26 16:32:48 2017 +0800

----------------------------------------------------------------------
 .../flink/runtime/jobmanager/JobManager.scala   |  2 +-
 ...CliFrontendYarnAddressConfigurationTest.java |  2 +-
 .../flink/yarn/FlinkYarnSessionCliTest.java     |  2 +-
 .../yarn/AbstractYarnClusterDescriptor.java     | 42 ++++++++------------
 .../apache/flink/yarn/YarnClusterClient.java    | 26 +-----------
 .../org/apache/flink/yarn/YarnConfigKeys.java   |  1 +
 .../flink/yarn/cli/FlinkYarnSessionCli.java     |  4 +-
 .../apache/flink/yarn/ApplicationClient.scala   |  2 +-
 .../org/apache/flink/yarn/YarnJobManager.scala  | 35 ++++++++++++++++
 9 files changed, 60 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/99e15dd8/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 1ea783b..86d9894 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1201,7 +1201,7 @@ class JobManager(
           }
         case None =>
           // ResourceManager not available
-          // we choose not to wait here beacuse it might block the shutdown forever
+          // we choose not to wait here because it might block the shutdown forever
       }
 
       sender() ! decorateMessage(StopClusterSuccessful.getInstance())

http://git-wip-us.apache.org/repos/asf/flink/blob/99e15dd8/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
index 6a8c266..2399f47 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.commons.cli.CommandLine;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -390,7 +391,6 @@ public class CliFrontendYarnAddressConfigurationTest {
 						YarnClient yarnClient,
 						ApplicationReport report,
 						Configuration flinkConfiguration,
-						Path sessionFilesDir,
 						boolean perJobCluster) throws IOException, YarnException {
 
 					return Mockito.mock(YarnClusterClient.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/99e15dd8/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index 4da5a39..9351682 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -179,7 +179,7 @@ public class FlinkYarnSessionCliTest {
 				Mockito.mock(YarnClient.class),
 				Mockito.mock(ApplicationReport.class),
 				config,
-				new Path("/temp"), false);
+				false);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/99e15dd8/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 3110a5b..818a3e8 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -107,12 +108,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	private Configuration conf = new YarnConfiguration();
 
 	/**
-	 * Files (usually in a distributed file system) used for the YARN session of Flink.
-	 * Contains configuration files and jar files.
-	 */
-	private Path sessionFilesDir;
-
-	/**
 	 * If the user has specified a different number of slots, we store them here
 	 */
 	private int slots = -1;
@@ -416,7 +411,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			flinkConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, appReport.getHost());
 			flinkConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, appReport.getRpcPort());
 
-			return createYarnClusterClient(this, yarnClient, appReport, flinkConfiguration, sessionFilesDir, false);
+			return createYarnClusterClient(this, yarnClient, appReport, flinkConfiguration, false);
 		} catch (Exception e) {
 			throw new RuntimeException("Couldn't retrieve Yarn cluster", e);
 		}
@@ -583,7 +578,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		flinkConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);
 
 		// the Flink cluster is deployed in YARN. Represent cluster
-		return createYarnClusterClient(this, yarnClient, report, flinkConfiguration, sessionFilesDir, true);
+		return createYarnClusterClient(this, yarnClient, report, flinkConfiguration, true);
 	}
 
 	public ApplicationReport startAppMaster(JobGraph jobGraph, YarnClient yarnClient, YarnClientApplication yarnApplication) throws Exception {
@@ -739,10 +734,10 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			}
 		}
 
-		sessionFilesDir = new Path(fs.getHomeDirectory(), ".flink/" + appId.toString() + "/");
+		Path yarnFilesDir = new Path(fs.getHomeDirectory(), ".flink/" + appId + '/');
 
 		FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
-		fs.setPermission(sessionFilesDir, permission); // set permission for path.
+		fs.setPermission(yarnFilesDir, permission); // set permission for path.
 
 		//To support Yarn Secure Integration Test Scenario
 		//In Integration test setup, the Yarn containers created by YarnMiniCluster does not have the Yarn site XML
@@ -812,6 +807,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		appMasterEnv.put(YarnConfigKeys.ENV_SLOTS, String.valueOf(slots));
 		appMasterEnv.put(YarnConfigKeys.ENV_DETACHED, String.valueOf(detached));
 		appMasterEnv.put(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE, getZookeeperNamespace());
+		appMasterEnv.put(YarnConfigKeys.FLINK_YARN_FILES, yarnFilesDir.toUri().toString());
 
 		// https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#identity-on-an-insecure-cluster-hadoop_user_name
 		appMasterEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName());
@@ -863,7 +859,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		setApplicationTags(appContext);
 
 		// add a hook to clean up in case deployment fails
-		Thread deploymentFailureHook = new DeploymentFailureHook(yarnClient, yarnApplication);
+		Thread deploymentFailureHook = new DeploymentFailureHook(yarnClient, yarnApplication, yarnFilesDir);
 		Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
 		LOG.info("Submitting application master " + appId);
 		yarnClient.submitApplication(appContext);
@@ -1057,10 +1053,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		}
 	}
 
-	public String getSessionFilesDir() {
-		return sessionFilesDir.toString();
-	}
-
 	public void setName(String name) {
 		if(name == null) {
 			throw new IllegalArgumentException("The passed name is null");
@@ -1226,22 +1218,24 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 
 	private class DeploymentFailureHook extends Thread {
 
-		DeploymentFailureHook(YarnClient yarnClient, YarnClientApplication yarnApplication) {
-			this.yarnClient = yarnClient;
-			this.yarnApplication = yarnApplication;
-		}
+		private final YarnClient yarnClient;
+		private final YarnClientApplication yarnApplication;
+		private final Path yarnFilesDir;
 
-		private YarnClient yarnClient;
-		private YarnClientApplication yarnApplication;
+		DeploymentFailureHook(YarnClient yarnClient, YarnClientApplication yarnApplication, Path yarnFilesDir) {
+			this.yarnClient = Preconditions.checkNotNull(yarnClient);
+			this.yarnApplication = Preconditions.checkNotNull(yarnApplication);
+			this.yarnFilesDir = Preconditions.checkNotNull(yarnFilesDir);
+		}
 
 		@Override
 		public void run() {
 			LOG.info("Cancelling deployment from Deployment Failure Hook");
 			failSessionDuringDeployment(yarnClient, yarnApplication);
-			LOG.info("Deleting files in " + sessionFilesDir);
+			LOG.info("Deleting files in {}.", yarnFilesDir);
 			try {
 				FileSystem fs = FileSystem.get(conf);
-				fs.delete(sessionFilesDir, true);
+				fs.delete(yarnFilesDir, true);
 				fs.close();
 			} catch (IOException e) {
 				LOG.error("Failed to delete Flink Jar and conf files in HDFS", e);
@@ -1348,14 +1342,12 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			YarnClient yarnClient,
 			ApplicationReport report,
 			org.apache.flink.configuration.Configuration flinkConfiguration,
-			Path sessionFilesDir,
 			boolean perJobCluster) throws Exception {
 		return new YarnClusterClient(
 			descriptor,
 			yarnClient,
 			report,
 			flinkConfiguration,
-			sessionFilesDir,
 			perJobCluster);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/99e15dd8/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index 8f47b18..7042f99 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatus;
@@ -38,9 +39,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -74,9 +72,6 @@ public class YarnClusterClient extends ClusterClient {
 
 	private Thread clientShutdownHook = new ClientShutdownHook();
 	private PollingThread pollingRunner;
-	private final Configuration hadoopConfig;
-	// (HDFS) location of the files required to run on YARN. Needed here to delete them on shutdown.
-	private final Path sessionFilesDir;
 
 	//---------- Class internal fields -------------------
 
@@ -99,7 +94,6 @@ public class YarnClusterClient extends ClusterClient {
 	 * @param yarnClient Client to talk to YARN
 	 * @param appReport the YARN application ID
 	 * @param flinkConfig Flink configuration
-	 * @param sessionFilesDir Location of files required for YARN session
 	 * @param newlyCreatedCluster Indicator whether this cluster has just been created
 	 * @throws IOException
 	 * @throws YarnException
@@ -108,8 +102,7 @@ public class YarnClusterClient extends ClusterClient {
 		final AbstractYarnClusterDescriptor clusterDescriptor,
 		final YarnClient yarnClient,
 		final ApplicationReport appReport,
-		org.apache.flink.configuration.Configuration flinkConfig,
-		Path sessionFilesDir,
+		Configuration flinkConfig,
 		boolean newlyCreatedCluster) throws Exception {
 
 		super(flinkConfig);
@@ -117,8 +110,6 @@ public class YarnClusterClient extends ClusterClient {
 		this.akkaDuration = AkkaUtils.getTimeout(flinkConfig);
 		this.clusterDescriptor = clusterDescriptor;
 		this.yarnClient = yarnClient;
-		this.hadoopConfig = yarnClient.getConfig();
-		this.sessionFilesDir = sessionFilesDir;
 		this.appReport = appReport;
 		this.appId = appReport.getApplicationId();
 		this.trackingURL = appReport.getTrackingUrl();
@@ -391,19 +382,6 @@ public class YarnClusterClient extends ClusterClient {
 			LOG.warn("Exception while deleting the JobManager address file", e);
 		}
 
-		if (sessionFilesDir != null) {
-			LOG.info("Deleting files in " + sessionFilesDir);
-			try {
-				FileSystem shutFS = FileSystem.get(hadoopConfig);
-				shutFS.delete(sessionFilesDir, true); // delete conf and jar file.
-				shutFS.close();
-			} catch (IOException e) {
-				LOG.error("Could not delete the Flink jar and configuration files in HDFS..", e);
-			}
-		} else {
-			LOG.warn("Session file directory not set. Not deleting session files");
-		}
-
 		try {
 			pollingRunner.stopRunner();
 			pollingRunner.join(1000);

http://git-wip-us.apache.org/repos/asf/flink/blob/99e15dd8/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
index ada241c..7c9c7a7 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
@@ -39,6 +39,7 @@ public class YarnConfigKeys {
 	public static final String ENV_FLINK_CLASSPATH = "_FLINK_CLASSPATH";
 
 	public final static String FLINK_JAR_PATH = "_FLINK_JAR_PATH"; // the Flink jar resource location (in HDFS).
+	public final static String FLINK_YARN_FILES = "_FLINK_YARN_FILES"; // the root directory for all yarn application files
 
 	public final static String KEYTAB_PATH = "_KEYTAB_PATH";
 	public final static String KEYTAB_PRINCIPAL = "_KEYTAB_PRINCIPAL";

http://git-wip-us.apache.org/repos/asf/flink/blob/99e15dd8/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 69b472a..1ece264 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -660,9 +660,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 				// print info and quit:
 				LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
 						"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
-						"yarn application -kill " + yarnCluster.getApplicationId() + System.lineSeparator() +
-						"Please also note that the temporary files of the YARN session in {} will not be removed.",
-						yarnDescriptor.getSessionFilesDir());
+						"yarn application -kill " + yarnCluster.getApplicationId());
 				yarnCluster.waitForClusterToBeReady();
 				yarnCluster.disconnect();
 			} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/99e15dd8/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
index 7442503..35d5f56 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
@@ -131,7 +131,7 @@ class ApplicationClient(
       }
 
     case msg: RegisterInfoMessageListenerSuccessful =>
-      // The job manager acts as a proxy between the client and the resource managert
+      // The job manager acts as a proxy between the client and the resource manager
       val jm = sender()
       log.info(s"Successfully registered at the ResourceManager using JobManager $jm")
       yarnJobManager = Some(jm)

http://git-wip-us.apache.org/repos/asf/flink/blob/99e15dd8/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index efb4801..902553f 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -18,12 +18,15 @@
 
 package org.apache.flink.yarn
 
+import java.io.IOException
 import java.util.concurrent.{Executor, ScheduledExecutorService, TimeUnit}
 
 import akka.actor.ActorRef
 import org.apache.flink.configuration.{ConfigConstants, Configuration => FlinkConfiguration}
+import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.clusterframework.ContaineredJobManager
+import org.apache.flink.runtime.clusterframework.messages.StopCluster
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
 import org.apache.flink.runtime.instance.InstanceManager
@@ -89,5 +92,37 @@ class YarnJobManager(
       flinkConfiguration.getInteger(ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, 5),
       TimeUnit.SECONDS)
 
+  val yarnFilesPath: Option[String] = Option(System.getenv().get(YarnConfigKeys.FLINK_YARN_FILES))
+
   override val jobPollingInterval = YARN_HEARTBEAT_DELAY
+
+  override def handleMessage: Receive = {
+    handleYarnShutdown orElse super.handleMessage
+  }
+
+  def handleYarnShutdown: Receive = {
+    case msg:StopCluster =>
+      super.handleMessage(msg)
+
+      // do global cleanup if the yarn files path has been set
+      yarnFilesPath match {
+        case Some(filePath) =>
+          log.info(s"Deleting yarn application files under $filePath.")
+
+          val path = new Path(filePath)
+
+          try {
+            val fs = path.getFileSystem
+            fs.delete(path, true)
+          } catch {
+            case ioe: IOException =>
+              log.warn(
+                s"Could not properly delete yarn application files directory $filePath.",
+                ioe)
+          }
+        case None =>
+          log.debug("No yarn application files directory set. Therefore, cannot clean up " +
+            "the data.")
+      }
+  }
 }