You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/05/26 08:42:01 UTC
[2/8] flink git commit: [FLINK-6646] [yarn] Let YarnJobManager delete
Yarn application files
[FLINK-6646] [yarn] Let YarnJobManager delete Yarn application files
Previously, the YarnClusterClient decided when to delete the Yarn application files.
This is problematic because the client does not know whether a Yarn application
is being restarted or terminated. Due to this, the files were always deleted. This
prevents Yarn from restarting a failed ApplicationMaster, effectively thwarting
Flink's HA capabilities.
The PR changes the behaviour such that the YarnJobManager deletes the Yarn files
if it receives a StopCluster message. That way, we can be sure that the yarn files
are deleted if and only if the cluster is intended to be shut down.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/99e15dd8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/99e15dd8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/99e15dd8
Branch: refs/heads/release-1.3
Commit: 99e15dd85aa5907dbe6dad6f27179c568179f1a6
Parents: e045423
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed May 24 17:59:51 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri May 26 16:32:48 2017 +0800
----------------------------------------------------------------------
.../flink/runtime/jobmanager/JobManager.scala | 2 +-
...CliFrontendYarnAddressConfigurationTest.java | 2 +-
.../flink/yarn/FlinkYarnSessionCliTest.java | 2 +-
.../yarn/AbstractYarnClusterDescriptor.java | 42 ++++++++------------
.../apache/flink/yarn/YarnClusterClient.java | 26 +-----------
.../org/apache/flink/yarn/YarnConfigKeys.java | 1 +
.../flink/yarn/cli/FlinkYarnSessionCli.java | 4 +-
.../apache/flink/yarn/ApplicationClient.scala | 2 +-
.../org/apache/flink/yarn/YarnJobManager.scala | 35 ++++++++++++++++
9 files changed, 60 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/99e15dd8/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 1ea783b..86d9894 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1201,7 +1201,7 @@ class JobManager(
}
case None =>
// ResourceManager not available
- // we choose not to wait here beacuse it might block the shutdown forever
+ // we choose not to wait here because it might block the shutdown forever
}
sender() ! decorateMessage(StopClusterSuccessful.getInstance())
http://git-wip-us.apache.org/repos/asf/flink/blob/99e15dd8/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
index 6a8c266..2399f47 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.commons.cli.CommandLine;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -390,7 +391,6 @@ public class CliFrontendYarnAddressConfigurationTest {
YarnClient yarnClient,
ApplicationReport report,
Configuration flinkConfiguration,
- Path sessionFilesDir,
boolean perJobCluster) throws IOException, YarnException {
return Mockito.mock(YarnClusterClient.class);
http://git-wip-us.apache.org/repos/asf/flink/blob/99e15dd8/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index 4da5a39..9351682 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -179,7 +179,7 @@ public class FlinkYarnSessionCliTest {
Mockito.mock(YarnClient.class),
Mockito.mock(ApplicationReport.class),
config,
- new Path("/temp"), false);
+ false);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/99e15dd8/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 3110a5b..818a3e8 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -107,12 +108,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
private Configuration conf = new YarnConfiguration();
/**
- * Files (usually in a distributed file system) used for the YARN session of Flink.
- * Contains configuration files and jar files.
- */
- private Path sessionFilesDir;
-
- /**
* If the user has specified a different number of slots, we store them here
*/
private int slots = -1;
@@ -416,7 +411,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
flinkConfiguration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, appReport.getHost());
flinkConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, appReport.getRpcPort());
- return createYarnClusterClient(this, yarnClient, appReport, flinkConfiguration, sessionFilesDir, false);
+ return createYarnClusterClient(this, yarnClient, appReport, flinkConfiguration, false);
} catch (Exception e) {
throw new RuntimeException("Couldn't retrieve Yarn cluster", e);
}
@@ -583,7 +578,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
flinkConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);
// the Flink cluster is deployed in YARN. Represent cluster
- return createYarnClusterClient(this, yarnClient, report, flinkConfiguration, sessionFilesDir, true);
+ return createYarnClusterClient(this, yarnClient, report, flinkConfiguration, true);
}
public ApplicationReport startAppMaster(JobGraph jobGraph, YarnClient yarnClient, YarnClientApplication yarnApplication) throws Exception {
@@ -739,10 +734,10 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
}
}
- sessionFilesDir = new Path(fs.getHomeDirectory(), ".flink/" + appId.toString() + "/");
+ Path yarnFilesDir = new Path(fs.getHomeDirectory(), ".flink/" + appId + '/');
FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
- fs.setPermission(sessionFilesDir, permission); // set permission for path.
+ fs.setPermission(yarnFilesDir, permission); // set permission for path.
//To support Yarn Secure Integration Test Scenario
//In Integration test setup, the Yarn containers created by YarnMiniCluster does not have the Yarn site XML
@@ -812,6 +807,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
appMasterEnv.put(YarnConfigKeys.ENV_SLOTS, String.valueOf(slots));
appMasterEnv.put(YarnConfigKeys.ENV_DETACHED, String.valueOf(detached));
appMasterEnv.put(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE, getZookeeperNamespace());
+ appMasterEnv.put(YarnConfigKeys.FLINK_YARN_FILES, yarnFilesDir.toUri().toString());
// https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#identity-on-an-insecure-cluster-hadoop_user_name
appMasterEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName());
@@ -863,7 +859,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
setApplicationTags(appContext);
// add a hook to clean up in case deployment fails
- Thread deploymentFailureHook = new DeploymentFailureHook(yarnClient, yarnApplication);
+ Thread deploymentFailureHook = new DeploymentFailureHook(yarnClient, yarnApplication, yarnFilesDir);
Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
LOG.info("Submitting application master " + appId);
yarnClient.submitApplication(appContext);
@@ -1057,10 +1053,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
}
}
- public String getSessionFilesDir() {
- return sessionFilesDir.toString();
- }
-
public void setName(String name) {
if(name == null) {
throw new IllegalArgumentException("The passed name is null");
@@ -1226,22 +1218,24 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
private class DeploymentFailureHook extends Thread {
- DeploymentFailureHook(YarnClient yarnClient, YarnClientApplication yarnApplication) {
- this.yarnClient = yarnClient;
- this.yarnApplication = yarnApplication;
- }
+ private final YarnClient yarnClient;
+ private final YarnClientApplication yarnApplication;
+ private final Path yarnFilesDir;
- private YarnClient yarnClient;
- private YarnClientApplication yarnApplication;
+ DeploymentFailureHook(YarnClient yarnClient, YarnClientApplication yarnApplication, Path yarnFilesDir) {
+ this.yarnClient = Preconditions.checkNotNull(yarnClient);
+ this.yarnApplication = Preconditions.checkNotNull(yarnApplication);
+ this.yarnFilesDir = Preconditions.checkNotNull(yarnFilesDir);
+ }
@Override
public void run() {
LOG.info("Cancelling deployment from Deployment Failure Hook");
failSessionDuringDeployment(yarnClient, yarnApplication);
- LOG.info("Deleting files in " + sessionFilesDir);
+ LOG.info("Deleting files in {}.", yarnFilesDir);
try {
FileSystem fs = FileSystem.get(conf);
- fs.delete(sessionFilesDir, true);
+ fs.delete(yarnFilesDir, true);
fs.close();
} catch (IOException e) {
LOG.error("Failed to delete Flink Jar and conf files in HDFS", e);
@@ -1348,14 +1342,12 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
YarnClient yarnClient,
ApplicationReport report,
org.apache.flink.configuration.Configuration flinkConfiguration,
- Path sessionFilesDir,
boolean perJobCluster) throws Exception {
return new YarnClusterClient(
descriptor,
yarnClient,
report,
flinkConfiguration,
- sessionFilesDir,
perJobCluster);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/99e15dd8/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index 8f47b18..7042f99 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatus;
@@ -38,9 +39,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -74,9 +72,6 @@ public class YarnClusterClient extends ClusterClient {
private Thread clientShutdownHook = new ClientShutdownHook();
private PollingThread pollingRunner;
- private final Configuration hadoopConfig;
- // (HDFS) location of the files required to run on YARN. Needed here to delete them on shutdown.
- private final Path sessionFilesDir;
//---------- Class internal fields -------------------
@@ -99,7 +94,6 @@ public class YarnClusterClient extends ClusterClient {
* @param yarnClient Client to talk to YARN
* @param appReport the YARN application ID
* @param flinkConfig Flink configuration
- * @param sessionFilesDir Location of files required for YARN session
* @param newlyCreatedCluster Indicator whether this cluster has just been created
* @throws IOException
* @throws YarnException
@@ -108,8 +102,7 @@ public class YarnClusterClient extends ClusterClient {
final AbstractYarnClusterDescriptor clusterDescriptor,
final YarnClient yarnClient,
final ApplicationReport appReport,
- org.apache.flink.configuration.Configuration flinkConfig,
- Path sessionFilesDir,
+ Configuration flinkConfig,
boolean newlyCreatedCluster) throws Exception {
super(flinkConfig);
@@ -117,8 +110,6 @@ public class YarnClusterClient extends ClusterClient {
this.akkaDuration = AkkaUtils.getTimeout(flinkConfig);
this.clusterDescriptor = clusterDescriptor;
this.yarnClient = yarnClient;
- this.hadoopConfig = yarnClient.getConfig();
- this.sessionFilesDir = sessionFilesDir;
this.appReport = appReport;
this.appId = appReport.getApplicationId();
this.trackingURL = appReport.getTrackingUrl();
@@ -391,19 +382,6 @@ public class YarnClusterClient extends ClusterClient {
LOG.warn("Exception while deleting the JobManager address file", e);
}
- if (sessionFilesDir != null) {
- LOG.info("Deleting files in " + sessionFilesDir);
- try {
- FileSystem shutFS = FileSystem.get(hadoopConfig);
- shutFS.delete(sessionFilesDir, true); // delete conf and jar file.
- shutFS.close();
- } catch (IOException e) {
- LOG.error("Could not delete the Flink jar and configuration files in HDFS..", e);
- }
- } else {
- LOG.warn("Session file directory not set. Not deleting session files");
- }
-
try {
pollingRunner.stopRunner();
pollingRunner.join(1000);
http://git-wip-us.apache.org/repos/asf/flink/blob/99e15dd8/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
index ada241c..7c9c7a7 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java
@@ -39,6 +39,7 @@ public class YarnConfigKeys {
public static final String ENV_FLINK_CLASSPATH = "_FLINK_CLASSPATH";
public final static String FLINK_JAR_PATH = "_FLINK_JAR_PATH"; // the Flink jar resource location (in HDFS).
+ public final static String FLINK_YARN_FILES = "_FLINK_YARN_FILES"; // the root directory for all yarn application files
public final static String KEYTAB_PATH = "_KEYTAB_PATH";
public final static String KEYTAB_PRINCIPAL = "_KEYTAB_PRINCIPAL";
http://git-wip-us.apache.org/repos/asf/flink/blob/99e15dd8/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 69b472a..1ece264 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -660,9 +660,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
// print info and quit:
LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
- "yarn application -kill " + yarnCluster.getApplicationId() + System.lineSeparator() +
- "Please also note that the temporary files of the YARN session in {} will not be removed.",
- yarnDescriptor.getSessionFilesDir());
+ "yarn application -kill " + yarnCluster.getApplicationId());
yarnCluster.waitForClusterToBeReady();
yarnCluster.disconnect();
} else {
http://git-wip-us.apache.org/repos/asf/flink/blob/99e15dd8/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
index 7442503..35d5f56 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
@@ -131,7 +131,7 @@ class ApplicationClient(
}
case msg: RegisterInfoMessageListenerSuccessful =>
- // The job manager acts as a proxy between the client and the resource managert
+ // The job manager acts as a proxy between the client and the resource manager
val jm = sender()
log.info(s"Successfully registered at the ResourceManager using JobManager $jm")
yarnJobManager = Some(jm)
http://git-wip-us.apache.org/repos/asf/flink/blob/99e15dd8/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index efb4801..902553f 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -18,12 +18,15 @@
package org.apache.flink.yarn
+import java.io.IOException
import java.util.concurrent.{Executor, ScheduledExecutorService, TimeUnit}
import akka.actor.ActorRef
import org.apache.flink.configuration.{ConfigConstants, Configuration => FlinkConfiguration}
+import org.apache.flink.core.fs.Path
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
import org.apache.flink.runtime.clusterframework.ContaineredJobManager
+import org.apache.flink.runtime.clusterframework.messages.StopCluster
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
import org.apache.flink.runtime.instance.InstanceManager
@@ -89,5 +92,37 @@ class YarnJobManager(
flinkConfiguration.getInteger(ConfigConstants.YARN_HEARTBEAT_DELAY_SECONDS, 5),
TimeUnit.SECONDS)
+ val yarnFilesPath: Option[String] = Option(System.getenv().get(YarnConfigKeys.FLINK_YARN_FILES))
+
override val jobPollingInterval = YARN_HEARTBEAT_DELAY
+
+ override def handleMessage: Receive = {
+ handleYarnShutdown orElse super.handleMessage
+ }
+
+ def handleYarnShutdown: Receive = {
+ case msg:StopCluster =>
+ super.handleMessage(msg)
+
+ // do global cleanup if the yarn files path has been set
+ yarnFilesPath match {
+ case Some(filePath) =>
+ log.info(s"Deleting yarn application files under $filePath.")
+
+ val path = new Path(filePath)
+
+ try {
+ val fs = path.getFileSystem
+ fs.delete(path, true)
+ } catch {
+ case ioe: IOException =>
+ log.warn(
+ s"Could not properly delete yarn application files directory $filePath.",
+ ioe)
+ }
+ case None =>
+ log.debug("No yarn application files directory set. Therefore, cannot clean up " +
+ "the data.")
+ }
+ }
}